diff --git a/README.md b/README.md index 97d4fed..774fb6f 100644 --- a/README.md +++ b/README.md @@ -1,100 +1,22 @@ # baseldosdados -Mirror completo das tabelas públicas do projeto [Base dos Dados](https://basedosdados.org/) — 533 tabelas, ~675 GB em Parquet+zstd — hospedado no Hetzner Object Storage e acessível via DuckDB no browser. +Mirror completo das tabelas públicas do projeto [Base dos Dados](https://basedosdados.org/) — 533 tabelas, ~675 GB em Parquet+zstd. -## Scripts +Os dados foram exportados do BigQuery para o Hetzner Object Storage (Helsinki) no formato Parquet com compressão zstd, organizados por dataset e tabela. O acesso é feito diretamente sobre os arquivos via DuckDB, sem necessidade de importar nada localmente — as queries leem os parquets do S3 sob demanda. -| Script | Função | -|---|---| -| `roda.sh` | Exporta BigQuery → GCS → Hetzner S3 (pipeline principal) | -| `prepara_db.py` | Cria `basedosdados.duckdb` com views sobre os parquets do S3 | +--- -## Fluxo de exportação +## Consultando os dados -``` -BigQuery (basedosdados) → GCS (Parquet + zstd) → Hetzner Object Storage (rclone) -``` - -1. Descobre automaticamente todos os datasets e tabelas via API do BigQuery -2. Exporta em paralelo no formato Parquet com compressão zstd -3. Transfere GCS → Hetzner Object Storage via rclone (streaming direto, sem disco local) -4. Verifica a contagem de arquivos entre GCS e S3 - -Resume automático: se interrompido, basta rodar novamente — tabelas e transfers já concluídos são pulados. - -## Estrutura dos dados no S3 - -``` -s3:/// -└── / - └── / - └── *.parquet -``` - -## Configuração - -Crie um arquivo `.env`: - -| Variável | Descrição | -|---|---| -| `YOUR_PROJECT` | ID do seu projeto GCP (para faturamento) | -| `BUCKET_NAME` | Nome do bucket GCS intermediário | -| `BUCKET_REGION` | Região do bucket S3 (ex: `eu-central`) | -| `SOURCE_PROJECT` | Projeto fonte (`basedosdados`) | -| `PARALLEL_EXPORTS` | Jobs paralelos de exportação BigQuery (padrão: 8) | -| `HETZNER_S3_BUCKET` | Nome do bucket no Hetzner Object Storage | -| `HETZNER_S3_ENDPOINT` | Endpoint do Hetzner (ex: `https://hel1.your-objectstorage.com`) | -| `S3_CONCURRENCY` | Transfers paralelos do rclone (padrão: 64) | -| `PARALLEL_UPLOADS` | Datasets enviados em paralelo (padrão: 4) | -| `AWS_ACCESS_KEY_ID` | Access key do Hetzner Object Storage | -| `AWS_SECRET_ACCESS_KEY` | Secret key do Hetzner Object Storage | -| `BASIC_AUTH_PASSWORD` | Senha de acesso ao shell web e endpoint `/query` | - -## Uso — exportação - -```bash -chmod +x roda.sh -./roda.sh --dry-run # estima tamanho e custo -./roda.sh # execução local -./roda.sh --gcloud-run # cria VM no GCP, roda lá e deleta ao final -``` - -Autenticação GCP necessária antes da primeira exportação: - -```bash -gcloud auth login -gcloud auth application-default login -gcloud config set project SEU_PROJECT_ID -``` - -### `--gcloud-run` - -Cria uma VM `e2-standard-4` Debian 12 em `us-central1-a`, copia o script e o `.env`, instala dependências e executa via SSH. - -| Variável | Padrão | Descrição | -|---|---|---| -| `GCP_VM_NAME` | `bd-export-vm` | Nome da instância | -| `GCP_VM_ZONE` | `us-central1-a` | Zona do Compute Engine | - -## Uso — exploração local - -```bash -python prepara_db.py # cria basedosdados.duckdb com views para todas as tabelas -duckdb basedosdados.duckdb -``` - -## Acesso remoto — https://db.xn--2dk.xyz - -Container Docker (Caddy + ttyd) com shell DuckDB acessível via browser ou curl, protegido por senha. +Acesso via browser ou curl, protegido por senha. Peça a senha para o administrador. ### Shell no browser -Acesse https://db.xn--2dk.xyz → autentique com a senha → shell DuckDB interativo direto no browser. +Acesse **https://db.xn--2dk.xyz** → autentique → shell DuckDB interativo direto no browser. ### SQL via curl -Endpoint `POST /query` — aceita SQL no body, retorna output como texto plano. -Autenticação via header `X-Password`. +Endpoint `POST /query` — SQL no body, resultado como texto plano: ```bash # Query inline @@ -118,17 +40,34 @@ GROUP BY 1 ORDER BY 2 DESC SQL -# Salvar resultado +# Salvar resultado em arquivo curl -s -X POST https://db.xn--2dk.xyz/query \ -H "X-Password: " \ --data-binary @query.sql > resultado.csv ``` -O DuckDB suporta saída em CSV e JSON nativamente: +### Descobrindo tabelas ```sql --- CSV com header -COPY (SELECT * FROM br_ibge_censo2022.municipios LIMIT 100) +-- listar todos os datasets (schemas) +SHOW SCHEMAS; + +-- listar tabelas de um dataset +SHOW TABLES IN br_anatel_banda_larga_fixa; + +-- ver colunas de uma tabela +DESCRIBE br_anatel_banda_larga_fixa.densidade_brasil; +``` + +No shell do browser, `.tables` lista tudo de uma vez. + +### Exportar em CSV ou JSON + +O DuckDB permite formatar a saída diretamente na query: + +```sql +-- CSV com header (pipe para arquivo via curl) +COPY (SELECT * FROM br_ibge_censo2022.municipios LIMIT 1000) TO '/dev/stdout' (FORMAT csv, HEADER true); -- JSON @@ -136,7 +75,89 @@ SELECT * FROM br_ibge_censo2022.municipios LIMIT 10 FORMAT JSON; ``` -### Deploy +--- + +## Exploração local + +Para rodar as queries na sua própria máquina com DuckDB instalado: + +```bash +python prepara_db.py # gera basedosdados.duckdb com views apontando para o S3 +duckdb basedosdados.duckdb +``` + +Requer as credenciais do S3 no `.env` (veja seção de configuração abaixo). + +--- + +## Pipeline de exportação + +> Seção para mantenedores — não necessário para consulta dos dados. + +### Fluxo + +``` +BigQuery (basedosdados) → GCS (Parquet + zstd) → Hetzner Object Storage (rclone) +``` + +1. Descobre automaticamente todos os datasets e tabelas via API do BigQuery +2. Exporta em paralelo no formato Parquet com compressão zstd +3. Transfere GCS → Hetzner Object Storage via rclone (streaming direto, sem disco local) +4. Verifica contagem de arquivos entre GCS e S3 + +Resume automático: se interrompido, basta rodar novamente. + +### Scripts + +| Script | Função | +|---|---| +| `roda.sh` | Pipeline principal de exportação | +| `prepara_db.py` | Gera `basedosdados.duckdb` com views para todas as tabelas | + +### Configuração (`.env`) + +| Variável | Descrição | +|---|---| +| `YOUR_PROJECT` | ID do projeto GCP (para faturamento) | +| `BUCKET_NAME` | Nome do bucket GCS intermediário | +| `BUCKET_REGION` | Região do bucket S3 (ex: `eu-central`) | +| `SOURCE_PROJECT` | Projeto fonte (`basedosdados`) | +| `PARALLEL_EXPORTS` | Jobs paralelos de exportação BigQuery (padrão: 8) | +| `HETZNER_S3_BUCKET` | Nome do bucket no Hetzner Object Storage | +| `HETZNER_S3_ENDPOINT` | Endpoint do Hetzner (ex: `https://hel1.your-objectstorage.com`) | +| `S3_CONCURRENCY` | Transfers paralelos do rclone (padrão: 64) | +| `PARALLEL_UPLOADS` | Datasets enviados em paralelo (padrão: 4) | +| `AWS_ACCESS_KEY_ID` | Access key do Hetzner Object Storage | +| `AWS_SECRET_ACCESS_KEY` | Secret key do Hetzner Object Storage | +| `BASIC_AUTH_PASSWORD` | Senha do shell web e endpoint `/query` | + +### Executando + +```bash +chmod +x roda.sh +./roda.sh --dry-run # estima tamanho e custo +./roda.sh # execução local +./roda.sh --gcloud-run # cria VM no GCP, roda lá e deleta ao final +``` + +Autenticação GCP necessária antes da primeira exportação: + +```bash +gcloud auth login +gcloud auth application-default login +gcloud config set project SEU_PROJECT_ID +``` + +#### `--gcloud-run` + +Cria uma VM `e2-standard-4` Debian 12 em `us-central1-a`, copia o script e o `.env`, instala dependências e executa via SSH. + +| Variável | Padrão | Descrição | +|---|---|---| +| `GCP_VM_NAME` | `bd-export-vm` | Nome da instância | +| `GCP_VM_ZONE` | `us-central1-a` | Zona do Compute Engine | + +### Deploy do servidor ```bash haloy deploy diff --git a/gera_schemas.py b/gera_schemas.py deleted file mode 100644 index e4d963d..0000000 --- a/gera_schemas.py +++ /dev/null @@ -1,268 +0,0 @@ -import os -import json -import sys -import pyarrow.parquet as pq -import s3fs -import boto3 -import duckdb -from dotenv import load_dotenv - -load_dotenv() - -S3_ENDPOINT = os.environ["HETZNER_S3_ENDPOINT"] -S3_BUCKET = os.environ["HETZNER_S3_BUCKET"] -ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"] -SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"] - -s3_host = S3_ENDPOINT.removeprefix("https://").removeprefix("http://") - -# --- boto3 client (listing only, zero egress) --- -boto = boto3.client( - "s3", - endpoint_url=S3_ENDPOINT, - aws_access_key_id=ACCESS_KEY, - aws_secret_access_key=SECRET_KEY, -) - -# --- s3fs filesystem (footer-only reads via pyarrow) --- -fs = s3fs.S3FileSystem( - client_kwargs={"endpoint_url": S3_ENDPOINT}, - key=ACCESS_KEY, - secret=SECRET_KEY, -) - -# ------------------------------------------------------------------ # -# Phase 1: File inventory via S3 List API (zero data egress) -# ------------------------------------------------------------------ # -print("Phase 1: listing S3 objects...") -paginator = boto.get_paginator("list_objects_v2") - -inventory = {} # "dataset/table" -> {files: [...], total_size: int} - -for page in paginator.paginate(Bucket=S3_BUCKET): - for obj in page.get("Contents", []): - key = obj["Key"] - if not key.endswith(".parquet"): - continue - parts = key.split("/") - if len(parts) < 3: - continue - dataset, table = parts[0], parts[1] - dt = f"{dataset}/{table}" - if dt not in inventory: - inventory[dt] = {"files": [], "total_size_bytes": 0} - inventory[dt]["files"].append(key) - inventory[dt]["total_size_bytes"] += obj["Size"] - -print(f" Found {len(inventory)} tables across {S3_BUCKET}") - -# ------------------------------------------------------------------ # -# Phase 2: Schema reads — footer only (~30 KB per table) -# ------------------------------------------------------------------ # -print("Phase 2: reading parquet footers...") - -def fmt_size(b): - for unit in ("B", "KB", "MB", "GB", "TB"): - if b < 1024 or unit == "TB": - return f"{b:.1f} {unit}" - b /= 1024 - -def extract_col_descriptions(schema): - """Try to pull per-column descriptions from Arrow metadata.""" - descriptions = {} - meta = schema.metadata or {} - # BigQuery exports embed a JSON blob under b'pandas' with column_info - pandas_meta_raw = meta.get(b"pandas") or meta.get(b"pandas_metadata") - if pandas_meta_raw: - try: - pm = json.loads(pandas_meta_raw) - for col in pm.get("columns", []): - name = col.get("name") - desc = col.get("metadata", {}) or {} - if isinstance(desc, dict) and "description" in desc: - descriptions[name] = desc["description"] - except Exception: - pass - # Also try top-level b'description' or b'schema' - for key in (b"description", b"schema", b"BigQuery:description"): - val = meta.get(key) - if val: - try: - descriptions["__table__"] = val.decode("utf-8", errors="replace") - except Exception: - pass - return descriptions - -schemas = {} -errors = [] - -for i, (dt, info) in enumerate(sorted(inventory.items())): - dataset, table = dt.split("/", 1) - first_file = info["files"][0] - s3_path = f"{S3_BUCKET}/{first_file}" - try: - schema = pq.read_schema(fs.open(s3_path)) - col_descs = extract_col_descriptions(schema) - - # Build raw metadata dict (decode bytes keys/values) - raw_meta = {} - if schema.metadata: - for k, v in schema.metadata.items(): - try: - dk = k.decode("utf-8", errors="replace") - dv = v.decode("utf-8", errors="replace") - # Try to parse JSON values - try: - dv = json.loads(dv) - except Exception: - pass - raw_meta[dk] = dv - except Exception: - pass - - columns = [] - for field in schema: - col = { - "name": field.name, - "type": str(field.type), - "nullable": field.nullable, - } - if field.name in col_descs: - col["description"] = col_descs[field.name] - # Check field-level metadata - if field.metadata: - for k, v in field.metadata.items(): - try: - dk = k.decode("utf-8", errors="replace") - dv = v.decode("utf-8", errors="replace") - if dk in ("description", "DESCRIPTION", "comment"): - col["description"] = dv - except Exception: - pass - columns.append(col) - - schemas[f"{dataset}.{table}"] = { - "path": f"s3://{S3_BUCKET}/{dataset}/{table}/", - "file_count": len(info["files"]), - "total_size_bytes": info["total_size_bytes"], - "total_size_human": fmt_size(info["total_size_bytes"]), - "columns": columns, - "metadata": raw_meta, - } - print(f" [{i+1}/{len(inventory)}] ✓ {dataset}.{table} ({len(columns)} cols, {fmt_size(info['total_size_bytes'])})") - except Exception as e: - errors.append({"table": f"{dataset}.{table}", "error": str(e)}) - print(f" [{i+1}/{len(inventory)}] ✗ {dataset}.{table}: {e}", file=sys.stderr) - -# ------------------------------------------------------------------ # -# Phase 3: Enrich from br_bd_metadados.bigquery_tables (small table) -# ------------------------------------------------------------------ # -META_TABLE = "br_bd_metadados.bigquery_tables" -meta_dt = "br_bd_metadados/bigquery_tables" - -if meta_dt in inventory: - print(f"Phase 3: enriching from {META_TABLE}...") - try: - con = duckdb.connect() - con.execute("INSTALL httpfs; LOAD httpfs;") - con.execute(f""" - SET s3_endpoint='{s3_host}'; - SET s3_access_key_id='{ACCESS_KEY}'; - SET s3_secret_access_key='{SECRET_KEY}'; - SET s3_url_style='path'; - """) - meta_path = f"s3://{S3_BUCKET}/br_bd_metadados/bigquery_tables/*.parquet" - # Peek at available columns - available = [r[0] for r in con.execute(f"DESCRIBE SELECT * FROM '{meta_path}' LIMIT 1").fetchall()] - print(f" Metadata columns: {available}") - - # Try to find dataset/table description columns - desc_col = next((c for c in available if "description" in c.lower()), None) - ds_col = next((c for c in available if c.lower() in ("dataset_id", "dataset", "schema_name")), None) - tbl_col = next((c for c in available if c.lower() in ("table_id", "table_name", "table")), None) - - if desc_col and ds_col and tbl_col: - rows = con.execute(f""" - SELECT {ds_col}, {tbl_col}, {desc_col} - FROM '{meta_path}' - """).fetchall() - for ds, tbl, desc in rows: - key = f"{ds}.{tbl}" - if key in schemas and desc: - schemas[key]["table_description"] = desc - print(f" Enriched {len(rows)} table descriptions") - else: - print(f" Could not find expected columns (dataset_id, table_id, description) — skipping enrichment") - con.close() - except Exception as e: - print(f" Enrichment failed: {e}", file=sys.stderr) -else: - print("Phase 3: br_bd_metadados.bigquery_tables not in S3 — skipping enrichment") - -# ------------------------------------------------------------------ # -# Phase 4a: Write schemas.json -# ------------------------------------------------------------------ # -print("Phase 4: writing outputs...") - -output = { - "_meta": { - "bucket": S3_BUCKET, - "total_tables": len(schemas), - "total_size_bytes": sum(v["total_size_bytes"] for v in schemas.values()), - "total_size_human": fmt_size(sum(v["total_size_bytes"] for v in schemas.values())), - "errors": errors, - }, - "tables": dict(sorted(schemas.items())), -} - -with open("schemas.json", "w", encoding="utf-8") as f: - json.dump(output, f, ensure_ascii=False, indent=2) - -print(f" ✓ schemas.json ({len(schemas)} tables)") - -# ------------------------------------------------------------------ # -# Phase 4b: Write file_tree.md -# ------------------------------------------------------------------ # -lines = [ - f"# S3 File Tree: {S3_BUCKET}", - "", -] - -# Group by dataset -datasets_map = {} -for dt_key, info in sorted(inventory.items()): - dataset, table = dt_key.split("/", 1) - datasets_map.setdefault(dataset, []).append((table, info)) - -total_files = sum(len(v["files"]) for v in inventory.values()) -total_bytes = sum(v["total_size_bytes"] for v in inventory.values()) - -for dataset, tables in sorted(datasets_map.items()): - ds_bytes = sum(i["total_size_bytes"] for _, i in tables) - ds_files = sum(len(i["files"]) for _, i in tables) - lines.append(f"## {dataset}/ ({len(tables)} tables, {fmt_size(ds_bytes)}, {ds_files} files)") - lines.append("") - for table, info in sorted(tables): - schema_entry = schemas.get(f"{dataset}.{table}", {}) - ncols = len(schema_entry.get("columns", [])) - col_str = f", {ncols} cols" if ncols else "" - table_desc = schema_entry.get("table_description", "") - desc_str = f" — {table_desc}" if table_desc else "" - lines.append(f" - **{table}/** ({len(info['files'])} files, {fmt_size(info['total_size_bytes'])}{col_str}){desc_str}") - lines.append("") - -lines += [ - "---", - f"**Total: {len(inventory)} tables · {fmt_size(total_bytes)} · {total_files} parquet files**", -] - -with open("file_tree.md", "w", encoding="utf-8") as f: - f.write("\n".join(lines) + "\n") - -print(f" ✓ file_tree.md ({len(inventory)} tables)") -print() -print("Done!") -print(f" schemas.json — full column-level schema dump") -print(f" file_tree.md — bucket tree with sizes") -if errors: - print(f" {len(errors)} tables failed (see schemas.json _meta.errors)") diff --git a/open_gui.sh b/open_gui.sh deleted file mode 100755 index 72d31c9..0000000 --- a/open_gui.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash -cd "$(dirname "$0")" -INIT=$(mktemp /tmp/duckdb_init_XXXX) -printf "LOAD httpfs;\nATTACH 'basedosdados.duckdb' AS bd (READ_ONLY);\n" > "$INIT" -duckdb --ui ui.duckdb -init "$INIT" -rm -f "$INIT"