Initial commit
rebuild/create_violations_inspections.ipynb · new file · 449 lines
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "df7e4dbd",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Note: you may need to restart the kernel to use updated packages.\n"
     ]
    }
   ],
   "source": [
    "%pip install matplotlib seaborn pandas numpy psycopg2-binary sqlalchemy jupyterlab --quiet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "17d89f9c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "RealDictRow({'version': 'PostgreSQL 18.1 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 15.2.1 20251112, 64-bit'})\n"
     ]
    }
   ],
   "source": [
"# Cell 0: connect to Postgres\n",
|
||||
"# Requires: pip install psycopg2-binary\n",
|
||||
"import os\n",
|
||||
"import psycopg2\n",
|
||||
"from psycopg2.extras import RealDictCursor\n",
|
||||
"\n",
|
||||
"# Configure via environment variables (safer) or edit defaults below\n",
|
||||
"PGHOST = os.getenv(\"PGHOST\", \"localhost\")\n",
|
||||
"PGPORT = os.getenv(\"PGPORT\", \"5432\")\n",
|
||||
"PGUSER = os.getenv(\"PGUSER\", \"postgres\")\n",
|
||||
"PGPASSWORD = os.getenv(\"PGPASSWORD\", \"\")\n",
|
||||
"# Default DB name set to \"postgres\"; will fall back if PGDATABASE not set in env\n",
|
||||
"PGDATABASE = os.getenv(\"PGDATABASE\", \"texas_data\")\n",
|
||||
"\n",
|
||||
"if PGDATABASE is None or PGDATABASE == \"\":\n",
|
||||
" print(\"[warn] PGDATABASE not set; using fallback 'postgres'\")\n",
|
||||
" PGDATABASE = \"postgres\"\n",
|
||||
"\n",
|
||||
"def get_conn():\n",
|
||||
" \"\"\"Return a new psycopg2 connection using configured environment variables.\"\"\"\n",
|
||||
" return psycopg2.connect(\n",
|
||||
" host=PGHOST,\n",
|
||||
" port=PGPORT,\n",
|
||||
" user=PGUSER,\n",
|
||||
" password=PGPASSWORD,\n",
|
||||
" dbname=PGDATABASE,\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
"# Quick connection test (only runs if executed as a script, not typical in notebook)\n",
|
||||
"if __name__ == \"__main__\":\n",
|
||||
" try:\n",
|
||||
" conn = get_conn()\n",
|
||||
" with conn.cursor(cursor_factory=RealDictCursor) as cur:\n",
|
||||
" cur.execute(\"SELECT version() AS version;\")\n",
|
||||
" print(cur.fetchone())\n",
|
||||
" conn.close()\n",
|
||||
" except Exception as e:\n",
|
||||
" print(\"Postgres connection failed:\", e)\n",
|
||||
" raise"
|
||||
]
|
||||
},
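  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b3f1a2c7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Usage sketch (added for illustration, not part of the original pipeline):\n",
    "# how get_conn() above can back an ad-hoc query. current_database() and\n",
    "# current_user are standard Postgres functions; nothing here assumes the\n",
    "# ingest tables below exist yet.\n",
    "conn = get_conn()\n",
    "try:\n",
    "    with conn.cursor(cursor_factory=RealDictCursor) as cur:\n",
    "        cur.execute(\"SELECT current_database() AS db, current_user AS usr;\")\n",
    "        print(cur.fetchone())\n",
    "finally:\n",
    "    conn.close()"
   ]
  },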
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "c1558b8f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Detected 17 headers:\n",
      "['operator_name', 'p5_operator_no', 'district', 'oil_lease_gas_well_id', 'lease_fac_name', 'api_no', 'county', 'well_no', 'drilling_permit_no', 'field_name', 'violated_rule', 'violated_rule_desc', 'major_viol_ind', 'compliant_on_reinsp', 'last_enf_action', 'last_enf_action_date', 'violation_disc_date']\n",
      "Total lines (including header): 375676\n",
      "Raw load shape (after skipping bad lines): (375649, 17)\n",
      "Inferred types (second pass):\n",
      "  operator_name: string\n",
      "  p5_operator_no: Int64\n",
      "  district: string\n",
      "  oil_lease_gas_well_id: Int64\n",
      "  lease_fac_name: string\n",
      "  api_no: Int64\n",
      "  county: string\n",
      "  well_no: string\n",
      "  drilling_permit_no: string\n",
      "  field_name: string\n",
      "  violated_rule: string\n",
      "  violated_rule_desc: string\n",
      "  major_viol_ind: string\n",
      "  compliant_on_reinsp: string\n",
      "  last_enf_action: string\n",
      "  last_enf_action_date: Int64\n",
      "  violation_disc_date: Int64\n",
      "Final dtypes summary:\n",
      "operator_name           string\n",
      "p5_operator_no           Int64\n",
      "district                string\n",
      "oil_lease_gas_well_id    Int64\n",
      "lease_fac_name          string\n",
      "api_no                   Int64\n",
      "county                  string\n",
      "well_no                 string\n",
      "drilling_permit_no      string\n",
      "field_name              string\n",
      "violated_rule           string\n",
      "violated_rule_desc      string\n",
      "major_viol_ind          string\n",
      "compliant_on_reinsp     string\n",
      "last_enf_action         string\n",
      "last_enf_action_date     Int64\n",
      "violation_disc_date      Int64\n",
      "dtype: object\n",
      "Final shape: (375649, 17)\n",
      "Saved cleaned violations.csv (malformed lines skipped, inferred types applied, headers lowercased).\n"
     ]
    }
   ],
   "source": [
"# Inspect and load VIOLATIONS.txt using its first row as headers with robust handling of malformed rows.\n",
|
||||
"import pandas as pd, os\n",
|
||||
"from pathlib import Path\n",
|
||||
"\n",
|
||||
"file_path = Path('VIOLATIONS.txt')\n",
|
||||
"if not file_path.exists():\n",
|
||||
" raise FileNotFoundError(f\"Expected {file_path} in working directory {os.getcwd()}\")\n",
|
||||
"\n",
|
||||
"# 1. Extract headers (first line) and normalize\n",
|
||||
"with file_path.open('r', encoding='utf-8', errors='ignore') as f:\n",
|
||||
" first_line = f.readline().rstrip('\\n')\n",
|
||||
"raw_headers = first_line.split('}')\n",
|
||||
"headers = [h.strip().lower() for h in raw_headers]\n",
|
||||
"print(f\"Detected {len(headers)} headers:\")\n",
|
||||
"print(headers)\n",
|
||||
"\n",
|
||||
"# 2. Initial len for progress info\n",
|
||||
"total_lines = sum(1 for _ in file_path.open('r', encoding='utf-8', errors='ignore'))\n",
|
||||
"print(f\"Total lines (including header): {total_lines}\")\n",
|
||||
"\n",
|
||||
"# 3. First pass: read full file as string, skip malformed lines\n",
|
||||
"# Using engine='python' with on_bad_lines='skip' to tolerate inconsistent quoting / delimiters\n",
|
||||
"raw_df = pd.read_csv(\n",
|
||||
" file_path,\n",
|
||||
" delimiter='}',\n",
|
||||
" header=0,\n",
|
||||
" names=headers,\n",
|
||||
" dtype=str,\n",
|
||||
" na_values=['', 'NA', 'NaN', 'null', 'NULL'],\n",
|
||||
" keep_default_na=True,\n",
|
||||
" engine='python',\n",
|
||||
" on_bad_lines='skip'\n",
|
||||
")\n",
|
||||
"print(f\"Raw load shape (after skipping bad lines): {raw_df.shape}\")\n",
|
||||
"\n",
|
||||
"# 4. Trim whitespace\n",
|
||||
"for c in raw_df.columns:\n",
|
||||
" raw_df[c] = raw_df[c].astype('string').str.strip()\n",
|
||||
"\n",
|
||||
"# 5. Infer types on cleaned sample (middle 100 rows) from raw_df directly\n",
|
||||
"mid_index = len(raw_df) // 2\n",
|
||||
"sample_slice = raw_df.iloc[max(0, mid_index-50): mid_index+50]\n",
|
||||
"\n",
|
||||
"inferred_types = []\n",
|
||||
"for c in raw_df.columns:\n",
|
||||
" series = sample_slice[c].dropna()\n",
|
||||
" non_empty = series[~series.isin(['', 'NA', 'NaN', 'null', 'NULL'])]\n",
|
||||
" # Date heuristic\n",
|
||||
" looks_date = c.endswith('_date') and non_empty.apply(lambda v: (('-' in v or '/' in v) and len(v) >= 8)).all() and len(non_empty) > 0\n",
|
||||
" if looks_date:\n",
|
||||
" inferred_types.append(('datetime', c))\n",
|
||||
" continue\n",
|
||||
" # Numeric checks\n",
|
||||
" def is_all_int(vals):\n",
|
||||
" try:\n",
|
||||
" return all(str(int(v)) == v or v.isdigit() for v in vals)\n",
|
||||
" except Exception:\n",
|
||||
" return False\n",
|
||||
" def is_all_float(vals):\n",
|
||||
" try:\n",
|
||||
" return all(pd.to_numeric([v]).dtype.kind in ('f','i') for v in vals)\n",
|
||||
" except Exception:\n",
|
||||
" return False\n",
|
||||
" if len(non_empty) > 0 and is_all_int(non_empty):\n",
|
||||
" inferred_types.append(('Int64', c))\n",
|
||||
" elif len(non_empty) > 0 and is_all_float(non_empty):\n",
|
||||
" inferred_types.append(('Float64', c))\n",
|
||||
" else:\n",
|
||||
" inferred_types.append(('string', c))\n",
|
||||
"\n",
|
||||
"print(\"Inferred types (second pass):\")\n",
|
||||
"for t, c in inferred_types:\n",
|
||||
" print(f\" {c}: {t}\")\n",
|
||||
"\n",
|
||||
"# 6. Apply conversions\n",
|
||||
"violations_df = raw_df.copy()\n",
|
||||
"for t, c in inferred_types:\n",
|
||||
" if t == 'datetime':\n",
|
||||
" violations_df[c] = pd.to_datetime(violations_df[c], errors='coerce', infer_datetime_format=True)\n",
|
||||
" elif t == 'Int64':\n",
|
||||
" violations_df[c] = pd.to_numeric(violations_df[c], errors='coerce').astype('Int64')\n",
|
||||
" elif t == 'Float64':\n",
|
||||
" violations_df[c] = pd.to_numeric(violations_df[c], errors='coerce').astype('Float64')\n",
|
||||
" else: # string\n",
|
||||
" violations_df[c] = violations_df[c].astype('string')\n",
|
||||
"\n",
|
||||
"print(\"Final dtypes summary:\")\n",
|
||||
"print(violations_df.dtypes)\n",
|
||||
"print(\"Final shape:\", violations_df.shape)\n",
|
||||
"\n",
|
||||
"# 7. Persist cleaned data\n",
|
||||
"violations_df.to_csv('violations.csv', index=False)\n",
|
||||
"print(\"Saved cleaned violations.csv (malformed lines skipped, inferred types applied, headers lowercased).\")"
|
||||
]
|
||||
},
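  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7c0d9e2a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sanity check (a sketch added for illustration): how many raw lines\n",
    "# on_bad_lines='skip' dropped, and per-column null fractions after typing.\n",
    "# Uses total_lines and violations_df from the cell above.\n",
    "skipped = total_lines - 1 - len(violations_df)  # minus the header row\n",
    "print(f\"Lines skipped as malformed: {skipped}\")\n",
    "print(violations_df.isna().mean().round(4))"
   ]
  },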
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "da6ddc7c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INSPECTIONS] Detected 14 headers\n",
      "['operator_name', 'p5_operator_no', 'district', 'district_office_inspecting', 'oil_lease_gas_well_id', 'lease_fac_name', 'api_no', 'county', 'well_no', 'inspection_date', 'drilling_permit_no', 'complaint_no', 'compliance', 'field_name']\n",
      "[INSPECTIONS] Total lines (including header): 3734620\n",
      "[INSPECTIONS] Raw load shape (after skipping bad lines): (3734440, 14)\n",
      "[INSPECTIONS] Inferred types:\n",
      "  operator_name: string\n",
      "  p5_operator_no: Int64\n",
      "  district: string\n",
      "  district_office_inspecting: string\n",
      "  oil_lease_gas_well_id: Int64\n",
      "  lease_fac_name: string\n",
      "  api_no: Int64\n",
      "  county: string\n",
      "  well_no: string\n",
      "  inspection_date: Int64\n",
      "  drilling_permit_no: Int64\n",
      "  complaint_no: string\n",
      "  compliance: string\n",
      "  field_name: string\n",
      "[INSPECTIONS] Final dtypes summary:\n",
      "operator_name                  string\n",
      "p5_operator_no                  Int64\n",
      "district                       string\n",
      "district_office_inspecting     string\n",
      "oil_lease_gas_well_id           Int64\n",
      "lease_fac_name                 string\n",
      "api_no                          Int64\n",
      "county                         string\n",
      "well_no                        string\n",
      "inspection_date                 Int64\n",
      "drilling_permit_no              Int64\n",
      "complaint_no                   string\n",
      "compliance                     string\n",
      "field_name                     string\n",
      "dtype: object\n",
      "[INSPECTIONS] Final shape: (3734440, 14)\n",
      "Saved inspections.csv (malformed lines skipped, inferred types applied, headers lowercased).\n"
     ]
    }
   ],
   "source": [
    "# Robust ingest for INSPECTIONS.txt using first row as headers and skipping malformed lines.\n",
"import pandas as pd, os\n",
|
||||
"from pathlib import Path\n",
|
||||
"\n",
|
||||
"insp_path = Path('INSPECTIONS.txt')\n",
|
||||
"if not insp_path.exists():\n",
|
||||
" raise FileNotFoundError(f\"Expected {insp_path} in working directory {os.getcwd()}\")\n",
|
||||
"\n",
|
||||
"with insp_path.open('r', encoding='utf-8', errors='ignore') as f:\n",
|
||||
" first_line = f.readline().rstrip('\\n')\n",
|
||||
"raw_headers = first_line.split('}')\n",
|
||||
"insp_headers = [h.strip().lower() for h in raw_headers]\n",
|
||||
"print(f\"[INSPECTIONS] Detected {len(insp_headers)} headers\")\n",
|
||||
"print(insp_headers)\n",
|
||||
"\n",
|
||||
"total_lines = sum(1 for _ in insp_path.open('r', encoding='utf-8', errors='ignore'))\n",
|
||||
"print(f\"[INSPECTIONS] Total lines (including header): {total_lines}\")\n",
|
||||
"\n",
|
||||
"insp_raw = pd.read_csv(\n",
|
||||
" insp_path,\n",
|
||||
" delimiter='}',\n",
|
||||
" header=0,\n",
|
||||
" names=insp_headers,\n",
|
||||
" dtype=str,\n",
|
||||
" na_values=['', 'NA', 'NaN', 'null', 'NULL'],\n",
|
||||
" keep_default_na=True,\n",
|
||||
" engine='python',\n",
|
||||
" on_bad_lines='skip'\n",
|
||||
")\n",
|
||||
"print(f\"[INSPECTIONS] Raw load shape (after skipping bad lines): {insp_raw.shape}\")\n",
|
||||
"\n",
|
||||
"for c in insp_raw.columns:\n",
|
||||
" insp_raw[c] = insp_raw[c].astype('string').str.strip()\n",
|
||||
"\n",
|
||||
"mid_index = len(insp_raw) // 2\n",
|
||||
"sample_slice = insp_raw.iloc[max(0, mid_index-50): mid_index+50]\n",
|
||||
"\n",
|
||||
"insp_inferred = []\n",
|
||||
"for c in insp_raw.columns:\n",
|
||||
" series = sample_slice[c].dropna()\n",
|
||||
" non_empty = series[~series.isin(['', 'NA', 'NaN', 'null', 'NULL'])]\n",
|
||||
" looks_date = c.endswith('_date') and non_empty.apply(lambda v: (('-' in v or '/' in v) and len(v) >= 8)).all() and len(non_empty) > 0\n",
|
||||
" if looks_date:\n",
|
||||
" insp_inferred.append(('datetime', c))\n",
|
||||
" continue\n",
|
||||
" def is_all_int(vals):\n",
|
||||
" try:\n",
|
||||
" return all(str(int(v)) == v or v.isdigit() for v in vals)\n",
|
||||
" except Exception:\n",
|
||||
" return False\n",
|
||||
" def is_all_float(vals):\n",
|
||||
" try:\n",
|
||||
" return all(pd.to_numeric([v]).dtype.kind in ('f','i') for v in vals)\n",
|
||||
" except Exception:\n",
|
||||
" return False\n",
|
||||
" if len(non_empty) > 0 and is_all_int(non_empty):\n",
|
||||
" insp_inferred.append(('Int64', c))\n",
|
||||
" elif len(non_empty) > 0 and is_all_float(non_empty):\n",
|
||||
" insp_inferred.append(('Float64', c))\n",
|
||||
" else:\n",
|
||||
" insp_inferred.append(('string', c))\n",
|
||||
"\n",
|
||||
"print(\"[INSPECTIONS] Inferred types:\")\n",
|
||||
"for t, c in insp_inferred:\n",
|
||||
" print(f\" {c}: {t}\")\n",
|
||||
"\n",
|
||||
"inspections_df = insp_raw.copy()\n",
|
||||
"for t, c in insp_inferred:\n",
|
||||
" if t == 'datetime':\n",
|
||||
" inspections_df[c] = pd.to_datetime(inspections_df[c], errors='coerce', infer_datetime_format=True)\n",
|
||||
" elif t == 'Int64':\n",
|
||||
" inspections_df[c] = pd.to_numeric(inspections_df[c], errors='coerce').astype('Int64')\n",
|
||||
" elif t == 'Float64':\n",
|
||||
" inspections_df[c] = pd.to_numeric(inspections_df[c], errors='coerce').astype('Float64')\n",
|
||||
" else:\n",
|
||||
" inspections_df[c] = inspections_df[c].astype('string')\n",
|
||||
"\n",
|
||||
"print(\"[INSPECTIONS] Final dtypes summary:\")\n",
|
||||
"print(inspections_df.dtypes)\n",
|
||||
"print(\"[INSPECTIONS] Final shape:\", inspections_df.shape)\n",
|
||||
"\n",
|
||||
"inspections_df.to_csv('inspections.csv', index=False)\n",
|
||||
"print(\"Saved inspections.csv (malformed lines skipped, inferred types applied, headers lowercased).\")"
|
||||
]
|
||||
},
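  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e5a8d4f1",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustration (an assumption to verify, not part of the original pipeline):\n",
    "# inspection_date inferred as Int64 rather than datetime, which suggests\n",
    "# compact numeric dates such as YYYYMMDD. A quick decode under that\n",
    "# assumption; the Postgres load cell below coerces dates independently.\n",
    "sample = inspections_df['inspection_date'].dropna().head()\n",
    "print(sample)\n",
    "print(pd.to_datetime(sample.astype('string'), format='%Y%m%d', errors='coerce'))"
   ]
  },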
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "220bca51",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loaded violations.csv into public.violations -> rows: 375649\n",
      "Loaded inspections.csv into public.inspections -> rows: 3734440\n",
      "Data ingestion complete.\n"
     ]
    }
   ],
   "source": [
    "# Load cleaned CSVs into Postgres using SQLAlchemy\n",
    "import os\n",
    "import pandas as pd\n",
    "from sqlalchemy import create_engine, text\n",
    "from urllib.parse import quote_plus\n",
    "\n",
    "# Build SQLAlchemy engine from environment variables\n",
    "PGHOST = os.getenv(\"PGHOST\", \"localhost\")\n",
    "PGPORT = os.getenv(\"PGPORT\", \"5432\")\n",
    "PGUSER = os.getenv(\"PGUSER\", \"postgres\")\n",
    "PGPASSWORD = os.getenv(\"PGPASSWORD\", \"\")\n",
    "PGDATABASE = os.getenv(\"PGDATABASE\", \"texas_data\") # default for this notebook\n",
    "\n",
    "conn_url = f\"postgresql+psycopg2://{PGUSER}:{quote_plus(PGPASSWORD)}@{PGHOST}:{PGPORT}/{PGDATABASE}\"\n",
    "engine = create_engine(conn_url, pool_pre_ping=True)\n",
    "\n",
    "# Helper: read CSV (as string), coerce *_date columns to datetime, then write to Postgres\n",
"def load_and_write_csv(csv_path: str, table_name: str, schema: str = \"public\") -> None:\n",
|
||||
" df = pd.read_csv(csv_path, dtype=str)\n",
|
||||
" # Coerce any *_date columns to datetime\n",
|
||||
" date_cols = [c for c in df.columns if c.endswith(\"_date\")]\n",
|
||||
" for c in date_cols:\n",
|
||||
" df[c] = pd.to_datetime(df[c], errors=\"coerce\")\n",
|
||||
" # Write to Postgres with batching\n",
|
||||
" df.to_sql(\n",
|
||||
" table_name,\n",
|
||||
" engine,\n",
|
||||
" schema=schema,\n",
|
||||
" if_exists=\"replace\",\n",
|
||||
" index=False,\n",
|
||||
" chunksize=10000,\n",
|
||||
" method=\"multi\",\n",
|
||||
" )\n",
|
||||
" # Report row count from DB\n",
|
||||
" with engine.connect() as conn:\n",
|
||||
" result = conn.execute(text(f\"SELECT COUNT(*) FROM {schema}.\\\"{table_name}\\\"\"))\n",
|
||||
" print(f\"Loaded {csv_path} into {schema}.{table_name} -> rows:\", result.scalar())\n",
|
||||
"\n",
|
||||
"# Load violations.csv and inspections.csv\n",
|
||||
"load_and_write_csv(\"violations.csv\", \"violations\")\n",
|
||||
"load_and_write_csv(\"inspections.csv\", \"inspections\")\n",
|
||||
"\n",
|
||||
"print(\"Data ingestion complete.\")"
|
||||
]
|
||||
}
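  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9f2b6c3d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional follow-up (a sketch, not in the original pipeline): index the shared\n",
    "# lease/facility key so later joins between the two tables are cheap.\n",
    "# CREATE INDEX IF NOT EXISTS is standard Postgres; engine comes from the cell above.\n",
    "with engine.begin() as conn:\n",
    "    conn.execute(text('CREATE INDEX IF NOT EXISTS idx_violations_lease ON public.violations (oil_lease_gas_well_id)'))\n",
    "    conn.execute(text('CREATE INDEX IF NOT EXISTS idx_inspections_lease ON public.inspections (oil_lease_gas_well_id)'))\n",
    "print(\"Indexes created (or already present).\")"
   ]
  }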
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.13.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}