schema: revert Phase 3 to S3 bigquery_tables enrichment
The GraphQL approach had broken pagination: the response is missing the totalCount key, so the loop crashes silently. The S3 approach at least completes cleanly, even though the metadata table currently lacks a description column.
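For context, the path this restores is a plain DuckDB httpfs read against the bucket, mirroring the added hunk below. A minimal standalone sketch of that read, assuming the HETZNER_S3_ENDPOINT env var the script already uses; the other env var names here are made up for the sketch:

    import os
    import duckdb

    # Only HETZNER_S3_ENDPOINT is real; the other env var names are assumptions.
    # The script derives s3_host elsewhere; assume an https:// endpoint here.
    s3_host = os.environ["HETZNER_S3_ENDPOINT"].removeprefix("https://")
    bucket = os.environ["HETZNER_S3_BUCKET"]
    access_key = os.environ["HETZNER_S3_ACCESS_KEY"]
    secret_key = os.environ["HETZNER_S3_SECRET_KEY"]

    con = duckdb.connect()
    con.execute("INSTALL httpfs; LOAD httpfs;")
    con.execute(f"""
        SET s3_endpoint='{s3_host}';
        SET s3_access_key_id='{access_key}';
        SET s3_secret_access_key='{secret_key}';
        SET s3_url_style='path';
    """)

    meta_path = f"s3://{bucket}/br_bd_metadados/bigquery_tables/*.parquet"
    # DESCRIBE over a LIMIT 1 projection only needs schema information, which is
    # what lets Phase 3 bail out gracefully instead of crashing mid-pagination.
    available = [r[0] for r in con.execute(
        f"DESCRIBE SELECT * FROM '{meta_path}' LIMIT 1"
    ).fetchall()]
    print(available)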
 gera_schemas.py | 115
@@ -7,8 +7,6 @@ import boto3
 import duckdb
 from dotenv import load_dotenv
 
-# TODO: export bigquery column description as parquet footer metadata
-
 load_dotenv()
 
 S3_ENDPOINT = os.environ["HETZNER_S3_ENDPOINT"]
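The TODO deleted in this hunk pointed at a different design: shipping the BigQuery column descriptions inside the Parquet files themselves. For reference, a sketch of how per-column descriptions can travel as field-level key-value metadata via pyarrow; the helper name and the "description" key are hypothetical, nothing like this exists in the script yet:

    import pyarrow as pa
    import pyarrow.parquet as pq

    def attach_descriptions(table: pa.Table, descs: dict) -> pa.Table:
        # Rebuild the schema field by field, stamping each described column
        # with a "description" entry in its key-value metadata.
        fields = []
        for field in table.schema:
            meta = dict(field.metadata or {})
            if field.name in descs:
                meta[b"description"] = descs[field.name].encode("utf-8")
            fields.append(field.with_metadata(meta))
        # Same names and types, so the cast only swaps metadata, not data.
        return table.cast(pa.schema(fields, metadata=table.schema.metadata))

    # pq.write_table(attach_descriptions(tbl, {"ano": "Ano de referência"}), "out.parquet")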
@@ -157,84 +155,49 @@ for i, (dt, info) in enumerate(sorted(inventory.items())):
         print(f"  [{i+1}/{len(inventory)}] ✗ {dataset}.{table}: {e}", file=sys.stderr)
 
 # ------------------------------------------------------------------ #
-# Phase 3: Enrich table + column descriptions from BD GraphQL API
+# Phase 3: Enrich from br_bd_metadados.bigquery_tables (small table)
 # ------------------------------------------------------------------ #
-print("Phase 3: fetching descriptions from Base dos Dados GraphQL API...")
-try:
-    from basedosdados.backend import Backend as _BDBackend
-
-    _bd = _BDBackend()
-    _GRAPHQL_QUERY = """
-    query ($first: Int!, $offset: Int!) {
-      allTable(first: $first, offset: $offset) {
-        totalCount
-        edges {
-          node {
-            slug
-            dataset { slug }
-            descriptionPt
-            columns {
-              edges {
-                node {
-                  name
-                  descriptionPt
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-    """
-    PAGE_SIZE = 100
-    offset = 0
-    col_descs: dict = {}  # (dataset.table, col_name) -> description
-    tbl_descs: dict = {}  # dataset.table -> description
-    total = None
-
-    while True:
-        result = _bd._execute_query(
-            _GRAPHQL_QUERY, variables={"first": PAGE_SIZE, "offset": offset}
-        )
-        items = result["allTable"]["items"]
-        if total is None:
-            total = result["allTable"]["totalCount"]
-            print(f"  API reports {total} tables total")
-        for tbl_node in items:
-            ds_slug = (tbl_node.get("dataset") or {}).get("slug", "")
-            tbl_slug = tbl_node.get("slug", "")
-            key = f"{ds_slug}.{tbl_slug}"
-            desc_pt = tbl_node.get("descriptionPt") or ""
-            if desc_pt:
-                tbl_descs[key] = desc_pt
-            for col_node in (tbl_node.get("columns") or {}).get("items", []):
-                col_name = col_node.get("name", "")
-                col_desc = col_node.get("descriptionPt") or ""
-                if col_name and col_desc:
-                    col_descs[(key, col_name)] = col_desc
-        offset += PAGE_SIZE
-        print(f"  fetched {min(offset, total)}/{total} tables...", end="\r")
-        if offset >= total:
-            break
-
-    print()  # newline after \r progress
-
-    enriched_tbls = 0
-    enriched_cols = 0
-    for tbl_key, tbl_info in schemas.items():
-        if tbl_key in tbl_descs:
-            tbl_info["table_description"] = tbl_descs[tbl_key]
-            enriched_tbls += 1
-        for col in tbl_info["columns"]:
-            lookup = (tbl_key, col["name"])
-            if lookup in col_descs and not col.get("description"):
-                col["description"] = col_descs[lookup]
-                enriched_cols += 1
-
-    print(f"  Enriched {enriched_tbls} table descriptions, {enriched_cols} column descriptions")
-
-except Exception as e:
-    print(f"  GraphQL enrichment failed: {e}", file=sys.stderr)
+META_TABLE = "br_bd_metadados.bigquery_tables"
+meta_dt = "br_bd_metadados/bigquery_tables"
+
+if meta_dt in inventory:
+    print(f"Phase 3: enriching from {META_TABLE}...")
+    try:
+        con = duckdb.connect()
+        con.execute("INSTALL httpfs; LOAD httpfs;")
+        con.execute(f"""
+            SET s3_endpoint='{s3_host}';
+            SET s3_access_key_id='{ACCESS_KEY}';
+            SET s3_secret_access_key='{SECRET_KEY}';
+            SET s3_url_style='path';
+        """)
+        meta_path = f"s3://{S3_BUCKET}/br_bd_metadados/bigquery_tables/*.parquet"
+        # Peek at available columns
+        available = [r[0] for r in con.execute(f"DESCRIBE SELECT * FROM '{meta_path}' LIMIT 1").fetchall()]
+        print(f"  Metadata columns: {available}")
+
+        # Try to find dataset/table description columns
+        desc_col = next((c for c in available if "description" in c.lower()), None)
+        ds_col = next((c for c in available if c.lower() in ("dataset_id", "dataset", "schema_name")), None)
+        tbl_col = next((c for c in available if c.lower() in ("table_id", "table_name", "table")), None)
+
+        if desc_col and ds_col and tbl_col:
+            rows = con.execute(f"""
+                SELECT {ds_col}, {tbl_col}, {desc_col}
+                FROM '{meta_path}'
+            """).fetchall()
+            for ds, tbl, desc in rows:
+                key = f"{ds}.{tbl}"
+                if key in schemas and desc:
+                    schemas[key]["table_description"] = desc
+            print(f"  Enriched {len(rows)} table descriptions")
+        else:
+            print("  Could not find expected columns (dataset_id, table_id, description) — skipping enrichment")
+        con.close()
+    except Exception as e:
+        print(f"  Enrichment failed: {e}", file=sys.stderr)
+else:
+    print("Phase 3: br_bd_metadados.bigquery_tables not in S3 — skipping enrichment")
 
 # ------------------------------------------------------------------ #
 # Phase 4a: Write schemas.json
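One consequence of the revert worth flagging: the GraphQL path enriched both schemas[key]["table_description"] and the per-column col["description"] entries, while the S3 path only fills table descriptions, and only when the metadata table exposes dataset, table, and description columns. Illustratively, the entry shape Phase 4a then serializes; the dataset name and values are made up, and only keys visible in this diff are shown:

    schemas["some_dataset.some_table"] = {
        "table_description": "set by Phase 3 when a description column exists",
        "columns": [
            {"name": "ano", "description": ""},  # no longer filled by Phase 3
        ],
    }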