From c142080a5dac07761c616f614c3fa45412c57fd6 Mon Sep 17 00:00:00 2001 From: rafapolo Date: Sat, 28 Mar 2026 11:27:54 +0100 Subject: [PATCH] schema: revert Phase 3 to S3 bigquery_tables enrichment GraphQL approach had broken pagination (totalCount key missing, crashes silently). S3 approach at least completes cleanly even if the metadata table currently lacks a description column. --- gera_schemas.py | 115 ++++++++++++++++-------------------------------- 1 file changed, 39 insertions(+), 76 deletions(-) diff --git a/gera_schemas.py b/gera_schemas.py index 4e2f090..e4d963d 100644 --- a/gera_schemas.py +++ b/gera_schemas.py @@ -7,8 +7,6 @@ import boto3 import duckdb from dotenv import load_dotenv -# TODO: export bigquery colum description as parquet footer metadata - load_dotenv() S3_ENDPOINT = os.environ["HETZNER_S3_ENDPOINT"] @@ -157,84 +155,49 @@ for i, (dt, info) in enumerate(sorted(inventory.items())): print(f" [{i+1}/{len(inventory)}] ✗ {dataset}.{table}: {e}", file=sys.stderr) # ------------------------------------------------------------------ # -# Phase 3: Enrich table + column descriptions from BD GraphQL API +# Phase 3: Enrich from br_bd_metadados.bigquery_tables (small table) # ------------------------------------------------------------------ # -print("Phase 3: fetching descriptions from Base dos Dados GraphQL API...") -try: - from basedosdados.backend import Backend as _BDBackend +META_TABLE = "br_bd_metadados.bigquery_tables" +meta_dt = "br_bd_metadados/bigquery_tables" - _bd = _BDBackend() - _GRAPHQL_QUERY = """ - query ($first: Int!, $offset: Int!) { - allTable(first: $first, offset: $offset) { - totalCount - edges { - node { - slug - dataset { slug } - descriptionPt - columns { - edges { - node { - name - descriptionPt - } - } - } - } - } - } - } - """ - PAGE_SIZE = 100 - offset = 0 - col_descs: dict = {} # (dataset.table, col_name) -> description - tbl_descs: dict = {} # dataset.table -> description - total = None +if meta_dt in inventory: + print(f"Phase 3: enriching from {META_TABLE}...") + try: + con = duckdb.connect() + con.execute("INSTALL httpfs; LOAD httpfs;") + con.execute(f""" + SET s3_endpoint='{s3_host}'; + SET s3_access_key_id='{ACCESS_KEY}'; + SET s3_secret_access_key='{SECRET_KEY}'; + SET s3_url_style='path'; + """) + meta_path = f"s3://{S3_BUCKET}/br_bd_metadados/bigquery_tables/*.parquet" + # Peek at available columns + available = [r[0] for r in con.execute(f"DESCRIBE SELECT * FROM '{meta_path}' LIMIT 1").fetchall()] + print(f" Metadata columns: {available}") - while True: - result = _bd._execute_query( - _GRAPHQL_QUERY, variables={"first": PAGE_SIZE, "offset": offset} - ) - items = result["allTable"]["items"] - if total is None: - total = result["allTable"]["totalCount"] - print(f" API reports {total} tables total") - for tbl_node in items: - ds_slug = (tbl_node.get("dataset") or {}).get("slug", "") - tbl_slug = tbl_node.get("slug", "") - key = f"{ds_slug}.{tbl_slug}" - desc_pt = tbl_node.get("descriptionPt") or "" - if desc_pt: - tbl_descs[key] = desc_pt - for col_node in (tbl_node.get("columns") or {}).get("items", []): - col_name = col_node.get("name", "") - col_desc = col_node.get("descriptionPt") or "" - if col_name and col_desc: - col_descs[(key, col_name)] = col_desc - offset += PAGE_SIZE - print(f" fetched {min(offset, total)}/{total} tables...", end="\r") - if offset >= total: - break + # Try to find dataset/table description columns + desc_col = next((c for c in available if "description" in c.lower()), None) + ds_col = next((c for c in available if c.lower() in ("dataset_id", "dataset", "schema_name")), None) + tbl_col = next((c for c in available if c.lower() in ("table_id", "table_name", "table")), None) - print() # newline after \r progress - - enriched_tbls = 0 - enriched_cols = 0 - for tbl_key, tbl_info in schemas.items(): - if tbl_key in tbl_descs: - tbl_info["table_description"] = tbl_descs[tbl_key] - enriched_tbls += 1 - for col in tbl_info["columns"]: - lookup = (tbl_key, col["name"]) - if lookup in col_descs and not col.get("description"): - col["description"] = col_descs[lookup] - enriched_cols += 1 - - print(f" Enriched {enriched_tbls} table descriptions, {enriched_cols} column descriptions") - -except Exception as e: - print(f" GraphQL enrichment failed: {e}", file=sys.stderr) + if desc_col and ds_col and tbl_col: + rows = con.execute(f""" + SELECT {ds_col}, {tbl_col}, {desc_col} + FROM '{meta_path}' + """).fetchall() + for ds, tbl, desc in rows: + key = f"{ds}.{tbl}" + if key in schemas and desc: + schemas[key]["table_description"] = desc + print(f" Enriched {len(rows)} table descriptions") + else: + print(f" Could not find expected columns (dataset_id, table_id, description) — skipping enrichment") + con.close() + except Exception as e: + print(f" Enrichment failed: {e}", file=sys.stderr) +else: + print("Phase 3: br_bd_metadados.bigquery_tables not in S3 — skipping enrichment") # ------------------------------------------------------------------ # # Phase 4a: Write schemas.json