From 36acd1320c8f935914934be8289c71a1202fd89b Mon Sep 17 00:00:00 2001
From: rafapolo
Date: Sun, 29 Mar 2026 17:39:13 +0200
Subject: [PATCH] feat: add --sync to export BQ tables directly to S3 without GCS intermediary

---
 roda.sh             |  63 +++++
 sync_bq_to_local.py | 543 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 606 insertions(+)
 create mode 100755 sync_bq_to_local.py

diff --git a/roda.sh b/roda.sh
index 89f9df4..9612e78 100755
--- a/roda.sh
+++ b/roda.sh
@@ -31,10 +31,13 @@ fi
 
 DRY_RUN=false
 GCLOUD_RUN=false
+SYNC_RUN=false
 if [[ "${1:-}" == "--dry-run" ]]; then
   DRY_RUN=true
 elif [[ "${1:-}" == "--gcloud-run" ]]; then
   GCLOUD_RUN=true
+elif [[ "${1:-}" == "--sync" ]]; then
+  SYNC_RUN=true
 fi
 
 # -----------------------------------------------------------------------------
@@ -74,6 +77,18 @@ if [[ -z "${AWS_ACCESS_KEY_ID:-}" || -z "${AWS_SECRET_ACCESS_KEY:-}" ]]; then
   exit 1
 fi
 
+# Validate GCP project (needed for --sync)
+if [[ -z "${GCP_PROJECT:-}" ]]; then
+  if $SYNC_RUN; then
+    if [[ -z "${YOUR_PROJECT:-}" ]]; then
+      log_err "GCP_PROJECT not found in .env. Add GCP_PROJECT or YOUR_PROJECT."
+      exit 1
+    fi
+    log "GCP_PROJECT not set, using YOUR_PROJECT: $YOUR_PROJECT"
+    export GCP_PROJECT="$YOUR_PROJECT"
+  fi
+fi
+
 # Configure rclone remotes via env vars — no rclone.conf or inline credentials needed.
 # GCS remote (bd:) uses Application Default Credentials from gcloud auth application-default login.
 # Hetzner S3 remote (hz:) uses the credentials from .env, kept out of the process command line.
@@ -182,6 +197,54 @@ REMOTE_SETUP
   exit 0
 fi
 
+# =============================================================================
+# SYNC — BigQuery → S3 direct (no GCS intermediary)
+# =============================================================================
+if $SYNC_RUN; then
+  log "=============================="
+  log "  SYNC MODE — BigQuery → S3"
+  log "=============================="
+
+  # Check dependencies
+  for cmd in python3; do
+    if ! command -v "$cmd" &>/dev/null; then
+      log_err "'$cmd' not found."
+      exit 1
+    fi
+  done
+
+  # Check Python dependencies (import names differ from the pip package names)
+  PYTHON_CHECKS="google.cloud.bigquery:boto3:pandas:pyarrow"
+  for check in $(echo "$PYTHON_CHECKS" | tr ':' '\n'); do
+    module="${check}"
+    if ! python3 -c "import ${module}" 2>/dev/null; then
+      pip_pkg="${module}"
+      log_err "Missing Python package: ${pip_pkg}. Run: pip install google-cloud-bigquery boto3 pandas pyarrow"
+      exit 1
+    fi
+  done
+
+  # Set GCP_PROJECT for the Python script
+  export GCP_PROJECT="${GCP_PROJECT:-${YOUR_PROJECT}}"
+
+  log "GCP project: $GCP_PROJECT"
+  log "S3 bucket: $HETZNER_S3_BUCKET"
+  log "S3 endpoint: $HETZNER_S3_ENDPOINT"
+  log ""
+
+  if $DRY_RUN; then
+    log "DRY RUN — listing tables only, no data will be transferred"
+  fi
+
+  # Run the sync script, filtering out --sync (roda.sh flag)
+  SYNC_ARGS=()
+  for arg in "$@"; do
+    [[ "$arg" != "--sync" ]] && SYNC_ARGS+=("$arg")
+  done
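+  # Typical invocations (everything except --sync is forwarded to sync_bq_to_local.py):
+  #   ./roda.sh --sync --dry-run   # list the tables that would be synced, no transfer
+  #   ./roda.sh --sync --resume    # skip tables already recorded in done_sync.txt
+  # The "${SYNC_ARGS[@]+...}" expansion below expands to nothing when the array is
+  # empty, which avoids an "unbound variable" error if the script runs under set -u.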
+  python3 sync_bq_to_local.py "${SYNC_ARGS[@]+"${SYNC_ARGS[@]}"}"
+  exit $?
+fi
+
 # -----------------------------------------------------------------------------
 # STEP 1 — Create GCS bucket in US region (same as basedosdados)
 # -----------------------------------------------------------------------------
diff --git a/sync_bq_to_local.py b/sync_bq_to_local.py
new file mode 100755
index 0000000..eefad0c
--- /dev/null
+++ b/sync_bq_to_local.py
@@ -0,0 +1,543 @@
+#!/usr/bin/env python3
+"""
+sync_bq_to_local.py
+
+Syncs missing tables from BigQuery (basedosdados project) to Hetzner S3,
+then registers them as DuckDB views.
+
+Usage:
+    python3 sync_bq_to_local.py            # full sync
+    python3 sync_bq_to_local.py --dry-run  # list missing tables only
+    python3 sync_bq_to_local.py --resume   # resume from last run
+
+Prerequisites:
+    gcloud auth application-default login
+    GCP project with billing enabled (free tier: 1 TB/month)
+
+Environment (.env):
+    GCP_PROJECT           - GCP project ID for billing
+    HETZNER_S3_BUCKET     - S3 bucket name
+    HETZNER_S3_ENDPOINT   - S3 endpoint URL
+    AWS_ACCESS_KEY_ID     - S3 access key
+    AWS_SECRET_ACCESS_KEY - S3 secret key
+"""
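+
+# Illustrative .env (all values below are placeholders, not real settings):
+#   GCP_PROJECT=my-billing-project
+#   HETZNER_S3_BUCKET=my-basedosdados-mirror
+#   HETZNER_S3_ENDPOINT=https://fsn1.your-objectstorage.com
+#   AWS_ACCESS_KEY_ID=...
+#   AWS_SECRET_ACCESS_KEY=...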
+
+import os
+import sys
+import json
+import argparse
+import logging
+import subprocess
+from datetime import datetime
+from pathlib import Path
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import boto3
+from botocore.config import Config as BotoConfig
+from google.cloud import bigquery
+
+# ---------------------------------------------------------------------------
+# Logging
+# ---------------------------------------------------------------------------
+LOG_FILE = f"sync_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s %(levelname)s %(message)s",
+    handlers=[
+        logging.FileHandler(LOG_FILE),
+        logging.StreamHandler(sys.stdout),
+    ],
+)
+log = logging.getLogger(__name__)
+
+# ---------------------------------------------------------------------------
+# Constants
+# ---------------------------------------------------------------------------
+SOURCE_PROJECT = "basedosdados"
+MISSING_TABLES_FILE = "tasks/datasets_to_scrap.md"
+DONE_FILE = "done_sync.txt"
+FAILED_FILE = "failed_sync.txt"
+DATA_DIR = "data"
+PARQUET_DIR = "parquet"
+MAX_RETRIES = 3
+BATCH_SIZE = 1  # export one table at a time to manage memory
+WORKERS = 4     # parallel uploads
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def load_env():
+    """Load required environment variables."""
+    from dotenv import load_dotenv
+    load_dotenv()
+
+    required = [
+        "GCP_PROJECT",
+        "HETZNER_S3_BUCKET",
+        "HETZNER_S3_ENDPOINT",
+        "AWS_ACCESS_KEY_ID",
+        "AWS_SECRET_ACCESS_KEY",
+    ]
+    missing = [v for v in required if not os.environ.get(v)]
+    if missing:
+        log.error("Missing env vars: %s", missing)
+        sys.exit(1)
+
+    return {v: os.environ[v] for v in required}
+
+
+def get_s3_client(env):
+    """Create boto3 S3 client configured for Hetzner."""
+    return boto3.client(
+        "s3",
+        endpoint_url=env["HETZNER_S3_ENDPOINT"],
+        aws_access_key_id=env["AWS_ACCESS_KEY_ID"],
+        aws_secret_access_key=env["AWS_SECRET_ACCESS_KEY"],
+        config=BotoConfig(s3={"addressing_style": "path"}),
+    )
+
+
+def get_bq_client():
+    """Create BigQuery client using Application Default Credentials."""
+    try:
+        os.environ["GOOGLE_CLOUD_PROJECT"] = os.environ.get("GCP_PROJECT", "")
+        os.environ["GCLOUD_PROJECT"] = os.environ.get("GCP_PROJECT", "")
+        client = bigquery.Client(project=os.environ.get("GCP_PROJECT", ""))
+        # Test the connection
+        list(client.list_datasets(max_results=1))
+        return client
+    except Exception as e:
+        log.error("BigQuery auth failed: %s", e)
+        log.error("")
+        log.error("Run these commands to authenticate:")
+        log.error("  gcloud auth login")
+        log.error("  gcloud auth application-default login")
+        log.error("  gcloud config set project %s", os.environ.get("GCP_PROJECT", ""))
+        log.error("")
+        log.error("The free tier (1 TB/month) is sufficient — no credit card needed.")
+        sys.exit(1)
+
+
+def list_bq_tables(bq_client):
+    """List all tables in the basedosdados BigQuery project."""
+    log.info("Discovering tables in BigQuery project: %s", SOURCE_PROJECT)
+    tables = {}
+
+    try:
+        datasets = list(bq_client.list_datasets())
+        log.info("Found %d datasets", len(datasets))
+    except Exception as e:
+        log.error("Failed to list datasets: %s", e)
+        sys.exit(1)
+
+    for dataset in datasets:
+        try:
+            tables_list = list(
+                bq_client.list_tables(
+                    f"{SOURCE_PROJECT}.{dataset.dataset_id}",
+                    max_results=10000,
+                )
+            )
+            for t in tables_list:
+                tables[f"{dataset.dataset_id}.{t.table_id}"] = {
+                    "dataset": dataset.dataset_id,
+                    "table": t.table_id,
+                    "full_id": f"{SOURCE_PROJECT}.{dataset.dataset_id}.{t.table_id}",
+                    "schema": [f.name for f in t.schema] if t.schema else [],
+                    "num_bytes": t.num_bytes,
+                    "num_rows": t.num_rows,
+                }
+        except Exception as e:
+            log.warning("Failed to list tables in dataset %s: %s", dataset.dataset_id, e)
+
+    log.info("Total BigQuery tables discovered: %d", len(tables))
+    return tables
+
+
+def list_s3_tables(s3_client, bucket):
+    """List datasets/tables already exported to S3."""
+    log.info("Discovering tables already in S3 bucket: %s", bucket)
+    table_files = defaultdict(lambda: defaultdict(list))
+
+    try:
+        paginator = s3_client.get_paginator("list_objects_v2")
+        for page in paginator.paginate(Bucket=bucket):
+            for obj in page.get("Contents", []):
+                key = obj["Key"]
+                if not key.endswith(".parquet"):
+                    continue
+                parts = key.split("/")
+                if len(parts) >= 3:
+                    dataset, table = parts[0], parts[1]
+                    table_files[dataset][table].append(key)
+    except Exception as e:
+        log.warning("S3 listing error (may be empty bucket): %s", e)
+
+    tables = {}
+    for dataset, t_dict in table_files.items():
+        for table, files in t_dict.items():
+            tables[f"{dataset}.{table}"] = files
+
+    log.info("Total S3 tables discovered: %d", len(tables))
+    return tables
+
+
+def parse_missing_tables_from_md(filepath):
+    """Parse the missing tables from tasks/datasets_to_scrap.md.
+
+    Returns a dict mapping 'dataset.table' -> description.
+    Falls back to None (use all non-S3 tables) if file not found.
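+
+    Illustrative row shape (hypothetical dataset/table names) inside the
+    "Basedosdados.org ... Not in basedosdados.duckdb" section:
+
+        | `br_example_dataset` | table_a, table_b (partial) | ... |
+
+    which would yield:
+        {'br_example_dataset.table_a': 'from <filepath>',
+         'br_example_dataset.table_b': 'from <filepath>'}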
+    """
+    if not os.path.exists(filepath):
+        log.warning("Missing file %s, using all non-S3 tables", filepath)
+        return None
+
+    log.info("Parsing missing tables from %s", filepath)
+    with open(filepath) as f:
+        content = f.read()
+
+    missing = {}
+    lines = content.split("\n")
+    i = 0
+
+    def next_nonempty(lines, i):
+        while i < len(lines) and not lines[i].strip():
+            i += 1
+        return i
+
+    while i < len(lines):
+        line = lines[i].strip()
+
+        # Find the Basedosdados.org section
+        if "Basedosdados.org" in line and "Not in basedosdados.duckdb" in line:
+            log.info("Found Basedosdados.org section at line %d", i + 1)
+            i += 1
+            break
+        i += 1
+
+    # Now parse table entries
+    while i < len(lines):
+        line = lines[i].strip()
+
+        # End of section only on top-level ## headers, not ### subsections
+        if line.startswith("## "):
+            break
+
+        # Skip separators and empty lines
+        if not line or line.startswith("---") or "|---" in line:
+            i += 1
+            continue
+
+        # Find rows with backtick-wrapped dataset names (e.g. | `br_abrinq_oca` | ...)
+        if "`" in line and "|" in line:
+            # Split by pipe, strip whitespace and backticks
+            parts = [p.strip().strip("`").strip() for p in line.split("|")]
+            # Filter empty parts
+            parts = [p for p in parts if p]
+
+            if len(parts) >= 2:
+                dataset_raw = parts[0]
+                # Check if it looks like a dataset name (br_*, eu_*, mundo_*, etc.)
+                is_dataset = any(
+                    dataset_raw.startswith(prefix)
+                    for prefix in ("br_", "eu_", "mundo_", "nl_", "world_")
+                )
+
+                if is_dataset:
+                    # parts[1] contains the missing table names (comma-separated)
+                    tables_raw = parts[1]
+                    for tbl in tables_raw.split(","):
+                        tbl = tbl.strip()
+                        # Clean up: remove parenthetical notes, trailing text
+                        if "(" in tbl:
+                            tbl = tbl.split("(")[0].strip()
+                        if tbl and not tbl.startswith("-"):
+                            missing[f"{dataset_raw}.{tbl}"] = f"from {filepath}"
+
+        i += 1
+
+    log.info("Parsed %d missing table references from MD", len(missing))
+    return missing if missing else None
+
+
+def compute_missing_tables(bq_tables, s3_tables, md_missing):
+    """Compute which tables need to be synced."""
+    if md_missing is None:
+        log.info("No MD file, computing diff: BQ - S3")
+        return [
+            (table_id, info)
+            for table_id, info in bq_tables.items()
+            if table_id not in s3_tables
+        ]
+
+    log.info("Computing sync targets: MD missing tables not in S3")
+    targets = []
+    for key, info in bq_tables.items():
+        if key in s3_tables:
+            continue
+        if key in md_missing:
+            targets.append((key, info))
+        else:
+            # Table not in S3 and not in the MD missing list.
+            # Include it only if its dataset is not referenced in the MD list at all;
+            # datasets partially covered by the MD list stay limited to the listed tables.
+            dataset = info["dataset"]
+            table = info["table"]
+            dataset_in_md = any(m.split(".", 1)[0] == dataset for m in md_missing)
+            if not dataset_in_md:
+                targets.append((key, info))
+
+    return targets
+
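+# The selection rule above, illustrated with hypothetical names:
+#   br_a.t1  listed in the MD file and not yet in S3                -> synced
+#   br_a.t2  not in S3, not listed, but dataset br_a appears in MD  -> skipped
+#   br_b.t1  not in S3, dataset br_b never appears in the MD file   -> synced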
+
+
+def estimate_size_mb(num_bytes):
+    """Estimate size in MB."""
+    if num_bytes is None:
+        return "?"
+    return f"{num_bytes / 1_048_576:.1f}"
+
+
+# ---------------------------------------------------------------------------
+# Export logic
+# ---------------------------------------------------------------------------
+
+def sync_table(args, table_id, info, dry_run=False):
+    """Sync a single table: BQ → parquet → S3 → DuckDB view."""
+    bq_client, s3_client, bucket = args
+    dataset = info["dataset"]
+    table = info["table"]
+    full_id = info["full_id"]
+
+    s3_key_prefix = f"{dataset}/{table}"
+
+    if dry_run:
+        size_mb = estimate_size_mb(info.get("num_bytes"))
+        return True, f"[DRY] {dataset}.{table} (~{size_mb} MB)"
+
+    # Step 1: Query from BigQuery
+    log.info("Querying %s from BigQuery", full_id)
+    query = f"SELECT * FROM `{full_id}`"
+
+    try:
+        query_job = bq_client.query(query, location="US")
+        df = query_job.to_dataframe()
+    except Exception as e:
+        return False, f"BQ query failed for {table_id}: {e}"
+
+    if df.empty:
+        return True, f"[SKIP] {table_id} — empty table"
+
+    if df.shape[0] > 10_000_000:
+        log.warning("Table %s has %d rows — may be slow/memory-intensive", table_id, df.shape[0])
+
+    # Step 2: Write to parquet in memory, then upload
+    import io
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+
+    buffer = io.BytesIO()
+    table_pa = pa.Table.from_pandas(df)
+
+    # Write with zstd compression
+    writer = pq.ParquetWriter(
+        buffer,
+        table_pa.schema,
+        compression="zstd",
+        use_dictionary=True,
+    )
+    writer.write_table(table_pa)
+    writer.close()
+    buffer.seek(0)
+
+    s3_key = f"{s3_key_prefix}/{table}.parquet"
+    log.info("Uploading %s → s3://%s/%s (%s, %d rows)",
+             table_id, bucket, s3_key,
+             f"{buffer.getbuffer().nbytes / 1_048_576:.1f} MB",
+             df.shape[0])
+
+    try:
+        s3_client.upload_fileobj(
+            buffer,
+            bucket,
+            s3_key,
+            ExtraArgs={"ContentType": "application/octet-stream"},
+        )
+    except Exception as e:
+        return False, f"S3 upload failed for {table_id}: {e}"
+
+    log.info("[DONE] %s uploaded to s3://%s/%s", table_id, bucket, s3_key)
+    return True, f"[DONE] {table_id}"
+
+
+def update_duckdb_view(env, table_id, info):
+    """Register a new table as a DuckDB view over S3 parquet."""
+    import duckdb
+
+    dataset = info["dataset"]
+    table = info["table"]
+    bucket = env["HETZNER_S3_BUCKET"]
+    endpoint = env["HETZNER_S3_ENDPOINT"].removeprefix("https://").removeprefix("http://")
+    access_key = env["AWS_ACCESS_KEY_ID"]
+    secret_key = env["AWS_SECRET_ACCESS_KEY"]
+
+    # S3 path
+    s3_path = f"s3://{bucket}/{dataset}/{table}/{table}.parquet"
+
+    try:
+        con = duckdb.connect("basedosdados.duckdb", read_only=False)
+        con.execute("INSTALL httpfs; LOAD httpfs;")
+        con.execute(f"SET s3_endpoint='{endpoint}';")
+        con.execute(f"SET s3_access_key_id='{access_key}';")
+        con.execute(f"SET s3_secret_access_key='{secret_key}';")
+        con.execute("SET s3_url_style='path';")
+        con.execute(f"CREATE SCHEMA IF NOT EXISTS {dataset}")
+        con.execute(f"""
+            CREATE OR REPLACE VIEW {dataset}.{table} AS
+            SELECT * FROM read_parquet('{s3_path}', hive_partitioning=true, union_by_name=true)
+        """)
+        con.close()
+        log.info("[DUCKDB] View created: %s.%s", dataset, table)
+        return True, None
+    except Exception as e:
+        log.error("[DUCKDB] Failed to create view %s.%s: %s", dataset, table, e)
+        return False, str(e)
+
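+# Spot-checking a view created above from a separate session (hypothetical
+# dataset/table names; the SET s3_* options are per-connection, so they must be
+# re-applied, since the view itself only stores the SQL):
+#
+#   import duckdb
+#   con = duckdb.connect("basedosdados.duckdb", read_only=True)
+#   con.execute("INSTALL httpfs; LOAD httpfs;")
+#   con.execute("SET s3_endpoint='<endpoint-host>'; SET s3_url_style='path';")
+#   con.execute("SET s3_access_key_id='<key>'; SET s3_secret_access_key='<secret>';")
+#   print(con.execute("SELECT COUNT(*) FROM br_example_dataset.table_a").fetchone())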
+
+
+def run_sync(targets, args, env, dry_run=False, resume=False):
+    """Run the sync for all target tables."""
+    s3_client = get_s3_client(env)
+    bq_client = get_bq_client()
+
+    # Load done/failed tracking
+    done_set = set()
+    if resume:
+        if os.path.exists(DONE_FILE):
+            with open(DONE_FILE) as f:
+                done_set = {l.strip() for l in f if l.strip()}
+            log.info("Resuming: %d tables already done", len(done_set))
+
+    failed_count = 0
+    done_count = 0
+
+    # Filter out already-done tables
+    targets = [(tid, info) for tid, info in targets if tid not in done_set]
+
+    if not targets:
+        log.info("No tables to sync.")
+        return 0, 0
+
+    log.info("Syncing %d tables...", len(targets))
+
+    for i, (table_id, info) in enumerate(targets, 1):
+        log.info("--- [%d/%d] Syncing %s ---", i, len(targets), table_id)
+
+        # Sync BQ → S3
+        ok, msg = sync_table(
+            (bq_client, s3_client, env["HETZNER_S3_BUCKET"]),
+            table_id,
+            info,
+            dry_run=dry_run,
+        )
+        log.info(msg)
+
+        if dry_run:
+            continue
+
+        if not ok:
+            with open(FAILED_FILE, "a") as f:
+                f.write(f"{table_id}\t{msg}\n")
+            failed_count += 1
+            continue
+
+        if "empty" in msg.lower():
+            continue
+
+        # Update DuckDB view
+        ok, err = update_duckdb_view(env, table_id, info)
+        if not ok:
+            with open(FAILED_FILE, "a") as f:
+                f.write(f"{table_id}\tDUCKDB: {err}\n")
+
+        # Mark done
+        with open(DONE_FILE, "a") as f:
+            f.write(f"{table_id}\n")
+        done_count += 1
+
+    return done_count, failed_count
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main():
+    parser = argparse.ArgumentParser(description="Sync missing BQ tables to S3")
+    parser.add_argument("--dry-run", action="store_true", help="List tables without syncing")
+    parser.add_argument("--resume", action="store_true", help="Resume from last run")
+    args = parser.parse_args()
+
+    env = load_env()
+    dry_run = args.dry_run
+
+    if dry_run:
+        log.info("=== DRY RUN MODE ===")
+
+    # Step 1: List BigQuery tables
+    bq_client = get_bq_client()
+    bq_tables = list_bq_tables(bq_client)
+
+    # Step 2: List S3 tables
+    s3_client = get_s3_client(env)
+    s3_tables = list_s3_tables(s3_client, env["HETZNER_S3_BUCKET"])
+
+    # Step 3: Parse missing tables from MD
+    md_missing = parse_missing_tables_from_md(MISSING_TABLES_FILE)
+
+    # Step 4: Compute targets
+    targets = compute_missing_tables(bq_tables, s3_tables, md_missing)
+
+    if not targets:
+        log.info("No tables to sync.")
+        return
+
+    log.info("")
+    log.info("============================================")
+    log.info("  Tables to sync: %d", len(targets))
+    log.info("============================================")
+    for i, (table_id, info) in enumerate(targets, 1):
+        size_mb = estimate_size_mb(info.get("num_bytes"))
+        md_note = md_missing.get(table_id, "") if md_missing else ""
+        log.info("  [%d] %-50s %6s MB  %s", i, table_id, size_mb, md_note)
+    log.info("")
+
+    if dry_run:
+        total_bytes = sum(info.get("num_bytes", 0) or 0 for _, info in targets)
+        total_gb = total_bytes / 1_073_741_824
+        log.info("Total estimated size: %.2f GB (BigQuery logical, uncompressed bytes)", total_gb)
+        log.info("Run without --dry-run to start syncing.")
+        return
+
+    # Step 5: Run sync
+    log.info("Starting sync...")
+    done_count, failed_count = run_sync(targets, None, env, dry_run=False, resume=args.resume)
+
+    log.info("")
+    log.info("============================================")
+    log.info("  Sync complete!")
+    log.info("  Done: %d tables", done_count)
+    log.info("  Failed: %d tables", failed_count)
+    log.info("  Log: %s", LOG_FILE)
+    log.info("============================================")
+
+    if failed_count > 0:
+        log.info("Failed tables: see %s", FAILED_FILE)
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
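
Note: for each synced table, sync_table writes a single object at
s3://$HETZNER_S3_BUCKET/<dataset>/<table>/<table>.parquet and registers a matching
DuckDB view <dataset>.<table> in basedosdados.duckdb; progress is tracked in
done_sync.txt and failures in failed_sync.txt.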