refactor: reorganize project structure and fix broken references
- Move scripts to scripts/ directory (roda.sh, prepara_db.py, etc.)
- Move shell config to shell/ directory (Caddyfile, auth.py, haloy.yml)
- Move basedosdados.duckdb to data/ directory
- Update Dockerfile and start.sh with new file paths
- Update README.md with correct script paths
- Remove Python ask.py (replaced by Rust binary in ask/ask)
- Add Rust source files (schema_filter.rs, sql_generator.rs, table_selector.rs)
- Remove sentence-transformer dependencies from ask
- Move docs and context artifacts to their directories
This commit is contained in:
42
scripts/build_ask.sh
Executable file
42
scripts/build_ask.sh
Executable file
@@ -0,0 +1,42 @@
|
||||
#!/bin/bash
# Build the `ask` Rust binary for Linux x86_64 inside Docker and copy the
# result to ./ask/target/release/ask on the host.
set -e

cd "$(dirname "$0")"

echo "=== Building ask binary for Linux x86_64 ==="
echo "Using Debian x86_64 container for native build..."

# Build in an x86_64 Debian container - this gives us a real x86_64 environment
# so we can build natively without cross-compilation complexity
# Use ask/ as context to avoid .dockerignore excluding src/
docker build \
    --platform linux/amd64 \
    -t ask-builder \
    --build-arg BUILDKIT_INLINE_CACHE=1 \
    -f - ask/ <<'EOF'
FROM rust:1.85-slim

RUN apt-get update -qq && \
    apt-get install -y --no-install-recommends \
        build-essential pkg-config libssl-dev && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

WORKDIR /build

COPY . ./
RUN cargo build --release --locked

FROM scratch
COPY --from=0 /build/target/release/ask /ask
EOF

echo "=== Extracting binary ==="
# BUGFIX: the final stage is FROM scratch, so the image contains no shell and
# no `cat` — `docker run ask-builder cat /ask` would fail with "exec: cat:
# not found". Extract with `docker create` + `docker cp` instead; the dummy
# command "/ask" is never executed, it only satisfies `docker create`'s
# requirement that a command be specified.
mkdir -p ./ask/target/release
cid=$(docker create --platform linux/amd64 ask-builder /ask)
docker cp "$cid:/ask" ./ask/target/release/ask
docker rm "$cid" >/dev/null

# Make it executable
chmod +x ./ask/target/release/ask

echo "=== Binary built successfully ==="
file ./ask/target/release/ask
ls -lh ./ask/target/release/ask
|
||||
6
scripts/failed_tables.txt
Normal file
6
scripts/failed_tables.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
[ACCESS_DENIED] br_bcb_taxa_cambio.taxa_cambio
|
||||
[ACCESS_DENIED] br_bcb_taxa_selic.taxa_selic
|
||||
[ACCESS_DENIED] br_bcb_taxa_cambio.taxa_cambio
|
||||
[ACCESS_DENIED] br_bcb_taxa_selic.taxa_selic
|
||||
[ACCESS_DENIED] br_bcb_taxa_cambio.taxa_cambio
|
||||
[ACCESS_DENIED] br_bcb_taxa_selic.taxa_selic
|
||||
268
scripts/gera_schemas.py
Normal file
268
scripts/gera_schemas.py
Normal file
@@ -0,0 +1,268 @@
|
||||
import os
import json
import sys
import pyarrow.parquet as pq
import s3fs
import boto3
import duckdb
from dotenv import load_dotenv

# Pull credentials/config from a local .env file into the environment.
load_dotenv()

# Required configuration — a KeyError here is deliberate: the script cannot
# do anything useful without these four values.
S3_ENDPOINT = os.environ["HETZNER_S3_ENDPOINT"]
S3_BUCKET = os.environ["HETZNER_S3_BUCKET"]
ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"]
SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]

# DuckDB (Phase 3) wants the endpoint host without the URL scheme.
s3_host = S3_ENDPOINT.removeprefix("https://").removeprefix("http://")

# --- boto3 client (listing only, zero egress) ---
boto = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)

# --- s3fs filesystem (footer-only reads via pyarrow) ---
fs = s3fs.S3FileSystem(
    client_kwargs={"endpoint_url": S3_ENDPOINT},
    key=ACCESS_KEY,
    secret=SECRET_KEY,
)
|
||||
|
||||
# ------------------------------------------------------------------ #
# Phase 1: File inventory via S3 List API (zero data egress)
# ------------------------------------------------------------------ #
print("Phase 1: listing S3 objects...")
paginator = boto.get_paginator("list_objects_v2")

# Maps "dataset/table" -> {"files": [s3 keys], "total_size_bytes": int}.
inventory = {}

for page in paginator.paginate(Bucket=S3_BUCKET):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        # Index only parquet objects laid out as dataset/table/<file>.
        if not key.endswith(".parquet"):
            continue
        parts = key.split("/")
        if len(parts) < 3:
            continue
        dt = f"{parts[0]}/{parts[1]}"
        entry = inventory.setdefault(dt, {"files": [], "total_size_bytes": 0})
        entry["files"].append(key)
        entry["total_size_bytes"] += obj["Size"]

print(f" Found {len(inventory)} tables across {S3_BUCKET}")

# ------------------------------------------------------------------ #
# Phase 2: Schema reads — footer only (~30 KB per table)
# ------------------------------------------------------------------ #
print("Phase 2: reading parquet footers...")
|
||||
|
||||
def fmt_size(b):
    """Format a byte count as a human-readable string, e.g. ``1.5 MB``.

    Divides by 1024 per step; anything past GB is reported in TB
    (values >= 1024 TB simply show a large TB number).
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = b
    for idx, unit in enumerate(units):
        # Stop at the first unit that fits, or at TB as a last resort.
        if value < 1024 or idx == len(units) - 1:
            return f"{value:.1f} {unit}"
        value /= 1024
|
||||
|
||||
def extract_col_descriptions(schema):
    """Try to pull per-column descriptions from Arrow metadata.

    Returns a dict mapping column name -> description; a table-level
    description, when present, is stored under the key ``"__table__"``.
    All parse failures are swallowed — this is best-effort enrichment.
    """
    meta = schema.metadata or {}
    found = {}

    # BigQuery exports embed a JSON blob under b'pandas' with column_info
    raw_pandas = meta.get(b"pandas") or meta.get(b"pandas_metadata")
    if raw_pandas:
        try:
            for col in json.loads(raw_pandas).get("columns", []):
                col_meta = col.get("metadata", {}) or {}
                if isinstance(col_meta, dict) and "description" in col_meta:
                    found[col.get("name")] = col_meta["description"]
        except Exception:
            pass

    # Also try top-level b'description' or b'schema'
    # (later keys overwrite earlier ones — last match wins).
    for table_key in (b"description", b"schema", b"BigQuery:description"):
        raw_val = meta.get(table_key)
        if not raw_val:
            continue
        try:
            found["__table__"] = raw_val.decode("utf-8", errors="replace")
        except Exception:
            pass
    return found
|
||||
|
||||
schemas = {}
errors = []

# For each table, read only the parquet footer of its FIRST file — enough
# for the schema at a few KB of egress. Failures (e.g. access denied) are
# collected in `errors` and reported, never fatal.
for i, (dt, info) in enumerate(sorted(inventory.items())):
    dataset, table = dt.split("/", 1)
    first_file = info["files"][0]
    s3_path = f"{S3_BUCKET}/{first_file}"
    try:
        # FIX: close the S3 file handle deterministically — the original
        # called pq.read_schema(fs.open(...)) and left the handle to the GC.
        with fs.open(s3_path) as fh:
            schema = pq.read_schema(fh)
        col_descs = extract_col_descriptions(schema)

        # Build raw metadata dict (decode bytes keys/values)
        raw_meta = {}
        if schema.metadata:
            for k, v in schema.metadata.items():
                try:
                    dk = k.decode("utf-8", errors="replace")
                    dv = v.decode("utf-8", errors="replace")
                    # Try to parse JSON values
                    try:
                        dv = json.loads(dv)
                    except Exception:
                        pass
                    raw_meta[dk] = dv
                except Exception:
                    pass

        columns = []
        for field in schema:
            col = {
                "name": field.name,
                "type": str(field.type),
                "nullable": field.nullable,
            }
            if field.name in col_descs:
                col["description"] = col_descs[field.name]
            # Check field-level metadata — it may also carry a description.
            if field.metadata:
                for k, v in field.metadata.items():
                    try:
                        dk = k.decode("utf-8", errors="replace")
                        dv = v.decode("utf-8", errors="replace")
                        if dk in ("description", "DESCRIPTION", "comment"):
                            col["description"] = dv
                    except Exception:
                        pass
            columns.append(col)

        schemas[f"{dataset}.{table}"] = {
            "path": f"s3://{S3_BUCKET}/{dataset}/{table}/",
            "file_count": len(info["files"]),
            "total_size_bytes": info["total_size_bytes"],
            "total_size_human": fmt_size(info["total_size_bytes"]),
            "columns": columns,
            "metadata": raw_meta,
        }
        print(f" [{i+1}/{len(inventory)}] ✓ {dataset}.{table} ({len(columns)} cols, {fmt_size(info['total_size_bytes'])})")
    except Exception as e:
        errors.append({"table": f"{dataset}.{table}", "error": str(e)})
        print(f" [{i+1}/{len(inventory)}] ✗ {dataset}.{table}: {e}", file=sys.stderr)
|
||||
|
||||
# ------------------------------------------------------------------ #
# Phase 3: Enrich from br_bd_metadados.bigquery_tables (small table)
# ------------------------------------------------------------------ #
META_TABLE = "br_bd_metadados.bigquery_tables"
meta_dt = "br_bd_metadados/bigquery_tables"

if meta_dt in inventory:
    print(f"Phase 3: enriching from {META_TABLE}...")
    try:
        # In-memory DuckDB with httpfs to scan the small metadata table
        # directly from S3. Credentials are interpolated into SQL; they
        # come from our own .env, not from untrusted input.
        con = duckdb.connect()
        con.execute("INSTALL httpfs; LOAD httpfs;")
        con.execute(f"""
            SET s3_endpoint='{s3_host}';
            SET s3_access_key_id='{ACCESS_KEY}';
            SET s3_secret_access_key='{SECRET_KEY}';
            SET s3_url_style='path';
        """)
        meta_path = f"s3://{S3_BUCKET}/br_bd_metadados/bigquery_tables/*.parquet"
        # Peek at available columns
        available = [r[0] for r in con.execute(f"DESCRIBE SELECT * FROM '{meta_path}' LIMIT 1").fetchall()]
        print(f" Metadata columns: {available}")

        # Try to find dataset/table description columns
        desc_col = next((c for c in available if "description" in c.lower()), None)
        ds_col = next((c for c in available if c.lower() in ("dataset_id", "dataset", "schema_name")), None)
        tbl_col = next((c for c in available if c.lower() in ("table_id", "table_name", "table")), None)

        if desc_col and ds_col and tbl_col:
            rows = con.execute(f"""
                SELECT {ds_col}, {tbl_col}, {desc_col}
                FROM '{meta_path}'
            """).fetchall()
            # Attach descriptions only for tables we actually have schemas for.
            for ds, tbl, desc in rows:
                key = f"{ds}.{tbl}"
                if key in schemas and desc:
                    schemas[key]["table_description"] = desc
            print(f" Enriched {len(rows)} table descriptions")
        else:
            print(f" Could not find expected columns (dataset_id, table_id, description) — skipping enrichment")
        # NOTE(review): con.close() is skipped if anything above raises;
        # enrichment is best-effort so the leak is short-lived, but a
        # try/finally would be tidier.
        con.close()
    except Exception as e:
        # Enrichment is optional — log and keep going.
        print(f" Enrichment failed: {e}", file=sys.stderr)
else:
    print("Phase 3: br_bd_metadados.bigquery_tables not in S3 — skipping enrichment")
|
||||
|
||||
# ------------------------------------------------------------------ #
# Phase 4a: Write schemas.json
# ------------------------------------------------------------------ #
print("Phase 4: writing outputs...")

# The grand total is needed twice below — compute it once.
size_total = sum(entry["total_size_bytes"] for entry in schemas.values())

output = {
    "_meta": {
        "bucket": S3_BUCKET,
        "total_tables": len(schemas),
        "total_size_bytes": size_total,
        "total_size_human": fmt_size(size_total),
        "errors": errors,
    },
    "tables": dict(sorted(schemas.items())),
}

with open("schemas.json", "w", encoding="utf-8") as f:
    json.dump(output, f, ensure_ascii=False, indent=2)

print(f" ✓ schemas.json ({len(schemas)} tables)")
|
||||
|
||||
# ------------------------------------------------------------------ #
# Phase 4b: Write file_tree.md
# ------------------------------------------------------------------ #
# Render a markdown tree of the bucket: one section per dataset, one
# bullet per table, annotated with file counts, sizes, column counts
# and (when Phase 3 found one) the table description.
lines = [
    f"# S3 File Tree: {S3_BUCKET}",
    "",
]

# Group by dataset
datasets_map = {}
for dt_key, info in sorted(inventory.items()):
    dataset, table = dt_key.split("/", 1)
    datasets_map.setdefault(dataset, []).append((table, info))

total_files = sum(len(v["files"]) for v in inventory.values())
total_bytes = sum(v["total_size_bytes"] for v in inventory.values())

for dataset, tables in sorted(datasets_map.items()):
    ds_bytes = sum(i["total_size_bytes"] for _, i in tables)
    ds_files = sum(len(i["files"]) for _, i in tables)
    lines.append(f"## {dataset}/ ({len(tables)} tables, {fmt_size(ds_bytes)}, {ds_files} files)")
    lines.append("")
    for table, info in sorted(tables):
        # Tables that failed Phase 2 have no schema entry — shown without cols.
        schema_entry = schemas.get(f"{dataset}.{table}", {})
        ncols = len(schema_entry.get("columns", []))
        col_str = f", {ncols} cols" if ncols else ""
        table_desc = schema_entry.get("table_description", "")
        desc_str = f" — {table_desc}" if table_desc else ""
        lines.append(f" - **{table}/** ({len(info['files'])} files, {fmt_size(info['total_size_bytes'])}{col_str}){desc_str}")
    lines.append("")

lines += [
    "---",
    f"**Total: {len(inventory)} tables · {fmt_size(total_bytes)} · {total_files} parquet files**",
]

with open("file_tree.md", "w", encoding="utf-8") as f:
    f.write("\n".join(lines) + "\n")

print(f" ✓ file_tree.md ({len(inventory)} tables)")
print()
print("Done!")
print(f" schemas.json — full column-level schema dump")
print(f" file_tree.md — bucket tree with sizes")
if errors:
    print(f" {len(errors)} tables failed (see schemas.json _meta.errors)")
|
||||
69
scripts/prepara_db.py
Normal file
69
scripts/prepara_db.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import os
import duckdb
import boto3
from collections import defaultdict
from dotenv import load_dotenv

# Pull credentials/config from a local .env file into the environment.
load_dotenv()

# Required configuration — KeyError is intentional if any is missing.
BUCKET = os.environ['HETZNER_S3_BUCKET']
ENDPOINT_URL = os.environ['HETZNER_S3_ENDPOINT']
ACCESS_KEY = os.environ['AWS_ACCESS_KEY_ID']
SECRET_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

# DuckDB expects the endpoint without scheme
s3_endpoint = ENDPOINT_URL.removeprefix('https://').removeprefix('http://')

# List every object in the bucket once, grouping by dataset/table.
s3 = boto3.client('s3',
                  endpoint_url=ENDPOINT_URL,
                  aws_access_key_id=ACCESS_KEY,
                  aws_secret_access_key=SECRET_KEY)
paginator = s3.get_paginator('list_objects_v2')
|
||||
|
||||
# dataset -> table -> [s3://bucket/key URLs], built in one listing pass.
table_files = defaultdict(lambda: defaultdict(list))
for page in paginator.paginate(Bucket=BUCKET):
    for obj in page.get('Contents', []):
        key = obj['Key']
        # Only parquet objects laid out as dataset/table/<file> are kept.
        if not key.endswith('.parquet'):
            continue
        parts = key.split('/')
        if len(parts) < 3:
            continue
        dataset, table = parts[0], parts[1]
        table_files[dataset][table].append(f"s3://{BUCKET}/{key}")
|
||||
|
||||
# Create the DuckDB database file and configure S3 access.
con = duckdb.connect('basedosdados.duckdb')
con.execute("INSTALL httpfs; LOAD httpfs;")
# Credentials are interpolated into SQL; they come from our own .env,
# not from untrusted input.
con.execute(f"""
    SET s3_endpoint='{s3_endpoint}';
    SET s3_access_key_id='{ACCESS_KEY}';
    SET s3_secret_access_key='{SECRET_KEY}';
    SET s3_url_style='path';
    SET enable_object_cache=true;
    SET threads=4;
    SET memory_limit='6GB';
    SET preserve_insertion_order=false;
    SET http_keep_alive=true;
    SET http_retries=3;
""")
|
||||
|
||||
# Create schemas and views with an explicit file list per table.
# NOTE(review): dataset/table names are interpolated unquoted into SQL;
# they come from our own bucket layout, but odd names would break here.
for dataset, tables in table_files.items():
    con.execute(f"CREATE SCHEMA IF NOT EXISTS {dataset}")
    for table, files in tables.items():
        file_list = ", ".join(f"'{f}'" for f in sorted(files))
        try:
            con.execute(f"""
                CREATE OR REPLACE VIEW {dataset}.{table} AS
                SELECT * FROM read_parquet([{file_list}], hive_partitioning=true, union_by_name=true)
            """)
            print(f"✓ {dataset}.{table} ({len(files)} files)")
        except Exception as e:
            # Best-effort skip for geoparquet tables DuckDB cannot read.
            # NOTE(review): the 'geometry' substring match is broad and could
            # also swallow unrelated errors that mention geometry — confirm.
            if 'Geoparquet' in str(e) or 'geometria' in str(e) or 'geometry' in str(e).lower():
                print(f" skip (geoparquet) {dataset}.{table}")
            else:
                raise

con.close()
print("Done! Open with: duckdb --ui basedosdados.duckdb")
|
||||
706
scripts/roda.sh
Executable file
706
scripts/roda.sh
Executable file
@@ -0,0 +1,706 @@
|
||||
#!/usr/bin/env bash
# =============================================================================
# export_basedosdados.sh
# Exports all basedosdados BigQuery tables → GCS (Parquet+zstd) → Hetzner Object Storage
#
# Prerequisites (run once before this script):
#   gcloud auth login
#   gcloud auth application-default login
#   gcloud config set project YOUR_PROJECT_ID
#   cp .env.example .env   # then fill in your values
#
# Usage:
#   chmod +x export_basedosdados.sh
#   ./export_basedosdados.sh              # full run (locally)
#   ./export_basedosdados.sh --dry-run    # list tables + estimated sizes, no export
#   ./export_basedosdados.sh --gcloud-run # create GCP VM → run there → delete VM
# =============================================================================

set -euo pipefail

# Add util-linux to PATH on macOS (provides flock)
[[ -d "/opt/homebrew/opt/util-linux/bin" ]] && export PATH="/opt/homebrew/opt/util-linux/bin:$PATH"

# Load .env if present
if [[ -f "$(dirname "$0")/.env" ]]; then
  set -a
  # shellcheck source=.env
  source "$(dirname "$0")/.env"
  set +a
fi

# Mode flags. NOTE(review): only $1 is inspected and the elif chain makes
# the modes mutually exclusive — e.g. "--sync --dry-run" will NOT set
# DRY_RUN. Confirm this is intended.
DRY_RUN=false
GCLOUD_RUN=false
SYNC_RUN=false
if [[ "${1:-}" == "--dry-run" ]]; then
  DRY_RUN=true
elif [[ "${1:-}" == "--gcloud-run" ]]; then
  GCLOUD_RUN=true
elif [[ "${1:-}" == "--sync" ]]; then
  SYNC_RUN=true
fi
|
||||
|
||||
# -----------------------------------------------------------------------------
# LOGGING
# -----------------------------------------------------------------------------
# One run-scoped log file plus three state files that make the export
# resumable across interrupted runs.
LOG_FILE="export_$(date +%Y%m%d_%H%M%S).log"
FAILED_FILE="failed_tables.txt"
DONE_FILE="done_tables.txt"
DONE_TRANSFERS_FILE="done_transfers.txt"

# log / log_err: timestamped message tee'd into $LOG_FILE
# (log_err additionally redirects the console copy to stderr).
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"; }
log_err() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $*" | tee -a "$LOG_FILE" >&2; }
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 0 — Verify dependencies
# -----------------------------------------------------------------------------
log "Checking dependencies..."
if $GCLOUD_RUN; then
  # --gcloud-run only drives a remote VM locally, so gcloud suffices.
  for cmd in gcloud; do
    if ! command -v "$cmd" &>/dev/null; then
      log_err "'$cmd' not found. Install google-cloud-sdk."
      exit 1
    fi
  done
elif ! $SYNC_RUN; then
  # Only require heavy GCP tools for the main export (not for --sync)
  for cmd in bq gcloud gsutil parallel rclone flock; do
    if ! command -v "$cmd" &>/dev/null; then
      log_err "'$cmd' not found. Install google-cloud-sdk, GNU parallel, and rclone."
      exit 1
    fi
  done
fi

# Validate S3 credentials
if [[ -z "${AWS_ACCESS_KEY_ID:-}" || -z "${AWS_SECRET_ACCESS_KEY:-}" ]]; then
  log_err "Credenciais S3 não encontradas. Preencha o .env com AWS_ACCESS_KEY_ID e AWS_SECRET_ACCESS_KEY."
  exit 1
fi

# Validate GCP project (needed for --sync)
if [[ -z "${GCP_PROJECT:-}" ]]; then
  if $SYNC_RUN; then
    if [[ -z "${YOUR_PROJECT:-}" ]]; then
      log_err "GCP_PROJECT não encontrado no .env. Adicione GCP_PROJECT ou YOUR_PROJECT."
      exit 1
    fi
    log "GCP_PROJECT not set, using YOUR_PROJECT: $YOUR_PROJECT"
    export GCP_PROJECT="$YOUR_PROJECT"
  fi
fi

# Configure rclone remotes via env vars — no rclone.conf or inline credentials needed.
# GCS remote (bd:) uses Application Default Credentials from gcloud auth application-default login.
# Hetzner S3 remote (hz:) uses the credentials from .env, kept out of the process command line.
# NOTE(review): $HETZNER_S3_ENDPOINT is referenced unguarded under `set -u`
# — the script aborts with "unbound variable" if .env does not define it;
# consider validating it alongside the AWS keys above.
export RCLONE_CONFIG_BD_TYPE="google cloud storage"
export RCLONE_CONFIG_BD_BUCKET_POLICY_ONLY="true"
export RCLONE_CONFIG_HZ_TYPE="s3"
export RCLONE_CONFIG_HZ_PROVIDER="Other"
export RCLONE_CONFIG_HZ_ENDPOINT="$HETZNER_S3_ENDPOINT"
export RCLONE_CONFIG_HZ_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID"
export RCLONE_CONFIG_HZ_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY"
|
||||
|
||||
# =============================================================================
# GCLOUD RUN — create a Compute Engine VM, run the export there, then clean up
# =============================================================================
# NOTE(review): this mode uses $YOUR_PROJECT directly (not the normalized
# $GCP_PROJECT set up for --sync) — confirm .env always defines it.
if $GCLOUD_RUN; then
  VM_NAME="${GCP_VM_NAME:-bd-export-vm}"
  VM_ZONE="${GCP_VM_ZONE:-us-central1-a}"
  SCRIPT_PATH="$(realpath "$0")"
  ENV_PATH="$(dirname "$SCRIPT_PATH")/.env"

  log "=============================="
  log " GCLOUD RUN MODE"
  log "=============================="

  # ── Step 1/4: Create instance ───────────────────────────────────────────
  log "[1/4] Creating VM: $VM_NAME ($VM_ZONE) ..."
  if gcloud compute instances describe "$VM_NAME" \
      --zone="$VM_ZONE" --project="$YOUR_PROJECT" &>/dev/null; then
    log " VM already exists, reusing it."
  else
    gcloud compute instances create "$VM_NAME" \
      --project="$YOUR_PROJECT" \
      --zone="$VM_ZONE" \
      --machine-type=e2-standard-4 \
      --image-family=debian-12 \
      --image-project=debian-cloud \
      --boot-disk-size=20GB \
      --scopes=cloud-platform
    log " VM created."
  fi

  # ── Step 2/4: Wait for SSH + copy files ────────────────────────────────
  # NOTE(review): if SSH never becomes ready, the loop exhausts its 18
  # attempts (~3 min) and falls through to scp, which will then fail.
  log "[2/4] Waiting for SSH and copying files..."
  for i in {1..18}; do
    if gcloud compute ssh "$VM_NAME" \
        --zone="$VM_ZONE" --project="$YOUR_PROJECT" \
        --command="echo ready" 2>/dev/null; then
      break
    fi
    log " SSH not ready yet ($i/18), retrying in 10s..."
    sleep 10
  done

  gcloud compute scp "$SCRIPT_PATH" "$ENV_PATH" \
    "$VM_NAME:~/" \
    --zone="$VM_ZONE" \
    --project="$YOUR_PROJECT"
  log " Files copied."

  # ── Step 3/4: Install dependencies ─────────────────────────────────────
  log "[3/4] Installing dependencies on VM (~2 min)..."
  gcloud compute ssh "$VM_NAME" \
    --zone="$VM_ZONE" \
    --project="$YOUR_PROJECT" \
    --command="bash -s" <<'REMOTE_SETUP'
set -euo pipefail
export DEBIAN_FRONTEND=noninteractive
sudo apt-get update -qq
sudo apt-get install -y apt-transport-https ca-certificates gnupg curl parallel rclone
curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg \
  | sudo gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg
echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" \
  | sudo tee /etc/apt/sources.list.d/google-cloud-sdk.list >/dev/null
sudo apt-get update -qq
sudo apt-get install -y google-cloud-cli
chmod +x ~/roda.sh
echo "Dependencies installed."
REMOTE_SETUP
  log " Dependencies ready."

  # ── Step 4/4: Run the export script interactively ──────────────────────
  log "[4/4] Launching roda.sh on VM — answer prompts as they appear."
  gcloud compute ssh "$VM_NAME" \
    --zone="$VM_ZONE" \
    --project="$YOUR_PROJECT" \
    -- bash ~/roda.sh

  # ── Cleanup: Delete VM ──────────────────────────────────────────────────
  echo ""
  echo "============================================================"
  echo " CLEANUP"
  echo "============================================================"
  read -rp "Delete VM instance $VM_NAME? [y/N] " del_vm
  if [[ "$del_vm" =~ ^[Yy]$ ]]; then
    log "Deleting VM $VM_NAME ..."
    gcloud compute instances delete "$VM_NAME" \
      --zone="$VM_ZONE" \
      --project="$YOUR_PROJECT" \
      --quiet
    log "VM deleted."
  else
    log "VM kept. To delete manually:"
    log " gcloud compute instances delete $VM_NAME --zone=$VM_ZONE --project=$YOUR_PROJECT"
  fi

  exit 0
fi
|
||||
|
||||
# =============================================================================
# VM EXPORT — use existing bd-export-vm to export specific tables to GCS → S3
# =============================================================================
# Expects $BUCKET_NAME/$BUCKET_REGION/$SOURCE_PROJECT/$YOUR_PROJECT in .env.
if [[ "${1:-}" == "--vm-export" ]]; then
  VM_NAME="${GCP_VM_NAME:-bd-export-vm}"
  VM_ZONE="${GCP_VM_ZONE:-us-central1-a}"
  VM_PROJECT="${GCP_VM_PROJECT:-raspa-491716}"
  TABLE_LIST="${2:-missing_tables.txt}"

  log "=============================="
  log " VM EXPORT MODE"
  log " VM: $VM_NAME ($VM_ZONE)"
  log " Tables: $TABLE_LIST"
  log "=============================="

  if [[ ! -f "$TABLE_LIST" ]]; then
    log_err "Table list not found: $TABLE_LIST"
    exit 1
  fi

  log "[1/5] Syncing files to VM..."
  gcloud compute scp \
    "$(dirname "$0")/roda.sh" \
    "$(dirname "$0")/.env" \
    "$(realpath "$TABLE_LIST")" \
    "$VM_NAME:~/" \
    --zone="$VM_ZONE" \
    --project="$VM_PROJECT"

  log "[2/5] Ensuring GCS bucket exists..."
  if ! gsutil ls "gs://$BUCKET_NAME" &>/dev/null; then
    gsutil mb -p "$VM_PROJECT" -l "$BUCKET_REGION" -b on "gs://$BUCKET_NAME"
    log " Bucket created: gs://$BUCKET_NAME"
  else
    log " Bucket already exists."
  fi

  log "[3/5] Running export on VM (bq extract + rclone)..."
  gcloud compute ssh "$VM_NAME" \
    --zone="$VM_ZONE" \
    --project="$VM_PROJECT" \
    --command="bash -s" <<'REMOTE_EXPORT'
set -euo pipefail
export DEBIAN_FRONTEND=noninteractive
cd ~
set -a
# shellcheck source=.env
source .env
set +a
source ~/.bashrc 2>/dev/null || true

# Recreate the rclone remotes on the VM from the copied .env.
export RCLONE_CONFIG_BD_TYPE="google cloud storage"
export RCLONE_CONFIG_BD_BUCKET_POLICY_ONLY="true"
export RCLONE_CONFIG_HZ_TYPE="s3"
export RCLONE_CONFIG_HZ_PROVIDER="Other"
export RCLONE_CONFIG_HZ_ENDPOINT="$HETZNER_S3_ENDPOINT"
export RCLONE_CONFIG_HZ_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID"
export RCLONE_CONFIG_HZ_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY"

echo "[BQ EXTRACT] Starting export of missing tables..."

# extract_table <dataset.table>: bq extract one table to GCS as zstd parquet.
# Failures are reported inline ("[FAIL] ...") and do not stop the run.
extract_table() {
  local table="$1"
  local dataset table_id gcs_prefix
  dataset=$(echo "$table" | cut -d. -f1)
  table_id=$(echo "$table" | cut -d. -f2)
  gcs_prefix="gs://$BUCKET_NAME/$dataset/$table_id"

  echo "[EXTRACT] $table"
  bq extract \
    --project_id="$YOUR_PROJECT" \
    --destination_format=PARQUET \
    --compression=ZSTD \
    --location=US \
    "${SOURCE_PROJECT}:${dataset}.${table_id}" \
    "${gcs_prefix}/*.parquet" 2>&1 \
    || echo "[FAIL] $table"
}

export -f extract_table
export BUCKET_NAME SOURCE_PROJECT

# NOTE(review): hardcodes missing_tables.txt — if a different list was
# passed as $2 locally, it is copied to the VM under its own basename and
# never read here. TODO confirm intended.
cat missing_tables.txt | parallel -j8 --bar extract_table {}

echo "[TRANSFER] GCS → Hetzner S3..."
datasets=$(gsutil ls "gs://$BUCKET_NAME/" 2>/dev/null | sed 's|gs://[^/]*/||;s|/$||' | grep -v '^$' | sort -u)
for ds in $datasets; do
  echo "[TRANSFER] $ds"
  rclone copy "bd:$BUCKET_NAME/$ds/" "hz:$HETZNER_S3_BUCKET/$ds/" \
    --transfers 32 --s3-upload-concurrency 32 --progress 2>&1 \
    || echo "[FAIL_TRANSFER] $ds"
done

echo "[DONE] Export complete."
REMOTE_EXPORT

  log "[4/5] Verifying transfer..."
  # Count parquet objects on the S3 side from the VM (rclone already set up there).
  S3_COUNT=$(gcloud compute ssh "$VM_NAME" \
    --zone="$VM_ZONE" \
    --project="$VM_PROJECT" \
    --command="source .env && rclone ls hz:\$HETZNER_S3_BUCKET 2>/dev/null | grep -c '\.parquet\$' || echo 0" 2>/dev/null)
  log " S3 parquet files: $S3_COUNT"

  log "[5/5] Cleaning up GCS bucket..."
  read -rp "Delete GCS bucket gs://$BUCKET_NAME? [y/N] " confirm
  if [[ "$confirm" =~ ^[Yy]$ ]]; then
    gsutil -m rm -r "gs://$BUCKET_NAME"
    gsutil rb "gs://$BUCKET_NAME"
    log " Bucket deleted."
  fi

  log "VM export complete."
  exit 0
fi
|
||||
|
||||
# =============================================================================
# SYNC — BigQuery → S3 direct (no GCS intermediary)
# =============================================================================
if $SYNC_RUN; then
  log "=============================="
  log " SYNC MODE — BigQuery → S3"
  log "=============================="

  # Check dependencies
  for cmd in python3; do
    if ! command -v "$cmd" &>/dev/null; then
      log_err "'$cmd' not found."
      exit 1
    fi
  done

  # Check Python dependencies (import name vs pip package name differs)
  # NOTE(review): pip_pkg is currently just the module name again; the
  # error message lists the real pip packages, so this is only cosmetic.
  PYTHON_CHECKS="google.cloud.bigquery:boto3:pandas:pyarrow"
  for check in $(echo "$PYTHON_CHECKS" | tr ':' '\n'); do
    module="${check}"
    if ! python3 -c "import ${module}" 2>/dev/null; then
      pip_pkg="${module}"
      log_err "Missing Python package: ${pip_pkg}. Run: pip install google-cloud-bigquery boto3 pandas pyarrow"
      exit 1
    fi
  done

  # Set GCP_PROJECT for the Python script
  export GCP_PROJECT="${GCP_PROJECT:-${YOUR_PROJECT}}"

  log "GCP project: $GCP_PROJECT"
  log "S3 bucket: $HETZNER_S3_BUCKET"
  log "S3 endpoint: $HETZNER_S3_ENDPOINT"
  log ""

  # NOTE(review): dead branch — DRY_RUN can never be true here because the
  # flag parser treats --dry-run and --sync as mutually exclusive ($1 only).
  if $DRY_RUN; then
    log "DRY RUN — listing tables only, no data will be transferred"
  fi

  # Run the sync script, filtering out --sync (roda.sh flag)
  SYNC_ARGS=()
  for arg in "$@"; do
    [[ "$arg" != "--sync" ]] && SYNC_ARGS+=("$arg")
  done
  python3 sync_bq_to_local.py "${SYNC_ARGS[@]+"${SYNC_ARGS[@]}"}"
  exit $?
fi
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 1 — Create GCS bucket in US region (same as basedosdados)
# -----------------------------------------------------------------------------
# Expects $BUCKET_NAME and $BUCKET_REGION from .env — under `set -u` the
# script aborts if they are missing.
if $DRY_RUN; then
  log "[DRY RUN] Would create GCS bucket: gs://$BUCKET_NAME in region $BUCKET_REGION"
else
  log "Creating GCS bucket: gs://$BUCKET_NAME in region $BUCKET_REGION"
  if gsutil ls "gs://$BUCKET_NAME" &>/dev/null; then
    log "Bucket already exists, skipping creation."
  else
    gsutil mb \
      -p "$YOUR_PROJECT" \
      -l "$BUCKET_REGION" \
      -b on \
      "gs://$BUCKET_NAME"
    log "Bucket created: gs://$BUCKET_NAME"
  fi

fi

# Resume support: load already-done tables/transfers
touch "$DONE_FILE" "$FAILED_FILE" "$DONE_TRANSFERS_FILE"
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 2 — Build the full table list from the basedosdados project
#
# We auto-discover all datasets and tables via the BQ API so we don't rely
# on a hardcoded list. This also detects any new tables added since the
# tables-summary.md was written.
#
# Atomicity: we write to a .tmp file and mv it into place only on success,
# so an interrupted run never leaves a partial list behind.
# -----------------------------------------------------------------------------
log "Discovering all datasets in project: $SOURCE_PROJECT ..."
TABLE_LIST_FILE="all_tables.txt"
TABLE_LIST_TMP="${TABLE_LIST_FILE}.tmp"

if [[ ! -f "$TABLE_LIST_FILE" ]]; then
  # Dataset IDs, one per line.
  bq ls --project_id="$SOURCE_PROJECT" --max_results=10000 --format=json 2>/dev/null \
    | python3 -c "
import json, sys
datasets = json.load(sys.stdin)
for ds in datasets:
    print(ds['datasetReference']['datasetId'])
" > /tmp/datasets.txt

  log "Found $(wc -l < /tmp/datasets.txt) datasets. Listing tables in parallel..."

  TMP_TABLE_DIR=$(mktemp -d)

  # list_dataset_tables <dataset> <source_project> <tmp_dir>:
  # writes "dataset.table" lines (TABLE/EXTERNAL only) to <tmp_dir>/<dataset>.txt.
  list_dataset_tables() {
    local dataset="$1"
    local source="$2"
    local tmp_dir="$3"
    bq ls \
      --project_id="$source" \
      --dataset_id="$source:$dataset" \
      --max_results=10000 \
      --format=json 2>/dev/null \
      | python3 -c "
import json, sys
data = sys.stdin.read()
if not data.strip():
    sys.exit(0)
for t in json.loads(data):
    ref = t.get('tableReference', {})
    if t.get('type') in ('TABLE', 'EXTERNAL'):
        print(ref['datasetId'] + '.' + ref['tableId'])
" > "$tmp_dir/$dataset.txt"
  }
  export -f list_dataset_tables

  parallel --jobs 16 list_dataset_tables {} "$SOURCE_PROJECT" "$TMP_TABLE_DIR" < /tmp/datasets.txt

  # Merge per-dataset lists, then move into place atomically.
  cat "$TMP_TABLE_DIR"/*.txt | sort > "$TABLE_LIST_TMP"
  rm -rf "$TMP_TABLE_DIR"
  mv "$TABLE_LIST_TMP" "$TABLE_LIST_FILE"
  log "Total tables discovered: $(wc -l < "$TABLE_LIST_FILE")"
else
  log "Reusing existing table list: $TABLE_LIST_FILE ($(wc -l < "$TABLE_LIST_FILE") tables)"
fi
|
||||
|
||||
# -----------------------------------------------------------------------------
# DRY RUN — show table count and exit
# -----------------------------------------------------------------------------
if $DRY_RUN; then
    TOTAL=$(wc -l < "$TABLE_LIST_FILE")
    log "[DRY RUN] $TOTAL tables found. No exports will run."
    log "[DRY RUN] Estimating total size via bq show in parallel (this may take a while)..."

    # Print the raw numBytes of one table; any lookup/parse failure yields 0
    # so a single bad table can't break the aggregate.
    get_table_bytes() {
        local tbl="$1" src="$2"
        local ds tid
        ds=$(echo "$tbl" | cut -d. -f1)
        tid=$(echo "$tbl" | cut -d. -f2)
        bq show --format=json "${src}:${ds}.${tid}" 2>/dev/null \
            | python3 -c "import json,sys; d=json.load(sys.stdin); print(d.get('numBytes','0'))" 2>/dev/null \
            || echo 0
    }
    export -f get_table_bytes

    # Sum byte counts across all tables; awk's s+0 forces numeric output.
    TOTAL_BYTES=$(parallel --jobs 16 get_table_bytes {} "$SOURCE_PROJECT" < "$TABLE_LIST_FILE" \
        | awk '{s+=$1} END{print s+0}')

    TOTAL_GB=$(echo "scale=2; $TOTAL_BYTES / 1073741824" | bc)
    # Parquet+zstd typically compresses structured data 5–10x vs BigQuery's raw numBytes
    COMPRESSED_LOW=$(echo "scale=2; $TOTAL_GB / 10" | bc)
    COMPRESSED_HIGH=$(echo "scale=2; $TOTAL_GB / 5" | bc)
    EGRESS_LOW=$(echo "scale=2; $COMPRESSED_LOW * 0.08" | bc)
    EGRESS_HIGH=$(echo "scale=2; $COMPRESSED_HIGH * 0.12" | bc)

    log "[DRY RUN] BigQuery raw size (uncompressed): ~${TOTAL_GB} GB"
    log "[DRY RUN] Estimated Parquet+zstd size: ~${COMPRESSED_LOW}–${COMPRESSED_HIGH} GB"
    log "[DRY RUN] Estimated GCS→Hetzner egress cost: USD ${EGRESS_LOW}–${EGRESS_HIGH}"
    log "[DRY RUN] Done. Remove --dry-run to start the actual export."
    exit 0
fi
|
||||
|
||||
# -----------------------------------------------------------------------------
# COST WARNING — confirm before starting export
# -----------------------------------------------------------------------------
# One heredoc instead of a chain of echos; \$ keeps the dollar sign literal
# while $YOUR_PROJECT still expands.
cat <<EOF

============================================================
 COST WARNING
 Transferring data from GCS to Hetzner costs ~\$0.08-0.12/GB
 in internet egress fees charged to: $YOUR_PROJECT
 Run with --dry-run first to estimate the total size.
============================================================

EOF
read -rp "Press ENTER to start the export, or Ctrl+C to abort: "
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 3 — Export function (called in parallel)
# -----------------------------------------------------------------------------
# Export one BigQuery table to GCS as ZSTD-compressed Parquet shards.
#
# Args:
#   $1 table        "dataset.table" identifier
#   $2 bucket       destination GCS bucket name
#   $3 project      project billed for the extract job
#   $4 source       project that owns the table
#   $5 done_file    resume ledger: one line per successfully exported table
#   $6 failed_file  ledger of failed / incompatible / access-denied tables
#   $7 log_file     shared append-only log
#
# The outcome is recorded in done_file/failed_file and the function returns 0
# in every handled case, so GNU parallel keeps processing remaining tables.
export_table() {
    local table="$1"
    local bucket="$2"
    local project="$3"
    local source="$4"
    local done_file="$5"
    local failed_file="$6"
    local log_file="$7"

    # Skip if already done
    if grep -qxF "$table" "$done_file" 2>/dev/null; then
        echo "[SKIP] $table (already exported)" >> "$log_file"
        return 0
    fi

    # Split "dataset.table" into its two components.
    local dataset table_id gcs_prefix
    dataset=$(echo "$table" | cut -d. -f1)
    table_id=$(echo "$table" | cut -d. -f2)
    gcs_prefix="gs://$bucket/$dataset/$table_id"

    echo "[START] Exporting $source:$table → $gcs_prefix/*.parquet" >> "$log_file"

    # Run bq extract with retry (up to 3 attempts)
    # Skip retries immediately if the error is a known incompatible type
    local attempt=0
    local success=false
    local output
    while [[ $attempt -lt 3 ]]; do
        attempt=$((attempt + 1))
        # Capture combined stdout+stderr: the error text drives the
        # classification below.
        output=$(bq extract \
            --project_id="$project" \
            --destination_format=PARQUET \
            --compression=ZSTD \
            --location=US \
            "${source}:${dataset}.${table_id}" \
            "${gcs_prefix}/*.parquet" \
            2>&1)
        # $? is still the bq exit status here: the assignment in `local`
        # evaluates it before `local` itself runs.
        local exit_code=$?
        echo "$output" >> "$log_file"

        if [[ $exit_code -eq 0 ]]; then
            success=true
            break
        fi

        # Detect permanently incompatible types — no point retrying
        if echo "$output" | grep -qi "not supported\|unsupported type\|GEOGRAPHY\|JSON type"; then
            echo "[SKIP_INCOMPATIBLE] $table — unsupported column type, skipping retries" >> "$log_file"
            flock "$failed_file" bash -c "echo '[INCOMPATIBLE] $table' >> '$failed_file'"
            return 0
        fi

        # Detect access/permission errors — no point retrying
        if echo "$output" | grep -qi "access denied\|permission denied\|not authorized\|403\|does not exist\|Not found"; then
            echo "[SKIP_ACCESS] $table — access denied or not found, skipping retries" >> "$log_file"
            flock "$failed_file" bash -c "echo '[ACCESS_DENIED] $table' >> '$failed_file'"
            return 0
        fi

        # Transient failure: back off linearly (10s, 20s) and retry.
        echo "[RETRY $attempt/3] $table" >> "$log_file"
        sleep $((attempt * 10))
    done

    if $success; then
        # flock prevents race condition when multiple workers write concurrently
        flock "$done_file" bash -c "echo '$table' >> '$done_file'"
        echo "[DONE] $table" >> "$log_file"
    else
        flock "$failed_file" bash -c "echo '$table' >> '$failed_file'"
        echo "[FAIL] $table after 3 attempts" >> "$log_file"
    fi
}

# Make the function visible to the bash instances GNU parallel spawns.
export -f export_table
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 4 — Run exports in parallel
# -----------------------------------------------------------------------------
log "Starting parallel exports ($PARALLEL_EXPORTS workers)..."
log "Progress is logged to: $LOG_FILE"
log "Failed tables will be written to: $FAILED_FILE"

# comm -23 keeps only lines of the table list that are absent from the
# done-ledger, i.e. tables that still need exporting.
comm -23 <(sort "$TABLE_LIST_FILE") <(sort "$DONE_FILE") \
    | parallel --jobs "$PARALLEL_EXPORTS" --progress --bar \
        export_table {} "$BUCKET_NAME" "$YOUR_PROJECT" "$SOURCE_PROJECT" \
        "$DONE_FILE" "$FAILED_FILE" "$LOG_FILE" \
    || true # failures are tracked in $FAILED_FILE; don't let parallel's exit code abort the script

TOTAL=$(wc -l < "$TABLE_LIST_FILE")
DONE=$(wc -l < "$DONE_FILE")
FAILED=$(wc -l < "$FAILED_FILE")
log "Export phase complete: $DONE/$TOTAL done, $FAILED failed"

if [[ $FAILED -gt 0 ]]; then
    log "Failed tables:"
    # Show failures on stdout while appending them to the main log.
    tee -a "$LOG_FILE" < "$FAILED_FILE"
    log "To retry failed tables only, run: bash $0 --retry-failed"
fi
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 5 — Transfer GCS → Hetzner Object Storage via rclone (no local staging)
#
# rclone streams data directly between GCS and S3 through RAM only —
# no local disk required.
# -----------------------------------------------------------------------------
log "Starting transfer to Hetzner Object Storage ($HETZNER_S3_ENDPOINT)..."

# Dataset count in the bucket, computed once for the progress display and
# exported so the parallel workers can read it.
TRANSFER_TOTAL=$(gsutil ls "gs://$BUCKET_NAME/" | wc -l)
export TRANSFER_TOTAL

# Per-dataset logs land here and are merged into $LOG_FILE afterwards.
TRANSFER_LOG_DIR=$(mktemp -d)
|
||||
|
||||
# Stream one dataset prefix from GCS straight to Hetzner S3, recording the
# dataset in the resume ledger on success. Per-dataset output goes to its own
# log file; a progress line is printed to stdout when the copy completes.
download_dataset() {
    local ds="$1" gcs_bucket="$2" dest_bucket="$3" concurrency="$4"
    local ledger="$5" logs="$6" total="$7"
    local out="$logs/${ds}.log"

    # Resume: a dataset already listed in the ledger was fully copied earlier.
    if grep -qxF "$ds" "$ledger" 2>/dev/null; then
        echo "[SKIP_TRANSFER] $ds (already transferred)" > "$out"
        return 0
    fi

    echo "[TRANSFER] gs://$gcs_bucket/$ds/ → hz:$dest_bucket/$ds/" > "$out"

    # Named remotes bd: (GCS) and hz: (Hetzner S3) come from RCLONE_CONFIG_* env vars.
    if ! rclone copy \
        "bd:$gcs_bucket/$ds/" \
        "hz:$dest_bucket/$ds/" \
        --transfers "$concurrency" \
        --s3-upload-concurrency "$concurrency" \
        --progress \
        >> "$out" 2>&1; then
        echo "[TRANSFER FAIL] rclone failed for $ds" >> "$out"
        return 1
    fi

    # flock serializes concurrent ledger appends from parallel workers.
    flock "$ledger" bash -c "echo '$ds' >> '$ledger'"
    echo "[TRANSFERRED] $ds" >> "$out"
    local done_count
    done_count=$(wc -l < "$ledger")
    local pct=$(( done_count * 100 / total ))
    echo "[${done_count}/${total}] ${pct}% datasets transferidos"
}

# Make the function visible to the bash instances GNU parallel spawns.
export -f download_dataset
|
||||
|
||||
# Get list of exported datasets, skipping already-transferred ones
comm -23 \
    <(gsutil ls "gs://$BUCKET_NAME/" | sed 's|gs://[^/]*/||;s|/||' | sort -u) \
    <(sort "$DONE_TRANSFERS_FILE") \
    | parallel \
        --jobs "$PARALLEL_UPLOADS" \
        download_dataset {} "$BUCKET_NAME" "$HETZNER_S3_BUCKET" "$S3_CONCURRENCY" "$DONE_TRANSFERS_FILE" "$TRANSFER_LOG_DIR" "$TRANSFER_TOTAL" \
    || true # failures are tracked per-dataset; don't abort

# Merge per-dataset logs into main log in lexical order.
# Use a glob instead of parsing `ls` output: word-splitting $(ls ...) breaks
# on filenames containing whitespace, while globs expand already sorted and
# without splitting.
for f in "$TRANSFER_LOG_DIR"/*.log; do
    [[ -e "$f" ]] || continue   # unmatched glob stays literal when no logs exist
    cat "$f" >> "$LOG_FILE"
done
rm -rf "$TRANSFER_LOG_DIR"

log "Transfer complete."
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 6 — Verify file counts on Hetzner Object Storage vs GCS
# -----------------------------------------------------------------------------
log "Verifying file counts..."

# Count parquet objects on each side; any mismatch means an incomplete copy.
GCS_COUNT=$(gsutil ls -r "gs://$BUCKET_NAME/**" | grep '\.parquet$' | wc -l)
S3_COUNT=$(rclone ls "hz:$HETZNER_S3_BUCKET" 2>/dev/null | grep '\.parquet$' | wc -l)

log "GCS parquet files: $GCS_COUNT"
log "S3 parquet files:  $S3_COUNT"

if [[ "$GCS_COUNT" -ne "$S3_COUNT" ]]; then
    log_err "Count mismatch! GCS=$GCS_COUNT S3=$S3_COUNT"
    log_err "Re-run the script to resume failed datasets or check $LOG_FILE for errors."
else
    log "File counts match. Transfer verified."
fi
|
||||
|
||||
# -----------------------------------------------------------------------------
# STEP 7 — Clean up GCS bucket to stop storage charges
# -----------------------------------------------------------------------------
read -rp "Delete GCS bucket gs://$BUCKET_NAME to stop storage charges? [y/N] " confirm
case "$confirm" in
    [Yy])
        log "Deleting bucket gs://$BUCKET_NAME ..."
        # Remove all objects first, then the (now empty) bucket itself.
        gsutil -m rm -r "gs://$BUCKET_NAME"
        gsutil rb "gs://$BUCKET_NAME"
        log "Bucket deleted. Storage charges stopped."
        ;;
    *)
        log "Bucket kept. Remember to delete it later: gsutil -m rm -r gs://$BUCKET_NAME && gsutil rb gs://$BUCKET_NAME"
        ;;
esac

log "All done! Data is at s3://$HETZNER_S3_BUCKET/ ($HETZNER_S3_ENDPOINT)"
log "Total exported: $DONE tables | Failed: $FAILED tables"
log "See $LOG_FILE for full details."
|
||||
62
scripts/sample_datasets.py
Normal file
62
scripts/sample_datasets.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import duckdb
import os
from dotenv import load_dotenv

# Pull credentials from a local .env file into os.environ.
load_dotenv()

# All four settings are mandatory: a missing variable raises KeyError here,
# before any connection is opened, rather than failing mid-run.
BUCKET = os.environ['HETZNER_S3_BUCKET']
ENDPOINT_URL = os.environ['HETZNER_S3_ENDPOINT']
ACCESS_KEY = os.environ['AWS_ACCESS_KEY_ID']
SECRET_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

# DuckDB's s3_endpoint setting wants a bare host, not a URL with scheme.
s3_endpoint = ENDPOINT_URL.removeprefix('https://').removeprefix('http://')

con = duckdb.connect('basedosdados.duckdb')
# NOTE(review): assumes the httpfs extension is already installed in this
# environment (LOAD alone does not install it) — confirm.
con.execute("LOAD httpfs;")
# Credentials are interpolated directly into the SET statements; assumes the
# keys contain no single quotes — verify if keys are ever rotated to values
# with punctuation.
con.execute(f"""
    SET s3_endpoint='{s3_endpoint}';
    SET s3_access_key_id='{ACCESS_KEY}';
    SET s3_secret_access_key='{SECRET_KEY}';
    SET s3_url_style='path';
    SET enable_object_cache=true;
    SET threads=4;
    SET memory_limit='6GB';
""")
|
||||
|
||||
# All user schemas in the database; DuckDB/Postgres system catalogs excluded.
schemas = [row[0] for row in con.execute(
    "SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('main', 'information_schema', 'pg_catalog')"
).fetchall()]


def _quote_ident(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


try:
    # Append mode preserves output across interrupted runs; note the header
    # line is re-written on every invocation.
    with open("dataset_sample.txt", "a") as f:
        f.write("# Dataset samples\n\n")

        for schema in sorted(schemas):
            # Bind the schema name as a parameter instead of interpolating it
            # into the SQL string.
            tables = [row[0] for row in con.execute(
                "SELECT table_name FROM information_schema.tables WHERE table_schema = ?",
                [schema],
            ).fetchall()]

            for table in sorted(tables):
                full = f"{schema}.{table}"
                try:
                    # Quote both identifiers so names that collide with SQL
                    # keywords or contain unusual characters still parse.
                    rows = con.execute(
                        f"SELECT * FROM {_quote_ident(schema)}.{_quote_ident(table)} USING SAMPLE 2 ROWS"
                    ).fetchall()
                    # description holds (name, type, ...) per column of the
                    # last executed query.
                    cols = [f"{d[0]}:{d[1]}" for d in con.description]

                    f.write(f"## {schema}/{table}/\n")
                    f.write(",".join(cols) + "\n")
                    for row in rows:
                        f.write(",".join("" if v is None else str(v) for v in row) + "\n")
                    f.write("\n")
                    f.flush()  # keep partial output on disk if the run dies
                    print(f"done: {full}")
                except Exception as e:
                    # Best-effort sampling: record the failure inline and move on
                    # to the next table.
                    f.write(f"## {schema}/{table}/\n[error: {e}]\n\n")
                    f.flush()
                    print(f"error: {full}: {e}")

except KeyboardInterrupt:
    print("\nCancelled.")

con.close()
|
||||
Reference in New Issue
Block a user