Files
texas-district-analysis/rebuild/well_shape.ipynb
2026-01-30 10:57:55 -08:00

1881 lines
82 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"id": "64ba36a8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"RealDictRow({'version': 'PostgreSQL 17.6 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 15.2.1 20250813, 64-bit'})\n"
]
}
],
"source": [
"# Cell 0: connect to Postgres\n",
"# Requires: pip install psycopg2-binary\n",
"import os\n",
"import psycopg2\n",
"from psycopg2.extras import RealDictCursor\n",
"\n",
"# Configure via environment variables (safer) or edit defaults below\n",
"PGHOST = os.getenv(\"PGHOST\", \"localhost\")\n",
"PGPORT = os.getenv(\"PGPORT\", \"5432\")\n",
"PGUSER = os.getenv(\"PGUSER\", \"postgres\")\n",
"PGPASSWORD = os.getenv(\"PGPASSWORD\", \"\")\n",
"# Default DB name set to \"postgres\"; will fall back if PGDATABASE not set in env\n",
"PGDATABASE = os.getenv(\"PGDATABASE\", \"texas_data\")\n",
"\n",
"if PGDATABASE is None or PGDATABASE == \"\":\n",
" print(\"[warn] PGDATABASE not set; using fallback 'postgres'\")\n",
" PGDATABASE = \"postgres\"\n",
"\n",
"def get_conn():\n",
" \"\"\"Return a new psycopg2 connection using configured environment variables.\"\"\"\n",
" return psycopg2.connect(\n",
" host=PGHOST,\n",
" port=PGPORT,\n",
" user=PGUSER,\n",
" password=PGPASSWORD,\n",
" dbname=PGDATABASE,\n",
" )\n",
"\n",
"# Quick connection test (only runs if executed as a script, not typical in notebook)\n",
"if __name__ == \"__main__\":\n",
" try:\n",
" conn = get_conn()\n",
" with conn.cursor(cursor_factory=RealDictCursor) as cur:\n",
" cur.execute(\"SELECT version() AS version;\")\n",
" print(cur.fetchone())\n",
" conn.close()\n",
" except Exception as e:\n",
" print(\"Postgres connection failed:\", e)\n",
" raise"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "43cbda02",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Note: you may need to restart the kernel to use updated packages.\n"
]
}
],
"source": [
"# Install geospatial dependencies\n",
"%pip install --quiet geopandas shapely fiona pyproj geoalchemy2 sqlalchemy"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "66396497",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 254 zip files\n",
"Unified attribute column count (sample-based): 16\n",
"Observed geometry types in sample: ['Point']\n",
"Unified attribute column count (sample-based): 16\n",
"Observed geometry types in sample: ['Point']\n",
"Final unified attribute column count: 16\n",
"Columns:\n",
"['bottom_id', 'surface_id', 'symnum', 'apinum', 'reliab', 'api10', 'api', 'long27', 'lat27', 'long83', 'lat83', 'out_fips', 'cwellnum', 'radioact', 'wellid', 'stcode']\n",
"Final unified attribute column count: 16\n",
"Columns:\n",
"['bottom_id', 'surface_id', 'symnum', 'apinum', 'reliab', 'api10', 'api', 'long27', 'lat27', 'long83', 'lat83', 'out_fips', 'cwellnum', 'radioact', 'wellid', 'stcode']\n"
]
}
],
"source": [
"# Cell 2: scan shapefile zips and build unified schema\n",
"import zipfile\n",
"from pathlib import Path\n",
"import geopandas as gpd\n",
"import pandas as pd\n",
"import re\n",
"\n",
"shp_zip_dir = Path('/home/dadams/data/rrc_shapefiles/shapefile_zips')\n",
"if not shp_zip_dir.exists():\n",
" raise FileNotFoundError(f\"Directory {shp_zip_dir} not found\")\n",
"\n",
"zip_files = sorted(shp_zip_dir.glob('*.zip'))\n",
"print(f\"Found {len(zip_files)} zip files\")\n",
"if not zip_files:\n",
" raise SystemExit(\"No shapefile zip archives found\")\n",
"\n",
"def sanitize(name: str) -> str:\n",
" s = name.strip().lower()\n",
" s = re.sub(r\"\\s+\", \"_\", s)\n",
" s = re.sub(r\"[^a-z0-9_]+\", \"_\", s)\n",
" s = re.sub(r\"_+\", \"_\", s).strip('_')\n",
" if not s or s[0].isdigit():\n",
" s = f\"col_{s}\" if s else \"col\"\n",
" return s\n",
"\n",
"all_columns = []\n",
"geom_types = set()\n",
"\n",
"sample_limit = 10 # read a few to get representative schema quickly\n",
"for i, zf_path in enumerate(zip_files[:sample_limit]):\n",
" with zipfile.ZipFile(zf_path) as zf:\n",
" # find .shp file inside zip\n",
" shp_members = [m for m in zf.namelist() if m.lower().endswith('.shp')]\n",
" if not shp_members:\n",
" print(f\"[warn] No .shp in {zf_path.name}, skipping\")\n",
" continue\n",
" # Extract to temp directory\n",
" import tempfile, os\n",
" with tempfile.TemporaryDirectory() as tmpdir:\n",
" zf.extractall(tmpdir)\n",
" shp_path = Path(tmpdir) / shp_members[0]\n",
" try:\n",
" gdf = gpd.read_file(shp_path)\n",
" except Exception as e:\n",
" print(f\"[warn] Failed to read {zf_path.name}: {e}\")\n",
" continue\n",
" # record columns\n",
" cols = [sanitize(c) for c in gdf.columns if c.lower() != 'geometry']\n",
" for c in cols:\n",
" if c not in all_columns:\n",
" all_columns.append(c)\n",
" # record geometry types\n",
" geom_types.update(gdf.geometry.geom_type.unique())\n",
"\n",
"print(f\"Unified attribute column count (sample-based): {len(all_columns)}\")\n",
"print(f\"Observed geometry types in sample: {sorted(geom_types)}\")\n",
"\n",
"# We'll finalize column list after scanning all archives for robustness\n",
"for zf_path in zip_files:\n",
" with zipfile.ZipFile(zf_path) as zf:\n",
" shp_members = [m for m in zf.namelist() if m.lower().endswith('.shp')]\n",
" if not shp_members:\n",
" continue\n",
" import tempfile\n",
" with tempfile.TemporaryDirectory() as tmpdir:\n",
" zf.extractall(tmpdir)\n",
" shp_path = Path(tmpdir) / shp_members[0]\n",
" try:\n",
" gdf = gpd.read_file(shp_path)\n",
" except Exception:\n",
" continue\n",
" cols = [sanitize(c) for c in gdf.columns if c.lower() != 'geometry']\n",
" for c in cols:\n",
" if c not in all_columns:\n",
" all_columns.append(c)\n",
"\n",
"print(f\"Final unified attribute column count: {len(all_columns)}\")\n",
"print(\"Columns:\")\n",
"print(all_columns)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "06ab9489",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Created table well_shapes\n",
"Inserted 1390194 rows into well_shapes\n",
"Inserted 1390194 rows into well_shapes\n",
"Added GIST index on well_shapes.geom\n",
"Added GIST index on well_shapes.geom\n"
]
}
],
"source": [
"# Cell 3: create PostGIS table well_shapes and ingest all shapefiles\n",
"from sqlalchemy import create_engine, text\n",
"from geoalchemy2 import Geometry\n",
"import zipfile\n",
"import tempfile\n",
"\n",
"# Build SQLAlchemy engine from PG* env\n",
"pg_url = f\"postgresql+psycopg2://{PGUSER}:{PGPASSWORD}@{PGHOST}:{PGPORT}/{PGDATABASE}\"\n",
"engine = create_engine(pg_url)\n",
"\n",
"# Ensure postgis is enabled\n",
"with engine.begin() as conn:\n",
" conn.execute(text(\"CREATE EXTENSION IF NOT EXISTS postgis\"))\n",
"\n",
"# Create table with unified text columns + geometry\n",
"attr_cols_sql = \", \".join(f'\"{c}\" TEXT' for c in all_columns)\n",
"create_sql = f\"\"\"\n",
"DROP TABLE IF EXISTS well_shapes;\n",
"CREATE TABLE well_shapes (\n",
" id BIGSERIAL PRIMARY KEY,\n",
" {attr_cols_sql},\n",
" geom geometry(GEOMETRY, 4326)\n",
");\n",
"\"\"\"\n",
"with engine.begin() as conn:\n",
" conn.execute(text(create_sql))\n",
"print(\"Created table well_shapes\")\n",
"\n",
"# Ingest all shapefiles: read, normalize columns, ensure WGS84 (EPSG:4326), and bulk insert via GeoPandas\n",
"zip_files = sorted(Path('/home/dadams/data/rrc_shapefiles/shapefile_zips').glob('*.zip'))\n",
"rows_total = 0\n",
"for zf_path in zip_files:\n",
" with zipfile.ZipFile(zf_path) as zf:\n",
" shp_members = [m for m in zf.namelist() if m.lower().endswith('.shp')]\n",
" if not shp_members:\n",
" print(f\"[warn] No .shp in {zf_path.name}, skipping\")\n",
" continue\n",
" with tempfile.TemporaryDirectory() as tmpdir:\n",
" zf.extractall(tmpdir)\n",
" shp_path = Path(tmpdir) / shp_members[0]\n",
" try:\n",
" gdf = gpd.read_file(shp_path)\n",
" except Exception as e:\n",
" print(f\"[warn] Failed to read {zf_path.name}: {e}\")\n",
" continue\n",
" if gdf.empty:\n",
" continue\n",
" # Normalize columns to match well_shapes schema\n",
" rename_map = {c: sanitize(c) for c in gdf.columns if c.lower() != 'geometry'}\n",
" gdf = gdf.rename(columns=rename_map)\n",
" # Keep only known columns; add missing as None\n",
" for c in all_columns:\n",
" if c not in gdf.columns:\n",
" gdf[c] = None\n",
" keep_cols = all_columns + ['geometry']\n",
" gdf = gdf[[c for c in keep_cols if c in gdf.columns]]\n",
" # Reproject to EPSG:4326\n",
" try:\n",
" if gdf.crs is None:\n",
" # Trust .prj if present; otherwise, assume EPSG:4326 as a fallback\n",
" gdf = gdf.set_crs(4326, allow_override=True)\n",
" if gdf.crs.to_epsg() != 4326:\n",
" gdf = gdf.to_crs(4326)\n",
" except Exception as e:\n",
" print(f\"[warn] CRS handling issue in {zf_path.name}: {e}; forcing 4326\")\n",
" gdf = gdf.set_crs(4326, allow_override=True)\n",
" # Geometry column must be named 'geom' to match table\n",
" gdf = gdf.rename_geometry('geom') if hasattr(gdf, 'rename_geometry') else gdf.set_geometry('geometry')\n",
" if gdf.geometry.name != 'geom':\n",
" gdf = gdf.set_geometry('geom')\n",
" # Cast attribute cols to string for safety\n",
" for c in all_columns:\n",
" if c in gdf.columns:\n",
" gdf[c] = gdf[c].astype(str).where(gdf[c].notna(), None)\n",
" # Write in chunks using GeoPandas to_postgis (handles WKB/WKT correctly)\n",
" chunksize = 5000\n",
" for start in range(0, len(gdf), chunksize):\n",
" chunk = gdf.iloc[start:start+chunksize].copy()\n",
" try:\n",
" chunk.to_postgis(\n",
" 'well_shapes',\n",
" engine,\n",
" if_exists='append',\n",
" index=False,\n",
" )\n",
" except Exception as e:\n",
" print(f\"[warn] to_postgis failed for {zf_path.name} chunk starting {start}: {e}\")\n",
" continue\n",
" rows_total += len(chunk)\n",
"print(f\"Inserted {rows_total} rows into well_shapes\")\n",
"\n",
"# Add spatial index\n",
"with engine.begin() as conn:\n",
" conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_well_shapes_geom ON well_shapes USING GIST (geom)\"))\n",
"print(\"Added GIST index on well_shapes.geom\")"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "eac28d18",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"well_shapes row count: 1390194\n",
"Geometry types:\n",
" ST_Point: 1390194\n",
" id bottom_id surface_id symnum apinum reliab api10 api \\\n",
"0 1 1314180 1314180 3 4200132761 55 00132761 00132761 \n",
"1 2 441598 441598 7 4200101708 30 00101708 00101708 \n",
"2 3 622581 622581 7 4200100126 15 00100126 00100126 \n",
"3 4 443035 443035 7 4200131523 15 00131523 00131523 \n",
"4 5 1060307 1060307 22 4200132335 40 00132335 00132335 \n",
"\n",
" long27 lat27 long83 lat83 out_fips cwellnum \\\n",
"0 -95.77868611 31.98602778 -95.77893973 31.98619311 N 1 \n",
"1 -95.50418172 31.63615535 -95.50442643 31.63633064 N 1 \n",
"2 -95.52108432 32.06352291 -95.52133065 32.06368669 N 661 \n",
"3 -95.62782436 31.94839591 -95.62807349 31.94856254 N 1 \n",
"4 -95.99003131 31.96996832 -95.99028153 31.97013568 N 12 \n",
"\n",
" radioact wellid stcode geom \n",
"0 None 32761 None 0101000020E61000008DAC0326DAF157C07956CF2677FC... \n",
"1 None 01708 None 0101000020E6100000879ACC8548E057C009AB9490E6A2... \n",
"2 None 00126 None 0101000020E6100000BECB397B5DE157C0A07CADE22608... \n",
"3 None 31523 None 0101000020E6100000B53A295B32E857C0D591A2FED4F2... \n",
"4 None 32335 None 0101000020E61000001B9D99ED60FF57C084A4DFAD5AF8... \n"
]
}
],
"source": [
"# Cell 4: verification queries\n",
"import pandas as pd\n",
"from sqlalchemy import text\n",
"\n",
"with engine.begin() as conn:\n",
" total = conn.execute(text(\"SELECT COUNT(*) FROM well_shapes\")).scalar()\n",
" types = conn.execute(text(\"SELECT ST_GeometryType(geom), COUNT(*) FROM well_shapes WHERE geom IS NOT NULL GROUP BY 1 ORDER BY 2 DESC\")).fetchall()\n",
"print(f\"well_shapes row count: {total}\")\n",
"print(\"Geometry types:\")\n",
"for t, c in types:\n",
" print(f\" {t}: {c}\")\n",
"\n",
"# Peek a few rows\n",
"with engine.begin() as conn:\n",
" df = pd.read_sql(text(\"SELECT * FROM well_shapes LIMIT 5\"), conn)\n",
"print(df.head())"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "285e1530",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Latitude/Longitude columns ensured and populated (where empty).\n"
]
}
],
"source": [
"# Cell 5: ensure latitude/longitude numeric columns from geometry (no API normalization here)\n",
"from sqlalchemy import text\n",
"\n",
"with engine.begin() as conn:\n",
" # Add latitude/longitude columns (double precision) if not exist\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS latitude DOUBLE PRECISION\"))\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS longitude DOUBLE PRECISION\"))\n",
" # Populate lat/long from geometry (assuming Point geometries EPSG:4326)\n",
" conn.execute(text(\"\"\"\n",
" UPDATE well_shapes\n",
" SET longitude = ST_X(geom),\n",
" latitude = ST_Y(geom)\n",
" WHERE geom IS NOT NULL\n",
" AND (longitude IS NULL OR latitude IS NULL)\n",
" \"\"\"))\n",
" # Indexes for latitude/longitude (optional for range queries)\n",
" conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_well_shapes_latitude ON well_shapes (latitude)\"))\n",
" conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_well_shapes_longitude ON well_shapes (longitude)\"))\n",
"\n",
"print(\"Latitude/Longitude columns ensured and populated (where empty).\")"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "fbe0947c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'has_api': True, 'has_api10': True, 'has_apinum': True}\n",
"API Derivation Preview Stats:\n",
"{'ten_digit': 852653, 'eight_digit': 852653, 'api_anomaly': 363149, 'api10_anomaly': 1390193, 'apinum_anomaly': 537542}\n",
"Clean samples (first 5):\n",
" api api10 apinum api_digits_raw api10_digits_raw \\\n",
"0 05700422 05700422 4205700422 05700422 05700422 \n",
"1 05700427 05700427 4205700427 05700427 05700427 \n",
"2 05730811 05730811 4205730811 05730811 05730811 \n",
"3 05731052 05731052 4205731052 05731052 05731052 \n",
"4 05730924 05730924 4205730924 05730924 05730924 \n",
"\n",
" apinum_digits_raw api10_clean api8_clean \n",
"0 4205700422 4205700422 05700422 \n",
"1 4205700427 4205700427 05700427 \n",
"2 4205730811 4205730811 05730811 \n",
"3 4205731052 4205731052 05731052 \n",
"4 4205730924 4205730924 05730924 \n",
"Anomalous samples (first 5):\n",
" api api10 apinum api_digits_raw api10_digits_raw apinum_digits_raw\n",
"0 249 249 42249 249 249 42249\n",
"1 371 371 42371 371 371 42371\n",
"2 371 371 42371 371 371 42371\n",
"3 249 249 42249 249 249 42249\n",
"4 249 249 42249 249 249 42249\n",
"API Derivation Preview Stats:\n",
"{'ten_digit': 852653, 'eight_digit': 852653, 'api_anomaly': 363149, 'api10_anomaly': 1390193, 'apinum_anomaly': 537542}\n",
"Clean samples (first 5):\n",
" api api10 apinum api_digits_raw api10_digits_raw \\\n",
"0 05700422 05700422 4205700422 05700422 05700422 \n",
"1 05700427 05700427 4205700427 05700427 05700427 \n",
"2 05730811 05730811 4205730811 05730811 05730811 \n",
"3 05731052 05731052 4205731052 05731052 05731052 \n",
"4 05730924 05730924 4205730924 05730924 05730924 \n",
"\n",
" apinum_digits_raw api10_clean api8_clean \n",
"0 4205700422 4205700422 05700422 \n",
"1 4205700427 4205700427 05700427 \n",
"2 4205730811 4205730811 05730811 \n",
"3 4205731052 4205731052 05731052 \n",
"4 4205730924 4205730924 05730924 \n",
"Anomalous samples (first 5):\n",
" api api10 apinum api_digits_raw api10_digits_raw apinum_digits_raw\n",
"0 249 249 42249 249 249 42249\n",
"1 371 371 42371 371 371 42371\n",
"2 371 371 42371 371 371 42371\n",
"3 249 249 42249 249 249 42249\n",
"4 249 249 42249 249 249 42249\n"
]
}
],
"source": [
"# Cell 6: API preview (derive from apinum/api10/api; read-only audit)\n",
"from sqlalchemy import text\n",
"import pandas as pd\n",
"\n",
"# Determine which source columns exist in well_shapes\n",
"with engine.begin() as conn:\n",
" cols = [r[0] for r in conn.execute(text(\n",
" \"\"\"\n",
" SELECT column_name\n",
" FROM information_schema.columns\n",
" WHERE table_schema='public' AND table_name='well_shapes'\n",
" \"\"\"\n",
" )).fetchall()]\n",
"\n",
"has_api = 'api' in cols\n",
"has_api10 = 'api10' in cols\n",
"has_apinum = 'apinum' in cols\n",
"print({'has_api': has_api, 'has_api10': has_api10, 'has_apinum': has_apinum})\n",
"\n",
"with engine.begin() as conn:\n",
" # Working columns\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS api_digits_raw TEXT\"))\n",
" if has_api10:\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS api10_digits_raw TEXT\"))\n",
" if has_apinum:\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS apinum_digits_raw TEXT\"))\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS api10_clean TEXT\"))\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS api8_clean TEXT\"))\n",
"\n",
" # Populate digits-only raw columns\n",
" if has_api:\n",
" conn.execute(text(\n",
" \"\"\"\n",
" UPDATE well_shapes\n",
" SET api_digits_raw = regexp_replace(api, '[^0-9]', '', 'g')\n",
" WHERE api IS NOT NULL AND api <> '' AND api_digits_raw IS NULL\n",
" \"\"\"\n",
" ))\n",
" if has_api10:\n",
" conn.execute(text(\n",
" \"\"\"\n",
" UPDATE well_shapes\n",
" SET api10_digits_raw = regexp_replace(api10, '[^0-9]', '', 'g')\n",
" WHERE api10 IS NOT NULL AND api10 <> '' AND api10_digits_raw IS NULL\n",
" \"\"\"\n",
" ))\n",
" if has_apinum:\n",
" conn.execute(text(\n",
" \"\"\"\n",
" UPDATE well_shapes\n",
" SET apinum_digits_raw = regexp_replace(apinum, '[^0-9]', '', 'g')\n",
" WHERE apinum IS NOT NULL AND apinum <> '' AND apinum_digits_raw IS NULL\n",
" \"\"\"\n",
" ))\n",
"\n",
" # Build COALESCE expression for 10-digit API using available sources (precedence: api10, apinum, api)\n",
" parts = []\n",
" if has_api10:\n",
" parts.append(\"CASE WHEN length(api10_digits_raw)=10 THEN api10_digits_raw END\")\n",
" if has_apinum:\n",
" parts.append(\"CASE WHEN length(apinum_digits_raw)=10 THEN apinum_digits_raw END\")\n",
" if has_api:\n",
" parts.append(\"CASE WHEN length(api_digits_raw)=10 THEN api_digits_raw END\")\n",
" if not parts:\n",
" parts.append(\"NULL\")\n",
" expr = \"COALESCE(\" + \", \".join(parts) + \")\"\n",
"\n",
" # Compute api10_clean and api8_clean (for TX, api8 = county(3)+unique(5) from 10-digit)\n",
" conn.execute(text(f\"\"\"\n",
" UPDATE well_shapes\n",
" SET api10_clean = {expr}\n",
" WHERE api10_clean IS NULL\n",
" \"\"\"))\n",
" conn.execute(text(\n",
" \"\"\"\n",
" UPDATE well_shapes\n",
" SET api8_clean = CASE WHEN api10_clean IS NOT NULL AND length(api10_clean)=10\n",
" THEN substr(api10_clean, 3, 8)\n",
" ELSE NULL END\n",
" WHERE api8_clean IS NULL\n",
" \"\"\"\n",
" ))\n",
"\n",
"# Preview stats\n",
"with engine.begin() as conn:\n",
" stats = conn.execute(text(\n",
" \"\"\"\n",
" SELECT\n",
" COUNT(*) FILTER (WHERE api10_clean IS NOT NULL) AS ten_digit,\n",
" COUNT(*) FILTER (WHERE api8_clean IS NOT NULL) AS eight_digit,\n",
" COUNT(*) FILTER (WHERE api_digits_raw IS NOT NULL AND length(api_digits_raw) NOT IN (8,10)) AS api_anomaly,\n",
" {api10_anom} AS api10_anomaly,\n",
" {apinum_anom} AS apinum_anomaly\n",
" FROM well_shapes\n",
" \"\"\"\n",
" .format(\n",
" api10_anom=(\"COUNT(*) FILTER (WHERE api10_digits_raw IS NOT NULL AND length(api10_digits_raw)<>10)\"\n",
" if has_api10 else \"0\"),\n",
" apinum_anom=(\"COUNT(*) FILTER (WHERE apinum_digits_raw IS NOT NULL AND length(apinum_digits_raw)<>10)\"\n",
" if has_apinum else \"0\")\n",
" )\n",
" )).fetchone()\n",
"\n",
"print(\"API Derivation Preview Stats:\")\n",
"print({\n",
" 'ten_digit': stats[0],\n",
" 'eight_digit': stats[1],\n",
" 'api_anomaly': stats[2],\n",
" 'api10_anomaly': stats[3],\n",
" 'apinum_anomaly': stats[4]\n",
"})\n",
"\n",
"# Show samples\n",
"with engine.begin() as conn:\n",
" sample = pd.read_sql(text(\n",
" \"\"\"\n",
" SELECT api, {api10}, {apinum}, api_digits_raw, {api10_raw}, {apinum_raw}, api10_clean, api8_clean\n",
" FROM well_shapes\n",
" WHERE api8_clean IS NOT NULL\n",
" LIMIT 5\n",
" \"\"\"\n",
" .format(\n",
" api10=('api10' if has_api10 else \"NULL AS api10\"),\n",
" apinum=('apinum' if has_apinum else \"NULL AS apinum\"),\n",
" api10_raw=('api10_digits_raw' if has_api10 else \"NULL AS api10_digits_raw\"),\n",
" apinum_raw=('apinum_digits_raw' if has_apinum else \"NULL AS apinum_digits_raw\")\n",
" )\n",
" ), conn)\n",
"print(\"Clean samples (first 5):\")\n",
"print(sample)\n",
"\n",
"with engine.begin() as conn:\n",
" anomalous = pd.read_sql(text(\n",
" \"\"\"\n",
" SELECT api, {api10}, {apinum}, api_digits_raw, {api10_raw}, {apinum_raw}\n",
" FROM well_shapes\n",
" WHERE (api_digits_raw IS NOT NULL AND length(api_digits_raw) NOT IN (8,10))\n",
" OR ({api10_raw_is} AND length(api10_digits_raw)<>10)\n",
" OR ({apinum_raw_is} AND length(apinum_digits_raw)<>10)\n",
" LIMIT 5\n",
" \"\"\"\n",
" .format(\n",
" api10=('api10' if has_api10 else \"NULL AS api10\"),\n",
" apinum=('apinum' if has_apinum else \"NULL AS apinum\"),\n",
" api10_raw=('api10_digits_raw' if has_api10 else \"NULL AS api10_digits_raw\"),\n",
" apinum_raw=('apinum_digits_raw' if has_apinum else \"NULL AS apinum_digits_raw\"),\n",
" api10_raw_is=('TRUE' if has_api10 else 'FALSE'),\n",
" apinum_raw_is=('TRUE' if has_apinum else 'FALSE')\n",
" )\n",
" ), conn)\n",
"print(\"Anomalous samples (first 5):\")\n",
"print(anomalous)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "363a6606",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'api8_total': 1027045, 'api8_distinct': 1010430, 'api8_duplicate_groups': 11463, 'api10_total': 852653, 'api10_distinct': 852539, 'api10_duplicate_groups': 58}\n",
"Skipped UNIQUE on api_number (duplicates or empty)\n",
"Skipped UNIQUE on api10_number (duplicates or empty)\n"
]
}
],
"source": [
"# Cell 7: Apply API numbers (10-digit and 8-digit) with precedence apinum/api10/api\n",
"from sqlalchemy import text\n",
"\n",
"with engine.begin() as conn:\n",
" # Ensure destination columns\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS api_number TEXT\"))\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS api10_number TEXT\"))\n",
"\n",
" # Assign api10_number from api10_clean when length=10\n",
" conn.execute(text(\n",
" \"\"\"\n",
" UPDATE well_shapes\n",
" SET api10_number = api10_clean\n",
" WHERE api10_clean IS NOT NULL AND length(api10_clean)=10\n",
" \"\"\"\n",
" ))\n",
"\n",
" # Assign api_number as the 8-digit from api8_clean\n",
" conn.execute(text(\n",
" \"\"\"\n",
" UPDATE well_shapes\n",
" SET api_number = api8_clean\n",
" WHERE api8_clean IS NOT NULL AND length(api8_clean)=8\n",
" \"\"\"\n",
" ))\n",
"\n",
" # Defensive cleanup: null anything not exactly 8/10\n",
" conn.execute(text(\"\"\"\n",
" UPDATE well_shapes SET api_number = NULL WHERE api_number IS NOT NULL AND length(api_number)<>8;\n",
" \"\"\"))\n",
" conn.execute(text(\"\"\"\n",
" UPDATE well_shapes SET api10_number = NULL WHERE api10_number IS NOT NULL AND length(api10_number)<>10;\n",
" \"\"\"))\n",
"\n",
" # Duplicate detection for api_number (8-digit)\n",
" dup8 = conn.execute(text(\n",
" \"\"\"\n",
" SELECT COUNT(*) FROM (\n",
" SELECT api_number, COUNT(*) c\n",
" FROM well_shapes\n",
" WHERE api_number IS NOT NULL\n",
" GROUP BY api_number\n",
" HAVING COUNT(*)>1\n",
" ) d\n",
" \"\"\"\n",
" )).scalar()\n",
" total8 = conn.execute(text(\"SELECT COUNT(*) FROM well_shapes WHERE api_number IS NOT NULL\")).scalar()\n",
" distinct8 = conn.execute(text(\"SELECT COUNT(DISTINCT api_number) FROM well_shapes WHERE api_number IS NOT NULL\")).scalar()\n",
"\n",
" # Duplicate detection for api10_number (10-digit)\n",
" dup10 = conn.execute(text(\n",
" \"\"\"\n",
" SELECT COUNT(*) FROM (\n",
" SELECT api10_number, COUNT(*) c\n",
" FROM well_shapes\n",
" WHERE api10_number IS NOT NULL\n",
" GROUP BY api10_number\n",
" HAVING COUNT(*)>1\n",
" ) d\n",
" \"\"\"\n",
" )).scalar()\n",
" total10 = conn.execute(text(\"SELECT COUNT(*) FROM well_shapes WHERE api10_number IS NOT NULL\")).scalar()\n",
" distinct10 = conn.execute(text(\"SELECT COUNT(DISTINCT api10_number) FROM well_shapes WHERE api10_number IS NOT NULL\")).scalar()\n",
"\n",
"print({\n",
" 'api8_total': total8,\n",
" 'api8_distinct': distinct8,\n",
" 'api8_duplicate_groups': dup8,\n",
" 'api10_total': total10,\n",
" 'api10_distinct': distinct10,\n",
" 'api10_duplicate_groups': dup10,\n",
"})\n",
"\n",
"# Optional: enforce unique constraints if no duplicates\n",
"if dup8 == 0 and total8 > 0:\n",
" with engine.begin() as conn:\n",
" conn.execute(text(\"ALTER TABLE well_shapes DROP CONSTRAINT IF EXISTS well_shapes_api_number_unique\"))\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD CONSTRAINT well_shapes_api_number_unique UNIQUE (api_number)\"))\n",
" print(\"Applied UNIQUE constraint on api_number\")\n",
"else:\n",
" print(\"Skipped UNIQUE on api_number (duplicates or empty)\")\n",
"\n",
"if dup10 == 0 and total10 > 0:\n",
" with engine.begin() as conn:\n",
" conn.execute(text(\"ALTER TABLE well_shapes DROP CONSTRAINT IF EXISTS well_shapes_api10_number_unique\"))\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD CONSTRAINT well_shapes_api10_number_unique UNIQUE (api10_number)\"))\n",
" print(\"Applied UNIQUE constraint on api10_number\")\n",
"else:\n",
" print(\"Skipped UNIQUE on api10_number (duplicates or empty)\")"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "d0c3734b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Using tract layer: ACS_2021_5YR_TRACT_48_TEXAS\n",
"Using GEOID column: GEOID\n",
"Loaded 6896 tract geometries into census_tracts_2021\n",
"Spatial index on tract geometries ensured.\n",
"Loaded 6896 tract geometries into census_tracts_2021\n",
"Spatial index on tract geometries ensured.\n"
]
}
],
"source": [
"# Cell 8: Load Texas Census tract geometries from local geodatabase into PostGIS (robust)\n",
"from sqlalchemy import text\n",
"import geopandas as gpd\n",
"import fiona\n",
"from pathlib import Path\n",
"\n",
"# Resolve geodatabase path\n",
"gdb_path = Path('data/ACS_2021_5YR_TRACT_48_TEXAS.gdb')\n",
"if not gdb_path.exists():\n",
" alt = Path('/home/dadams/Repos/texas-rebuild-postgis/data/ACS_2021_5YR_TRACT_48_TEXAS.gdb')\n",
" if alt.exists():\n",
" gdb_path = alt\n",
" else:\n",
" raise FileNotFoundError(f\"Geodatabase not found: {gdb_path}\")\n",
"\n",
"# Discover candidate layers with geometry\n",
"layers = fiona.listlayers(gdb_path)\n",
"# Prefer layers that look like tract geography and have polygon geometries\n",
"candidate_layers = [l for l in layers if 'TRACT' in l.upper() and 'BLOCK' not in l.upper()]\n",
"if not candidate_layers:\n",
" candidate_layers = layers # fallback: try any layer\n",
"\n",
"tract_gdf = None\n",
"selected_layer = None\n",
"for lyr in candidate_layers:\n",
" try:\n",
" gdf_try = gpd.read_file(gdb_path, layer=lyr)\n",
" # Ensure we actually have a GeoDataFrame with a geometry column\n",
" if hasattr(gdf_try, 'geometry') and gdf_try.geometry is not None:\n",
" if not gdf_try.empty:\n",
" tract_gdf = gdf_try\n",
" selected_layer = lyr\n",
" break\n",
" except Exception:\n",
" continue\n",
"\n",
"if tract_gdf is None:\n",
" raise SystemExit(f\"Failed to read a tract layer with geometry from: {layers}\")\n",
"\n",
"print(f\"Using tract layer: {selected_layer}\")\n",
"\n",
"# Ensure CRS is WGS84\n",
"try:\n",
" if tract_gdf.crs is None:\n",
" print('[warn] Tract CRS missing; assuming EPSG:4326')\n",
" tract_gdf = tract_gdf.set_crs(4326, allow_override=True)\n",
" if tract_gdf.crs.to_epsg() != 4326:\n",
" tract_gdf = tract_gdf.to_crs(4326)\n",
"except Exception as e:\n",
" print(f\"[warn] Issue normalizing CRS for tracts: {e}; forcing 4326\")\n",
" tract_gdf = tract_gdf.set_crs(4326, allow_override=True)\n",
"\n",
"# Identify GEOID column\n",
"geoid_col = None\n",
"for cand in ['GEOID', 'GEOID10', 'GEOID20', 'geoid', 'GEOID_TRACT', 'TRACTCE']:\n",
" if cand in tract_gdf.columns:\n",
" geoid_col = cand\n",
" break\n",
"if geoid_col is None:\n",
" geo_candidates = [c for c in tract_gdf.columns if 'GEOID' in c.upper()]\n",
" if geo_candidates:\n",
" geoid_col = geo_candidates[0]\n",
"if geoid_col is None:\n",
" raise SystemExit(f\"No GEOID-like column found in tract layer columns: {list(tract_gdf.columns)}\")\n",
"print(f\"Using GEOID column: {geoid_col}\")\n",
"\n",
"# Keep only needed columns and standardize names\n",
"tract_gdf = tract_gdf[[geoid_col, tract_gdf.geometry.name]].rename(columns={geoid_col: 'geoid'})\n",
"# Ensure geometry column name 'geom' to match our SQL later\n",
"if tract_gdf.geometry.name != 'geom' and hasattr(tract_gdf, 'rename_geometry'):\n",
" tract_gdf = tract_gdf.rename_geometry('geom')\n",
"if tract_gdf.geometry.name != 'geom':\n",
" tract_gdf = tract_gdf.set_geometry('geom')\n",
"\n",
"from sqlalchemy import create_engine\n",
"pg_url = f\"postgresql+psycopg2://{PGUSER}:{PGPASSWORD}@{PGHOST}:{PGPORT}/{PGDATABASE}\"\n",
"engine = create_engine(pg_url)\n",
"\n",
"# Load into PostGIS (replace if exists)\n",
"tract_table = 'census_tracts_2021'\n",
"tract_gdf.to_postgis(tract_table, engine, if_exists='replace', index=False)\n",
"print(f\"Loaded {len(tract_gdf)} tract geometries into {tract_table}\")\n",
"\n",
"# Add spatial index on the geom column\n",
"with engine.begin() as conn:\n",
" conn.execute(text(f\"CREATE INDEX IF NOT EXISTS idx_{tract_table}_geom ON {tract_table} USING GIST (geom)\"))\n",
"print(\"Spatial index on tract geometries ensured.\")"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "342491aa",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Assigned census_tract_geoid via spatial join for 1388601 wells\n",
"Ensured index on census_tract_geoid\n",
"Ensured index on census_tract_geoid\n"
]
}
],
"source": [
"# Cell 9: Spatially join wells to Census tracts in PostGIS (preferred, offline)\n",
"from sqlalchemy import text\n",
"\n",
"with engine.begin() as conn:\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS census_tract_geoid TEXT\"))\n",
" # Use ST_Contains (tract polygon contains point). Requires both in EPSG:4326 (already enforced earlier).\n",
" updated = conn.execute(text(\"\"\"\n",
" WITH to_update AS (\n",
" SELECT w.id, t.geoid\n",
" FROM well_shapes w\n",
" JOIN census_tracts_2021 t\n",
" ON ST_Contains(t.geom, w.geom)\n",
" WHERE w.geom IS NOT NULL\n",
" AND w.census_tract_geoid IS NULL\n",
" )\n",
" UPDATE well_shapes w\n",
" SET census_tract_geoid = u.geoid\n",
" FROM to_update u\n",
" WHERE w.id = u.id\n",
" RETURNING 1\n",
" \"\"\")).rowcount\n",
"print(f\"Assigned census_tract_geoid via spatial join for {updated or 0} wells\")\n",
"\n",
"# Index for faster joins/filters on tract\n",
"with engine.begin() as conn:\n",
" conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_well_shapes_census_tract_geoid ON well_shapes (census_tract_geoid)\"))\n",
"print(\"Ensured index on census_tract_geoid\")"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "583d5852",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Census API key detected; attempting fallback for unresolved wells.\n",
"Fallback batch size: 200\n",
"API obtained GEOIDs for 0 wells\n",
"API obtained GEOIDs for 0 wells\n"
]
}
],
"source": [
"# Cell 10: OPTIONAL fallback using Census Geocoder API for missing tracts (requires CENSUS_API_KEY)\n",
"# Looks up tract GEOIDs over HTTP for wells the PostGIS join could not resolve.\n",
"# Requires `engine` from an earlier cell and network access.\n",
"import os, time, requests, pandas as pd\n",
"from sqlalchemy import text\n",
"\n",
"API_KEY = os.getenv('CENSUS_API_KEY')\n",
"if API_KEY is None:\n",
" print('[info] CENSUS_API_KEY not set; skipping API fallback.')\n",
"else:\n",
" print('Census API key detected; attempting fallback for unresolved wells.')\n",
" # Fetch a batch of wells still lacking census_tract_geoid (limit to avoid quota issues)\n",
" batch_limit = 200 # adjust as needed; respect API rate limits\n",
" with engine.begin() as conn:\n",
" rows = conn.execute(text(\"\"\"\n",
" SELECT id, latitude, longitude\n",
" FROM well_shapes\n",
" WHERE census_tract_geoid IS NULL\n",
" AND latitude IS NOT NULL AND longitude IS NOT NULL\n",
" LIMIT :lim\n",
" \"\"\"), {'lim': batch_limit}).fetchall()\n",
" print(f\"Fallback batch size: {len(rows)}\")\n",
" if rows:\n",
" updates = []\n",
" for r in rows:\n",
" wid, lat, lon = r\n",
" # Census Geocoder API: https://geocoding.geo.census.gov/ (Federal API, not all endpoints require key; some do for bulk)\n",
" # Using /geocoder/geographies/coordinates endpoint\n",
" # NOTE(review): the recorded run returned 0 GEOIDs for all 200 wells — the\n",
" # benchmark/vintage/layers combination (Public_AR_Current + ACS2021_Current +\n",
" # layers=Tracts) may be rejected or return no tract layer; verify the parameter\n",
" # values against the Census Geocoder API docs before relying on this path.\n",
" url = ('https://geocoding.geo.census.gov/geocoder/geographies/coordinates'\n",
" f'?x={lon}&y={lat}&benchmark=Public_AR_Current&vintage=ACS2021_Current&format=json&layers=Tracts&key={API_KEY}')\n",
" try:\n",
" resp = requests.get(url, timeout=5)\n",
" if resp.status_code == 200:\n",
" data = resp.json()\n",
" tract_info = data.get('result', {}).get('geographies', {}).get('Census Tracts', [])\n",
" if tract_info:\n",
" geoid = tract_info[0].get('GEOID')\n",
" if geoid:\n",
" updates.append((wid, geoid))\n",
" else:\n",
" print(f\"[warn] Non-200 response for well id {wid}: {resp.status_code}\")\n",
" except Exception as e:\n",
" print(f\"[warn] Request failed for well id {wid}: {e}\")\n",
" time.sleep(0.15) # gentle rate limiting\n",
" print(f\"API obtained GEOIDs for {len(updates)} wells\")\n",
" if updates:\n",
" # Bulk update\n",
" # Per-row UPDATEs are acceptable at this batch size (<= batch_limit rows).\n",
" with engine.begin() as conn:\n",
" for wid, geoid in updates:\n",
" conn.execute(text(\"UPDATE well_shapes SET census_tract_geoid=:g WHERE id=:i\"), {'g': geoid, 'i': wid})\n",
" print(\"Applied API fallback updates.\")\n",
" else:\n",
" print('No wells need fallback.')"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "72084705",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Length distribution for api (length -> count):\n",
" length count\n",
"0 8 1027045\n",
"1 3 363147\n",
"2 7 2\n",
"\n",
"Length distribution for api10 (length -> count):\n",
" length count\n",
"0 8 845289\n",
"1 3 362399\n",
"2 10 182502\n",
"3 9 2\n",
"4 7 2\n",
"\n",
"Length distribution for apinum (length -> count):\n",
" length count\n",
"0 10 845288\n",
"1 5 362399\n",
"2 12 182503\n",
"3 9 2\n",
"4 11 2\n",
"Focused length counts:\n",
"{'apinum_len10': 845288, 'api10_len10': 182502, 'api_len10': 0, 'api_len3': 363147, 'api10_len3': 362399, 'apinum_len5': 362399}\n",
"Short API samples (showing related fields):\n",
" id api api10 apinum api_digits_raw api10_digits_raw apinum_digits_raw\n",
"0 643559 249 249 42249 249 249 42249\n",
"1 932957 371 371 42371 371 371 42371\n",
"2 933051 371 371 42371 371 371 42371\n",
"3 643600 249 249 42249 249 249 42249\n",
"4 643603 249 249 42249 249 249 42249\n",
"5 643605 249 249 42249 249 249 42249\n",
"6 643638 249 249 42249 249 249 42249\n",
"7 933718 371 371 42371 371 371 42371\n",
"8 933745 371 371 42371 371 371 42371\n",
"9 933963 371 371 42371 371 371 42371\n",
"Derived vs source (first 10):\n",
" id apinum api10 api api10_clean api8_clean \\\n",
"0 163778 4205730992D1 05730992D1 05730992 None None \n",
"1 164009 4205731591D1 05731591D1 05731591 None None \n",
"2 163965 4270330280D1 70330280D1 70330280 None None \n",
"3 164598 4270330331D1 70330331D1 70330331 None None \n",
"4 291223 4210933080H1 10933080H1 10933080 None None \n",
"5 292357 4210933084H1 10933084H1 10933084 None None \n",
"6 292494 4210933532H1 10933532H1 10933532 None None \n",
"7 292570 4210933079H1 10933079H1 10933079 None None \n",
"8 292908 4210933527H1 10933527H1 10933527 None None \n",
"9 293901 4210933085H1 10933085H1 10933085 None None \n",
"\n",
" api_number api10_number \n",
"0 05730992 None \n",
"1 05731591 None \n",
"2 70330280 None \n",
"3 70330331 None \n",
"4 10933080 None \n",
"5 10933084 None \n",
"6 10933532 None \n",
"7 10933079 None \n",
"8 10933527 None \n",
"9 10933085 None \n",
"Coverage summary:\n",
"{'total_wells': 1390194, 'with_tract_geoid': 1388666, 'without_tract_geoid': 1528, 'coverage_pct': 99.8901}\n",
"Focused length counts:\n",
"{'apinum_len10': 845288, 'api10_len10': 182502, 'api_len10': 0, 'api_len3': 363147, 'api10_len3': 362399, 'apinum_len5': 362399}\n",
"Short API samples (showing related fields):\n",
" id api api10 apinum api_digits_raw api10_digits_raw apinum_digits_raw\n",
"0 643559 249 249 42249 249 249 42249\n",
"1 932957 371 371 42371 371 371 42371\n",
"2 933051 371 371 42371 371 371 42371\n",
"3 643600 249 249 42249 249 249 42249\n",
"4 643603 249 249 42249 249 249 42249\n",
"5 643605 249 249 42249 249 249 42249\n",
"6 643638 249 249 42249 249 249 42249\n",
"7 933718 371 371 42371 371 371 42371\n",
"8 933745 371 371 42371 371 371 42371\n",
"9 933963 371 371 42371 371 371 42371\n",
"Derived vs source (first 10):\n",
" id apinum api10 api api10_clean api8_clean \\\n",
"0 163778 4205730992D1 05730992D1 05730992 None None \n",
"1 164009 4205731591D1 05731591D1 05731591 None None \n",
"2 163965 4270330280D1 70330280D1 70330280 None None \n",
"3 164598 4270330331D1 70330331D1 70330331 None None \n",
"4 291223 4210933080H1 10933080H1 10933080 None None \n",
"5 292357 4210933084H1 10933084H1 10933084 None None \n",
"6 292494 4210933532H1 10933532H1 10933532 None None \n",
"7 292570 4210933079H1 10933079H1 10933079 None None \n",
"8 292908 4210933527H1 10933527H1 10933527 None None \n",
"9 293901 4210933085H1 10933085H1 10933085 None None \n",
"\n",
" api_number api10_number \n",
"0 05730992 None \n",
"1 05731591 None \n",
"2 70330280 None \n",
"3 70330331 None \n",
"4 10933080 None \n",
"5 10933084 None \n",
"6 10933532 None \n",
"7 10933079 None \n",
"8 10933527 None \n",
"9 10933085 None \n",
"Coverage summary:\n",
"{'total_wells': 1390194, 'with_tract_geoid': 1388666, 'without_tract_geoid': 1528, 'coverage_pct': 99.8901}\n"
]
}
],
"source": [
"import pandas as pd\n",
"from sqlalchemy import text\n",
"\n",
"# Cell 11: API diagnostics & coverage summary (post derivation)\n",
"# Read-only checks on the api/api10/apinum columns and their derived *_clean/*_number\n",
"# variants (created in an earlier cell), plus tract-assignment coverage.\n",
"\n",
"# Length distributions for each raw column (separate queries to avoid GROUP BY mistakes)\n",
"# Note: GROUP BY resolves the SELECT alias `length` (Postgres allows output-column\n",
"# names in GROUP BY); rows with a NULL value collapse into a NULL-length bucket.\n",
"with engine.begin() as conn:\n",
" len_api = pd.read_sql(text(\"\"\"\n",
" SELECT length(api) AS length, COUNT(*) AS count\n",
" FROM well_shapes\n",
" GROUP BY length\n",
" ORDER BY count DESC NULLS LAST\n",
" \"\"\"), conn)\n",
"\n",
" len_api10 = pd.read_sql(text(\"\"\"\n",
" SELECT length(api10) AS length, COUNT(*) AS count\n",
" FROM well_shapes\n",
" GROUP BY length\n",
" ORDER BY count DESC NULLS LAST\n",
" \"\"\"), conn)\n",
"\n",
" len_apinum = pd.read_sql(text(\"\"\"\n",
" SELECT length(apinum) AS length, COUNT(*) AS count\n",
" FROM well_shapes\n",
" GROUP BY length\n",
" ORDER BY count DESC NULLS LAST\n",
" \"\"\"), conn)\n",
"\n",
"# head(20) caps display; distributions here have only a handful of distinct lengths.\n",
"print(\"Length distribution for api (length -> count):\")\n",
"print(len_api.head(20))\n",
"print(\"\\nLength distribution for api10 (length -> count):\")\n",
"print(len_api10.head(20))\n",
"print(\"\\nLength distribution for apinum (length -> count):\")\n",
"print(len_apinum.head(20))\n",
"\n",
"# Focused counts for lengths of interest\n",
"with engine.begin() as conn:\n",
" focused = conn.execute(text(\"\"\"\n",
" SELECT\n",
" COUNT(*) FILTER (WHERE length(apinum)=10) AS apinum_len10,\n",
" COUNT(*) FILTER (WHERE length(api10)=10) AS api10_len10,\n",
" COUNT(*) FILTER (WHERE length(api)=10) AS api_len10,\n",
" COUNT(*) FILTER (WHERE length(api)=3) AS api_len3,\n",
" COUNT(*) FILTER (WHERE length(api10)=3) AS api10_len3,\n",
" COUNT(*) FILTER (WHERE length(apinum)=5) AS apinum_len5\n",
" FROM well_shapes\n",
" \"\"\")).fetchone()\n",
"\n",
"print(\"Focused length counts:\")\n",
"print({\n",
" 'apinum_len10': focused[0],\n",
" 'api10_len10': focused[1],\n",
" 'api_len10': focused[2],\n",
" 'api_len3': focused[3],\n",
" 'api10_len3': focused[4],\n",
" 'apinum_len5': focused[5]\n",
"})\n",
"\n",
"# Sample of short api values with related fields for context\n",
"with engine.begin() as conn:\n",
" short_sample = pd.read_sql(text(\"\"\"\n",
" SELECT id, api, api10, apinum, api_digits_raw, api10_digits_raw, apinum_digits_raw\n",
" FROM well_shapes\n",
" WHERE length(api)=3 OR length(api10)=3\n",
" LIMIT 10\n",
" \"\"\"), conn)\n",
"print(\"Short API samples (showing related fields):\")\n",
"print(short_sample)\n",
"\n",
"# Derived vs source comparison (show examples where either derived 8- or 10-digit was produced)\n",
"with engine.begin() as conn:\n",
" derived_sample = pd.read_sql(text(\"\"\"\n",
" SELECT id, apinum, api10, api, api10_clean, api8_clean, api_number, api10_number\n",
" FROM well_shapes\n",
" WHERE api_number IS NOT NULL OR api10_number IS NOT NULL\n",
" LIMIT 10\n",
" \"\"\"), conn)\n",
"print(\"Derived vs source (first 10):\")\n",
"print(derived_sample)\n",
"\n",
"# Coverage summary (tract assignment)\n",
"with engine.begin() as conn:\n",
" total = conn.execute(text(\"SELECT COUNT(*) FROM well_shapes\")).scalar()\n",
" tract_assigned = conn.execute(text(\"SELECT COUNT(*) FROM well_shapes WHERE census_tract_geoid IS NOT NULL\")).scalar()\n",
"\n",
"coverage = {\n",
" 'total_wells': total,\n",
" 'with_tract_geoid': tract_assigned,\n",
" 'without_tract_geoid': (total - tract_assigned) if total is not None else None,\n",
" 'coverage_pct': round(tract_assigned * 100.0 / total, 4) if total else None\n",
"}\n",
"print(\"Coverage summary:\")\n",
"print(coverage)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c1ae22ec",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"id": "ca7047f5",
"metadata": {},
"source": [
"## Deduplication Plan\n",
"\n",
"We will remove duplicate well records based on a canonical API key:\n",
"\n",
"Key selection precedence:\n",
"1. api10_number (full 10-digit API) if present.\n",
"2. api_number (8-digit derived) if 10-digit is NULL.\n",
"\n",
"For each duplicate group (same dedup_key), keep the single \"best\" row determined by a quality score:\n",
"- +10 if geom is not NULL.\n",
"- +5 if both latitude and longitude are not NULL.\n",
"- +4 if apinum_digits_raw present and length=10.\n",
"- +3 if api10_clean present.\n",
"- +2 if census_tract_geoid present.\n",
"- +1 for every other non-null attribute column (excluding working/derived columns and id/geom).\n",
"\n",
"Process outline:\n",
"1. Audit duplicates (counts and top example groups).\n",
"2. (Optional) Create a full backup table `well_shapes_backup_dedup` before deletion.\n",
"3. Use a window function (ROW_NUMBER over dedup_key ordered by quality_score DESC, id ASC) to delete rows with rn>1.\n",
"4. Re-check that duplicates are gone; add UNIQUE constraints and indexes on api10_number/api_number.\n",
"\n",
"Execution safety:\n",
"- Controlled by a flag EXECUTE_DELETE (default False) so the cell can be run safely for preview first.\n",
"- Backup table preserves all original rows for easy restore if needed.\n",
"\n",
"Assumptions:\n",
"- api_number is derived from api10, so the two keys stay consistent within a duplicate group.\n",
"- There is no authoritative timestamp column; we fall back to id ordering for deterministic tie-break.\n",
"\n",
"You can adjust weights if needed before running deletion."
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "068460b9",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'keys_present': 1027045, 'keys_null': 363149, 'duplicate_groups': 9401}\n",
"Top duplicate groups (by count):\n",
" dedup_key cnt\n",
"0 4239531364 15\n",
"1 12733586 14\n",
"2 4239530892 13\n",
"3 4212733751 11\n",
"4 50732832 11\n",
"5 12738366 10\n",
"6 4239530437 10\n",
"7 02131620 9\n",
"8 04131132 9\n",
"9 04131897 9\n",
"Top duplicate groups (by count):\n",
" dedup_key cnt\n",
"0 4239531364 15\n",
"1 12733586 14\n",
"2 4239530892 13\n",
"3 4212733751 11\n",
"4 50732832 11\n",
"5 12738366 10\n",
"6 4239530437 10\n",
"7 02131620 9\n",
"8 04131132 9\n",
"9 04131897 9\n",
"Sample rows for example duplicate key:\n",
" id api api10 apinum api_number api10_number \\\n",
"0 983150 39531364 39531364HD 4239531364HD 39531364 4239531364 \n",
"1 983190 39531364 39531364HF 4239531364HF 39531364 4239531364 \n",
"2 983193 39531364 39531364HO 4239531364HO 39531364 4239531364 \n",
"3 983367 39531364 39531364HA 4239531364HA 39531364 4239531364 \n",
"4 983378 39531364 39531364HL 4239531364HL 39531364 4239531364 \n",
"\n",
" latitude longitude census_tract_geoid has_geom \n",
"0 30.803677 -96.473793 48395960400 True \n",
"1 30.797427 -96.466177 48395960400 True \n",
"2 30.795806 -96.468429 48395960400 True \n",
"3 30.814532 -96.478007 48395960400 True \n",
"4 30.810300 -96.480577 48395960400 True \n",
"Sample rows for example duplicate key:\n",
" id api api10 apinum api_number api10_number \\\n",
"0 983150 39531364 39531364HD 4239531364HD 39531364 4239531364 \n",
"1 983190 39531364 39531364HF 4239531364HF 39531364 4239531364 \n",
"2 983193 39531364 39531364HO 4239531364HO 39531364 4239531364 \n",
"3 983367 39531364 39531364HA 4239531364HA 39531364 4239531364 \n",
"4 983378 39531364 39531364HL 4239531364HL 39531364 4239531364 \n",
"\n",
" latitude longitude census_tract_geoid has_geom \n",
"0 30.803677 -96.473793 48395960400 True \n",
"1 30.797427 -96.466177 48395960400 True \n",
"2 30.795806 -96.468429 48395960400 True \n",
"3 30.814532 -96.478007 48395960400 True \n",
"4 30.810300 -96.480577 48395960400 True \n"
]
}
],
"source": [
"# Cell 12: Duplicate audit (preview only)\n",
"# Read-only: no rows are modified here; deletion happens in Cell 13.\n",
"from sqlalchemy import text\n",
"import pandas as pd\n",
"\n",
"# Helper: show top duplicate keys and sample rows\n",
"with engine.begin() as conn:\n",
" # Count duplicate groups for both keys\n",
" # NOTE: 'duplicate_groups' is really surplus rows (keyed rows minus distinct keys),\n",
" # i.e. how many rows a dedup would delete — not the number of duplicated keys.\n",
" dup_counts = conn.execute(text(\n",
" \"\"\"\n",
" WITH k AS (\n",
" SELECT COALESCE(api10_number, api_number) AS dedup_key\n",
" FROM well_shapes\n",
" )\n",
" SELECT\n",
" COUNT(*) FILTER (WHERE dedup_key IS NOT NULL) AS keys_present,\n",
" COUNT(*) FILTER (WHERE dedup_key IS NULL) AS keys_null,\n",
" COUNT(*) FILTER (WHERE dedup_key IS NOT NULL) -\n",
" COUNT(DISTINCT dedup_key) AS duplicate_groups\n",
" FROM k\n",
" \"\"\"\n",
" )).fetchone()\n",
"\n",
"print({\n",
" 'keys_present': dup_counts[0],\n",
" 'keys_null': dup_counts[1],\n",
" 'duplicate_groups': dup_counts[2]\n",
"})\n",
"\n",
"with engine.begin() as conn:\n",
" top_dups = pd.read_sql(text(\n",
" \"\"\"\n",
" SELECT COALESCE(api10_number, api_number) AS dedup_key, COUNT(*) AS cnt\n",
" FROM well_shapes\n",
" WHERE COALESCE(api10_number, api_number) IS NOT NULL\n",
" GROUP BY 1\n",
" HAVING COUNT(*) > 1\n",
" ORDER BY cnt DESC, dedup_key\n",
" LIMIT 10\n",
" \"\"\"\n",
" ), conn)\n",
"\n",
"print(\"Top duplicate groups (by count):\")\n",
"print(top_dups)\n",
"\n",
"# Show sample rows for one duplicate key (if any)\n",
"if not top_dups.empty:\n",
" example_key = top_dups.iloc[0]['dedup_key']\n",
" with engine.begin() as conn:\n",
" sample_rows = pd.read_sql(text(\n",
" \"\"\"\n",
" SELECT id, api, api10, apinum, api_number, api10_number, latitude, longitude,\n",
" census_tract_geoid, geom IS NOT NULL AS has_geom\n",
" FROM well_shapes\n",
" WHERE COALESCE(api10_number, api_number) = :k\n",
" ORDER BY id\n",
" LIMIT 5\n",
" \"\"\"\n",
" ), conn, params={'k': example_key})\n",
" print(\"Sample rows for example duplicate key:\")\n",
" print(sample_rows)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "277b2c9a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Backup table well_shapes_backup_dedup created (full copy).\n",
"{'would_delete': 9401, 'would_keep': 1017644, 'key_null': 363149}\n",
"{'would_delete': 9401, 'would_keep': 1017644, 'key_null': 363149}\n",
"Deleted 9401 duplicate rows.\n",
"Deleted 9401 duplicate rows.\n"
]
}
],
"source": [
"# Cell 13: Deduplicate in-place with quality score (guarded by EXECUTE_DELETE)\n",
"from sqlalchemy import text\n",
"\n",
"EXECUTE_DELETE = True # set True to perform deletion\n",
"\n",
"# Build a dynamic quality score: higher is better\n",
"# We approximate \"more complete\" by counting non-null attributes and adding weights for geom/latlon/api10/apinum/tract\n",
"\n",
"# Identify attribute columns to count as \"other\" (exclude id, geom, and working/derived)\n",
"with engine.begin() as conn:\n",
" cols = [r[0] for r in conn.execute(text(\n",
" \"\"\"\n",
" SELECT column_name\n",
" FROM information_schema.columns\n",
" WHERE table_schema='public' AND table_name='well_shapes'\n",
" \"\"\"\n",
" )).fetchall()]\n",
"\n",
"exclude_cols = set(['id','geom','latitude','longitude','census_tract_geoid',\n",
" 'api','api10','apinum',\n",
" 'api_digits_raw','api10_digits_raw','apinum_digits_raw',\n",
" 'api10_clean','api8_clean','api_number','api10_number'])\n",
"other_cols = [c for c in cols if c not in exclude_cols]\n",
"\n",
"# Construct expression to count non-null other columns\n",
"# Column names come from information_schema (trusted source), but are interpolated\n",
"# unquoted into SQL — a column needing quoting (mixed case, spaces) would break this.\n",
"other_nonnull_expr = \" + \".join([f\"(CASE WHEN {c} IS NOT NULL THEN 1 ELSE 0 END)\" for c in other_cols]) or \"0\"\n",
"\n",
"score_expr = f\"\"\"\n",
" (CASE WHEN geom IS NOT NULL THEN 10 ELSE 0 END)\n",
"+ (CASE WHEN latitude IS NOT NULL AND longitude IS NOT NULL THEN 5 ELSE 0 END)\n",
"+ (CASE WHEN apinum_digits_raw IS NOT NULL AND length(apinum_digits_raw)=10 THEN 4 ELSE 0 END)\n",
"+ (CASE WHEN api10_clean IS NOT NULL THEN 3 ELSE 0 END)\n",
"+ (CASE WHEN census_tract_geoid IS NOT NULL THEN 2 ELSE 0 END)\n",
"+ ({other_nonnull_expr})\n",
"\"\"\"\n",
"\n",
"# Create backup table (idempotent replace)\n",
"# NOTE(review): the backup is dropped and recreated even when EXECUTE_DELETE is False,\n",
"# so running a preview AFTER a deletion overwrites the pre-deletion backup.\n",
"with engine.begin() as conn:\n",
" conn.execute(text(\"DROP TABLE IF EXISTS well_shapes_backup_dedup\"))\n",
" conn.execute(text(\"CREATE TABLE well_shapes_backup_dedup AS TABLE well_shapes\"))\n",
"\n",
"print(\"Backup table well_shapes_backup_dedup created (full copy).\")\n",
"\n",
"# Compute row_number per dedup_key; preview counts\n",
"# Ties in quality_score are broken deterministically by lowest id.\n",
"with engine.begin() as conn:\n",
" preview = conn.execute(text(f\"\"\"\n",
" WITH ranked AS (\n",
" SELECT id,\n",
" COALESCE(api10_number, api_number) AS dedup_key,\n",
" {score_expr} AS quality_score,\n",
" ROW_NUMBER() OVER (\n",
" PARTITION BY COALESCE(api10_number, api_number)\n",
" ORDER BY {score_expr} DESC, id ASC\n",
" ) AS rn\n",
" FROM well_shapes\n",
" )\n",
" SELECT\n",
" COUNT(*) FILTER (WHERE dedup_key IS NOT NULL AND rn>1) AS would_delete,\n",
" COUNT(*) FILTER (WHERE dedup_key IS NOT NULL AND rn=1) AS would_keep,\n",
" COUNT(*) FILTER (WHERE dedup_key IS NULL) AS key_null\n",
" FROM ranked\n",
" \"\"\")).fetchone()\n",
"\n",
"print({\n",
" 'would_delete': preview[0],\n",
" 'would_keep': preview[1],\n",
" 'key_null': preview[2]\n",
"})\n",
"\n",
"if EXECUTE_DELETE:\n",
" # Same ranking as the preview; rows with a NULL key are never deleted.\n",
" with engine.begin() as conn:\n",
" deleted = conn.execute(text(f\"\"\"\n",
" WITH ranked AS (\n",
" SELECT id,\n",
" COALESCE(api10_number, api_number) AS dedup_key,\n",
" {score_expr} AS quality_score,\n",
" ROW_NUMBER() OVER (\n",
" PARTITION BY COALESCE(api10_number, api_number)\n",
" ORDER BY {score_expr} DESC, id ASC\n",
" ) AS rn\n",
" FROM well_shapes\n",
" )\n",
" DELETE FROM well_shapes w\n",
" USING ranked r\n",
" WHERE w.id = r.id AND r.dedup_key IS NOT NULL AND r.rn > 1\n",
" RETURNING 1\n",
" \"\"\")).rowcount\n",
" print(f\"Deleted {deleted or 0} duplicate rows.\")\n",
"else:\n",
" print(\"EXECUTE_DELETE is False: no rows deleted. Set to True to perform deduplication.\")"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "717433fb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'duplicate_groups_after': 0}\n",
"Indexes ensured; constraints applied only if no duplicates remained; ANALYZE complete.\n",
"Indexes ensured; constraints applied only if no duplicates remained; ANALYZE complete.\n"
]
}
],
"source": [
"# Cell 14: Post-dedup verification and indexing\n",
"# Re-checks the combined dedup key, then (re)builds indexes and — only when provably\n",
"# safe — UNIQUE constraints. Requires `engine` from an earlier cell.\n",
"from sqlalchemy import text\n",
"import pandas as pd\n",
"\n",
"def _dup_groups(conn, col):\n",
" \"\"\"Return surplus non-NULL rows (count minus distinct) for `col` in well_shapes.\"\"\"\n",
" return conn.execute(text(f\"\"\"\n",
" SELECT COUNT(*) FILTER (WHERE {col} IS NOT NULL) - COUNT(DISTINCT {col})\n",
" FROM well_shapes\n",
" \"\"\")).scalar()\n",
"\n",
"with engine.begin() as conn:\n",
" # Re-check duplicates on the combined key used by the Cell 13 dedup\n",
" dup_check = conn.execute(text(\n",
" \"\"\"\n",
" WITH k AS (\n",
" SELECT COALESCE(api10_number, api_number) AS dedup_key\n",
" FROM well_shapes\n",
" )\n",
" SELECT\n",
" COUNT(*) FILTER (WHERE dedup_key IS NOT NULL) - COUNT(DISTINCT dedup_key) AS duplicate_groups\n",
" FROM k\n",
" \"\"\"\n",
" )).scalar()\n",
"\n",
"print({'duplicate_groups_after': dup_check})\n",
"\n",
"# Ensure indexes and optional constraints\n",
"with engine.begin() as conn:\n",
" conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_well_shapes_api10_number ON well_shapes (api10_number)\"))\n",
" conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_well_shapes_api_number ON well_shapes (api_number)\"))\n",
" conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_well_shapes_census_tract_geoid ON well_shapes (census_tract_geoid)\"))\n",
" # BUGFIX: a clean COALESCE(api10_number, api_number) key does NOT imply each column\n",
" # is individually unique — a row keyed by its api10_number can share its api_number\n",
" # with a row keyed by api_number alone (the exact case Cell 16's second pass finds).\n",
" # Guard each UNIQUE constraint with a per-column duplicate check so the ALTER TABLE\n",
" # cannot fail mid-cell. (Postgres UNIQUE permits multiple NULLs, so NULLs are safe.)\n",
" for col in ('api10_number', 'api_number'):\n",
" col_dups = _dup_groups(conn, col)\n",
" if col_dups == 0:\n",
" conn.execute(text(f\"ALTER TABLE well_shapes DROP CONSTRAINT IF EXISTS well_shapes_{col}_unique\"))\n",
" conn.execute(text(f\"ALTER TABLE well_shapes ADD CONSTRAINT well_shapes_{col}_unique UNIQUE ({col})\"))\n",
" else:\n",
" print(f\"Skipped UNIQUE on {col}: {col_dups} duplicate value(s) remain\")\n",
" # Analyze for planner stats\n",
" conn.execute(text(\"ANALYZE well_shapes\"))\n",
"\n",
"print(\"Indexes ensured; constraints applied only if no duplicates remained; ANALYZE complete.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0fb03ae",
"metadata": {},
"outputs": [],
"source": [
"# Cell 15 (optional): Restore original rows from backup\n",
"# Destructive when enabled: replaces ALL of well_shapes with the backup snapshot.\n",
"from sqlalchemy import text\n",
"\n",
"CONFIRM_RESTORE = False # set True to restore from backup table\n",
"\n",
"if CONFIRM_RESTORE:\n",
" with engine.begin() as conn:\n",
" # Sanity check backup exists and has rows\n",
" # NOTE(review): if the backup table does not exist at all, this SELECT raises\n",
" # before the count check — the 'missing' wording below is only reached for 0 rows.\n",
" backup_rows = conn.execute(text(\"SELECT COUNT(*) FROM well_shapes_backup_dedup\")).scalar()\n",
" if not backup_rows:\n",
" raise SystemExit(\"Backup table is empty or missing; aborting restore.\")\n",
" # Replace current with backup\n",
" # TRUNCATE + INSERT run inside one transaction (engine.begin), so a failed\n",
" # restore rolls back and leaves well_shapes untouched.\n",
" conn.execute(text(\"TRUNCATE TABLE well_shapes\"))\n",
" conn.execute(text(\"INSERT INTO well_shapes SELECT * FROM well_shapes_backup_dedup\"))\n",
" conn.execute(text(\"ANALYZE well_shapes\"))\n",
" print(f\"Restored {backup_rows} rows from well_shapes_backup_dedup.\")\n",
"else:\n",
" print(\"CONFIRM_RESTORE is False: not restoring.\")"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "77c01c81",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'rows_with_key': 1017644, 'distinct_keys': 1010430, 'duplicate_groups': 7214}\n",
"Top duplicate groups after first pass (canonical):\n",
" key cnt\n",
"0 4200100766 2\n",
"1 4200131333 2\n",
"2 4200131738 2\n",
"3 4200132166 2\n",
"4 4200132207 2\n",
"5 4200132236 2\n",
"6 4200132318 2\n",
"7 4200132407 2\n",
"8 4200132416 2\n",
"9 4200132501 2\n",
"Top duplicate groups after first pass (canonical):\n",
" key cnt\n",
"0 4200100766 2\n",
"1 4200131333 2\n",
"2 4200131738 2\n",
"3 4200132166 2\n",
"4 4200132207 2\n",
"5 4200132236 2\n",
"6 4200132318 2\n",
"7 4200132407 2\n",
"8 4200132416 2\n",
"9 4200132501 2\n",
"Sample rows for top remaining duplicate group:\n",
" id api api10 apinum api_number api10_number \\\n",
"0 155 00100766 00100766DW 4200100766DW 00100766 4200100766 \n",
"1 2859 00100766 00100766D1 4200100766D1 00100766 None \n",
"\n",
" canonical_api10 latitude longitude census_tract_geoid has_geom \n",
"0 4200100766 31.89849 -95.926702 48001951100 True \n",
"1 4200100766 31.89721 -95.926583 48001951100 True \n",
"Sample rows for top remaining duplicate group:\n",
" id api api10 apinum api_number api10_number \\\n",
"0 155 00100766 00100766DW 4200100766DW 00100766 4200100766 \n",
"1 2859 00100766 00100766D1 4200100766D1 00100766 None \n",
"\n",
" canonical_api10 latitude longitude census_tract_geoid has_geom \n",
"0 4200100766 31.89849 -95.926702 48001951100 True \n",
"1 4200100766 31.89721 -95.926583 48001951100 True \n"
]
}
],
"source": [
"# Cell 16: Second-pass canonical key & audit\n",
"# Unifies 8- and 10-digit keys into canonical_api10 so cross-key duplicates\n",
"# (missed by the COALESCE key in Cell 13) become visible.\n",
"from sqlalchemy import text\n",
"import pandas as pd\n",
"\n",
"with engine.begin() as conn:\n",
" # Add a unified 10-digit canonical key if not exists\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD COLUMN IF NOT EXISTS canonical_api10 TEXT\"))\n",
" # Rule: prefer api10_number; if missing but api_number (8-digit) present, prepend '42' (Texas state code) to get a synthetic 10-digit; else NULL\n",
" # The WHERE canonical_api10 IS NULL filter makes this update idempotent on re-run.\n",
" conn.execute(text(\"\"\"\n",
" UPDATE well_shapes\n",
" SET canonical_api10 = CASE\n",
" WHEN api10_number IS NOT NULL AND length(api10_number)=10 THEN api10_number\n",
" WHEN api10_number IS NULL AND api_number IS NOT NULL AND length(api_number)=8 THEN '42' || api_number\n",
" ELSE canonical_api10 END\n",
" WHERE canonical_api10 IS NULL\n",
" \"\"\"))\n",
"\n",
"# Audit duplicates on canonical_api10\n",
"# As in Cell 12, 'duplicate_groups' counts surplus rows (keyed rows minus distinct keys).\n",
"with engine.begin() as conn:\n",
" dup_info = conn.execute(text(\"\"\"\n",
" SELECT COUNT(*) FILTER (WHERE canonical_api10 IS NOT NULL) AS key_rows,\n",
" COUNT(DISTINCT canonical_api10) AS distinct_keys,\n",
" (COUNT(*) FILTER (WHERE canonical_api10 IS NOT NULL) - COUNT(DISTINCT canonical_api10)) AS duplicate_groups\n",
" FROM well_shapes\n",
" \"\"\")).fetchone()\n",
"\n",
"print({\n",
" 'rows_with_key': dup_info[0],\n",
" 'distinct_keys': dup_info[1],\n",
" 'duplicate_groups': dup_info[2]\n",
"})\n",
"\n",
"with engine.begin() as conn:\n",
" top_dups2 = pd.read_sql(text(\"\"\"\n",
" SELECT canonical_api10 AS key, COUNT(*) AS cnt\n",
" FROM well_shapes\n",
" WHERE canonical_api10 IS NOT NULL\n",
" GROUP BY 1\n",
" HAVING COUNT(*)>1\n",
" ORDER BY cnt DESC, key\n",
" LIMIT 10\n",
" \"\"\"), conn)\n",
"print(\"Top duplicate groups after first pass (canonical):\")\n",
"print(top_dups2)\n",
"\n",
"# Show detail rows for the worst offender so the collision pattern is inspectable.\n",
"if not top_dups2.empty:\n",
" sample_key = top_dups2.iloc[0]['key']\n",
" with engine.begin() as conn:\n",
" sample2 = pd.read_sql(text(\"\"\"\n",
" SELECT id, api, api10, apinum, api_number, api10_number, canonical_api10,\n",
" latitude, longitude, census_tract_geoid, geom IS NOT NULL AS has_geom\n",
" FROM well_shapes\n",
" WHERE canonical_api10 = :k\n",
" ORDER BY id\n",
" LIMIT 10\n",
" \"\"\"), conn, params={'k': sample_key})\n",
" print(\"Sample rows for top remaining duplicate group:\")\n",
" print(sample2)\n",
"else:\n",
" print(\"No remaining duplicate groups on canonical_api10.\")"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "94717e26",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'would_delete': 7214, 'would_keep': 1010430}\n",
"Deleted 7214 additional duplicate rows.\n",
"Deleted 7214 additional duplicate rows.\n"
]
}
],
"source": [
"# Cell 17: Second-pass dedup using canonical_api10 (guarded)\n",
"# NOTE(review): rebuilds the same quality score as Cell 13 (now also excluding\n",
"# canonical_api10) — a candidate for factoring into a shared helper.\n",
"from sqlalchemy import text\n",
"\n",
"EXECUTE_DELETE2 = True # set True to perform second-pass deletion\n",
"\n",
"# Reuse the prior quality score but base partition on canonical_api10\n",
"with engine.begin() as conn:\n",
" cols = [r[0] for r in conn.execute(text(\n",
" \"\"\"\n",
" SELECT column_name\n",
" FROM information_schema.columns\n",
" WHERE table_schema='public' AND table_name='well_shapes'\n",
" \"\"\"\n",
" )).fetchall()]\n",
"\n",
"exclude_cols = set(['id','geom','latitude','longitude','census_tract_geoid',\n",
" 'api','api10','apinum',\n",
" 'api_digits_raw','api10_digits_raw','apinum_digits_raw',\n",
" 'api10_clean','api8_clean','api_number','api10_number','canonical_api10'])\n",
"other_cols = [c for c in cols if c not in exclude_cols]\n",
"# Identifiers interpolated unquoted into SQL (trusted: sourced from information_schema).\n",
"other_nonnull_expr = \" + \".join([f\"(CASE WHEN {c} IS NOT NULL THEN 1 ELSE 0 END)\" for c in other_cols]) or \"0\"\n",
"\n",
"score_expr = f\"\"\"\n",
" (CASE WHEN geom IS NOT NULL THEN 10 ELSE 0 END)\n",
"+ (CASE WHEN latitude IS NOT NULL AND longitude IS NOT NULL THEN 5 ELSE 0 END)\n",
"+ (CASE WHEN apinum_digits_raw IS NOT NULL AND length(apinum_digits_raw)=10 THEN 4 ELSE 0 END)\n",
"+ (CASE WHEN api10_clean IS NOT NULL THEN 3 ELSE 0 END)\n",
"+ (CASE WHEN census_tract_geoid IS NOT NULL THEN 2 ELSE 0 END)\n",
"+ ({other_nonnull_expr})\n",
"\"\"\"\n",
"\n",
"# Preview what would be deleted\n",
"# Ties in quality_score are broken deterministically by lowest id.\n",
"with engine.begin() as conn:\n",
" preview2 = conn.execute(text(f\"\"\"\n",
" WITH ranked AS (\n",
" SELECT id,\n",
" canonical_api10 AS dedup_key,\n",
" {score_expr} AS quality_score,\n",
" ROW_NUMBER() OVER (\n",
" PARTITION BY canonical_api10\n",
" ORDER BY {score_expr} DESC, id ASC\n",
" ) AS rn\n",
" FROM well_shapes\n",
" WHERE canonical_api10 IS NOT NULL\n",
" )\n",
" SELECT COUNT(*) FILTER (WHERE rn>1) AS would_delete,\n",
" COUNT(*) FILTER (WHERE rn=1) AS would_keep\n",
" FROM ranked\n",
" \"\"\")).fetchone()\n",
"\n",
"print({'would_delete': preview2[0], 'would_keep': preview2[1]})\n",
"\n",
"if EXECUTE_DELETE2:\n",
" # Same ranking as the preview; rows with NULL canonical_api10 are never deleted.\n",
" with engine.begin() as conn:\n",
" deleted2 = conn.execute(text(f\"\"\"\n",
" WITH ranked AS (\n",
" SELECT id,\n",
" canonical_api10 AS dedup_key,\n",
" {score_expr} AS quality_score,\n",
" ROW_NUMBER() OVER (\n",
" PARTITION BY canonical_api10\n",
" ORDER BY {score_expr} DESC, id ASC\n",
" ) AS rn\n",
" FROM well_shapes\n",
" WHERE canonical_api10 IS NOT NULL\n",
" )\n",
" DELETE FROM well_shapes w\n",
" USING ranked r\n",
" WHERE w.id = r.id AND r.rn > 1\n",
" RETURNING 1\n",
" \"\"\")).rowcount\n",
" print(f\"Deleted {deleted2 or 0} additional duplicate rows.\")\n",
"else:\n",
" print(\"EXECUTE_DELETE2 is False: not deleting in second pass.\")"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "b8f76de7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'canonical_duplicate_groups_after': 0}\n",
"Applied UNIQUE constraint on canonical_api10\n",
"Applied UNIQUE constraint on canonical_api10\n",
"Final ANALYZE complete.\n",
"Final ANALYZE complete.\n"
]
}
],
"source": [
"# Cell 18: Final verification & constraints (after second pass)\n",
"from sqlalchemy import text\n",
"\n",
"with engine.begin() as conn:\n",
" # Surplus-row count on canonical_api10 (keyed rows minus distinct keys).\n",
" final_dup = conn.execute(text(\"\"\"\n",
" SELECT COUNT(*) FILTER (WHERE canonical_api10 IS NOT NULL) - COUNT(DISTINCT canonical_api10) AS duplicate_groups\n",
" FROM well_shapes\n",
" \"\"\")).scalar()\n",
"\n",
"print({'canonical_duplicate_groups_after': final_dup})\n",
"\n",
"with engine.begin() as conn:\n",
" if final_dup == 0:\n",
" # Postgres UNIQUE permits multiple NULLs, so rows without a canonical key are fine.\n",
" conn.execute(text(\"ALTER TABLE well_shapes DROP CONSTRAINT IF EXISTS well_shapes_canonical_api10_unique\"))\n",
" conn.execute(text(\"ALTER TABLE well_shapes ADD CONSTRAINT well_shapes_canonical_api10_unique UNIQUE (canonical_api10)\"))\n",
" print(\"Applied UNIQUE constraint on canonical_api10\")\n",
" else:\n",
" print(\"Skipped UNIQUE constraint on canonical_api10 (still duplicates present)\")\n",
" conn.execute(text(\"ANALYZE well_shapes\"))\n",
"\n",
"print(\"Final ANALYZE complete.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fbe051ab",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 32,
"id": "8b7d1ee8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Created table well_shape_tract with selected columns.\n",
"Indexes ensured on well_shape_tract; UNIQUE applied on canonical_api10 if safe; ANALYZE done.\n"
]
}
],
"source": [
"# Cell 19: Materialize to well_shape_tract (cleaned projection)\n",
"from sqlalchemy import text\n",
"\n",
"# Choose columns to keep: identifiers, geometry, lat/lon, tract, and a few useful sources\n",
"keep_cols = [\n",
"    'id',\n",
"    'canonical_api10', 'api10_number', 'api_number',\n",
"    'apinum', 'api10', 'api',\n",
"    'census_tract_geoid', 'latitude', 'longitude',\n",
"    'geom'\n",
"]\n",
"\n",
"# Build SELECT with only available columns\n",
"with engine.begin() as conn:\n",
"    cols = [r[0] for r in conn.execute(text(\n",
"        \"\"\"\n",
"        SELECT column_name\n",
"        FROM information_schema.columns\n",
"        WHERE table_schema='public' AND table_name='well_shapes'\n",
"        \"\"\"\n",
"    )).fetchall()]\n",
"\n",
"available = [c for c in keep_cols if c in cols]\n",
"missing = [c for c in keep_cols if c not in cols]\n",
"if missing:\n",
"    print({ 'skipping_missing_columns': missing })\n",
"# Fail fast: an empty select list would generate invalid SQL below\n",
"if not available:\n",
"    raise RuntimeError('none of the expected columns exist in well_shapes; cannot build well_shape_tract')\n",
"\n",
"select_list = \", \".join(available)\n",
"\n",
"create_sql = f\"\"\"\n",
"DROP TABLE IF EXISTS well_shape_tract;\n",
"CREATE TABLE well_shape_tract AS\n",
"SELECT {select_list}\n",
"FROM well_shapes;\n",
"\"\"\"\n",
"\n",
"with engine.begin() as conn:\n",
"    conn.execute(text(create_sql))\n",
"\n",
"print(\"Created table well_shape_tract with selected columns.\")\n",
"\n",
"# Add types/constraints/indexes -- each DDL statement is guarded on column\n",
"# availability, because the SELECT above silently drops columns missing from\n",
"# well_shapes; an unguarded ALTER/CREATE INDEX on a skipped column would\n",
"# raise UndefinedColumn and roll back the whole transaction.\n",
"with engine.begin() as conn:\n",
"    if 'geom' in available:\n",
"        # Ensure geometry type and SRID declared (copy may keep it, but enforce explicitly)\n",
"        conn.execute(text(\"ALTER TABLE well_shape_tract ALTER COLUMN geom TYPE geometry(Point,4326) USING ST_SetSRID(ST_Force2D(geom),4326)\"))\n",
"        conn.execute(text(\"CREATE INDEX IF NOT EXISTS idx_wst_geom ON well_shape_tract USING GIST (geom)\"))\n",
"    plain_indexes = [\n",
"        ('idx_wst_canonical_api10', 'canonical_api10'),\n",
"        ('idx_wst_api10_number', 'api10_number'),\n",
"        ('idx_wst_api_number', 'api_number'),\n",
"        ('idx_wst_tract', 'census_tract_geoid'),\n",
"    ]\n",
"    for idx_name, idx_col in plain_indexes:\n",
"        if idx_col in available:\n",
"            conn.execute(text(f\"CREATE INDEX IF NOT EXISTS {idx_name} ON well_shape_tract ({idx_col})\"))\n",
"    # Only add unique if source is clean\n",
"    if 'canonical_api10' in available:\n",
"        dup = conn.execute(text(\"\"\"\n",
"            SELECT COUNT(*) FILTER (WHERE canonical_api10 IS NOT NULL) - COUNT(DISTINCT canonical_api10)\n",
"            FROM well_shape_tract\n",
"        \"\"\")).scalar()\n",
"        if dup == 0:\n",
"            conn.execute(text(\"ALTER TABLE well_shape_tract DROP CONSTRAINT IF EXISTS wst_canonical_api10_unique\"))\n",
"            conn.execute(text(\"ALTER TABLE well_shape_tract ADD CONSTRAINT wst_canonical_api10_unique UNIQUE (canonical_api10)\"))\n",
"\n",
"    conn.execute(text(\"ANALYZE well_shape_tract\"))\n",
"\n",
"print(\"Indexes ensured on well_shape_tract; UNIQUE applied on canonical_api10 if safe; ANALYZE done.\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}