from __future__ import annotations
import json
import logging
import os
import warnings
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import Engine, create_engine, text
from sqlalchemy.exc import SQLAlchemyError
# Configure logging early so that downstream modules inherit the settings.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Pandas emits noisy UserWarnings when casting timestamps out of Postgres;
# silence only warnings raised from pandas rather than all UserWarnings.
warnings.filterwarnings("ignore", category=UserWarning, module="pandas")
class WellAnalyzerError(Exception):
"""Base exception class for all analyzer failures."""
class ConfigError(WellAnalyzerError):
"""Raised when we cannot build a working configuration."""
class DataLoadError(WellAnalyzerError):
"""Raised when data cannot be loaded from the warehouse."""
class AnalysisError(WellAnalyzerError):
"""Raised when a downstream analytic task fails."""
@dataclass
class Config:
"""Runtime configuration for the analyzer."""
engine: Engine
chunk_size: int = 10_000
cache_dir: Path = Path("./cache")
well_source: str = ""
inspections_source: str = ""
violations_source: str = ""
def _load_engine_from_env() -> Engine:
"""Build a SQLAlchemy engine using PG* environment variables."""
load_dotenv(override=False)
host = os.getenv("PGHOST", "localhost")
port = os.getenv("PGPORT", "5432")
user = os.getenv("PGUSER", "postgres")
password = quote_plus(os.getenv("PGPASSWORD", ""))
database = os.getenv("PGDATABASE", "texas_data")
url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
logger.info("Connecting to Postgres", extra={"host": host, "database": database})
try:
return create_engine(url)
except SQLAlchemyError as exc:
raise ConfigError(f"Failed to create engine for {url}: {exc}") from exc
class WellAnalyzer:
"""
Analyze wells, inspections, violations, and environmental-justice indicators held in the
rebuilt PostGIS warehouse. The class auto-detects the rebuilt tables that now exist
in the texas-rebuild-postgis project so it works against both fresh rebuilds and future
refreshes without hand-editing SQL strings.
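
    A minimal usage sketch (assuming the PG* environment variables read by
    _load_engine_from_env are set):

        analyzer = WellAnalyzer()
        analyzer.print_analysis()
        analyzer.export_analysis("analysis_output.json")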
"""
ID_COLUMN = "api_norm"
    WELL_SOURCE_CANDIDATES = [
        # Checked in order of preference; the ConfigError message in __init__ is
        # built from this list, so the two cannot drift apart.
        "well_enriched_all_plus",
        "well_enriched_all",
        "well_with_demographics_table",
        "well_shape_tract",
    ]
BORDER_DISTANCE_CRS = "EPSG:5070" # meters-based CRS suitable for CONUS distances
WELL_COLUMN_MAP: Dict[str, List[str]] = {
"census_tract_geoid": ["census_tract_geoid", "geoid", "tract_geoid"],
"tract_name": ["tract_name", "name"],
"ruca_category": ["ruca_category"],
"ruca_code_2020": ["ruca_code_2020"],
"ruca_primary_description": ["ruca_primary_description"],
"ruca_secondary_description": ["ruca_secondary_description"],
"ej_composite_score": ["ej_composite_score"],
"pct_minority": ["pct_minority"],
"pct_hispanic": ["pct_hispanic"],
"poverty_rate": ["poverty_rate"],
"unemployment_rate": ["unemployment_rate"],
"less_than_hs_pct": ["less_than_hs_pct"],
"linguistic_isolation_rate": ["linguistic_isolation_rate"],
"renter_cost_burden_rate": ["renter_cost_burden_rate"],
"disability_rate": ["disability_rate"],
"pct_under5": ["pct_under5"],
"pct_65plus": ["pct_65plus"],
"median_household_income": ["median_household_income"],
"latitude": ["latitude", "lat"],
"longitude": ["longitude", "lon", "lng"],
"basin_label": ["basin_label", "basin_name"],
"play_label": ["play_label", "play_name"],
"texmex_name": ["texmex_name"],
"distance_to_texmex_km": ["distance_to_texmex_km"],
"within_25km_texmex": ["within_25km_texmex"],
"within_50km_texmex": ["within_50km_texmex"],
}
def __init__(
self,
engine: Optional[Engine] = None,
*,
chunk_size: int = 10_000,
cache_dir: Optional[Path] = None,
well_source: Optional[str] = None,
) -> None:
if engine is None:
engine = _load_engine_from_env()
self.config = Config(
engine=engine,
chunk_size=chunk_size,
cache_dir=(cache_dir or Path("./cache")),
)
self.config.well_source = well_source or self._detect_table(self.WELL_SOURCE_CANDIDATES)
        if not self.config.well_source:
            raise ConfigError(
                "Could not find a well table/view. Expected one of: "
                + ", ".join(self.WELL_SOURCE_CANDIDATES)
                + "."
            )
self.config.inspections_source = self._detect_table(["inspections"])
self.config.violations_source = self._detect_table(["violations"])
if not self.config.inspections_source or not self.config.violations_source:
raise ConfigError("Both inspections and violations tables must exist in the database.")
self.data: Dict[str, pd.DataFrame] = {}
self._initialize_data()
# --------------------------------------------------------------------------------------
# Helper methods for metadata detection and SQL building
# --------------------------------------------------------------------------------------
def _detect_table(self, candidates: List[str]) -> Optional[str]:
for candidate in candidates:
found = self._table_exists(candidate)
if found:
return found
return None
def _table_exists(self, table: str) -> Optional[str]:
names_to_try = []
if "." in table:
names_to_try.append(table)
else:
names_to_try.append(f"public.{table}")
names_to_try.append(table)
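        # to_regclass() resolves a possibly schema-qualified name against the
        # search_path and returns NULL when no such relation exists, so it
        # covers tables and views alike without raising.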
query = text("SELECT to_regclass(:name) IS NOT NULL AS exists")
with self.config.engine.begin() as conn:
for name in names_to_try:
exists = conn.execute(query, {"name": name}).scalar()
if exists:
return name
return None
def _split_table_name(self, qualified: str) -> Dict[str, Optional[str]]:
if "." in qualified:
schema, table = qualified.split(".", 1)
return {"schema": schema, "table": table}
return {"schema": None, "table": qualified}
def _get_columns(self, table: str) -> Dict[str, str]:
pieces = self._split_table_name(table)
sql = [
"SELECT column_name",
"FROM information_schema.columns",
"WHERE table_name = :table",
]
params = {"table": pieces["table"]}
if pieces["schema"]:
sql.append("AND table_schema = :schema")
params["schema"] = pieces["schema"]
sql.append("ORDER BY ordinal_position")
with self.config.engine.begin() as conn:
rows = conn.execute(text(" ".join(sql)), params).fetchall()
return {row[0].lower(): row[0] for row in rows}
def _pick_column(self, columns: Dict[str, str], names: List[str]) -> Optional[str]:
for name in names:
if name.lower() in columns:
return columns[name.lower()]
return None
def _execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
try:
df_chunks: List[pd.DataFrame] = []
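            # chunksize turns read_sql into an iterator of frames so peak memory
            # stays bounded even for very large result sets.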
for chunk in pd.read_sql(
text(query),
self.config.engine,
params=params,
chunksize=self.config.chunk_size,
):
df_chunks.append(chunk)
return pd.concat(df_chunks, ignore_index=True) if df_chunks else pd.DataFrame()
except SQLAlchemyError as exc:
logger.error("Query failed: %s", query, exc_info=True)
raise DataLoadError(f"Failed executing query: {exc}") from exc
# --------------------------------------------------------------------------------------
# Data loading
# --------------------------------------------------------------------------------------
def _initialize_data(self) -> None:
self.data["well_data"] = self._load_wells()
self._recompute_border_metrics()
self.data["inspections"] = self._load_inspections()
self.data["violations"] = self._load_violations()
if not self.data["well_data"].empty and (
not self.data["inspections"].empty or not self.data["violations"].empty
):
self._create_performance_metrics()
def _load_wells(self) -> pd.DataFrame:
table = self.config.well_source
columns = self._get_columns(table)
alias = "w"
api_norm_col = self._pick_column(columns, ["api_norm"])
if not api_norm_col:
raise DataLoadError(f"{table} does not expose api_norm")
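        # Alias whichever candidate column exists to its canonical name so that
        # downstream code sees a stable schema regardless of the source view.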
select_parts = [f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}']
for target, candidates in self.WELL_COLUMN_MAP.items():
column = self._pick_column(columns, candidates)
if column:
select_parts.append(f'{alias}."{column}" AS {target}')
query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias}'
df = self._execute_query(query)
        # Filter nulls before the cast: astype(str) would turn NaN into the
        # literal string "nan" and defeat the notna() check.
        df = df[df[self.ID_COLUMN].notna()]
        df[self.ID_COLUMN] = df[self.ID_COLUMN].astype(str).str.strip()
        df = df[df[self.ID_COLUMN] != ""]
        df = df.drop_duplicates(subset=[self.ID_COLUMN]).reset_index(drop=True)
logger.info("Loaded %s wells from %s", len(df), table)
return df
def _resolve_texmex_shapefile(self) -> Optional[Path]:
"""Resolve the Texas-Mexico boundary shapefile path relative to this module/repo."""
base_dir = Path(__file__).resolve().parent
candidates = [
base_dir / "../data/texmex_shape/tl_2023_us_internationalboundary.shp",
base_dir / "data/texmex_shape/tl_2023_us_internationalboundary.shp",
Path.cwd() / "data/texmex_shape/tl_2023_us_internationalboundary.shp",
Path.cwd() / "../data/texmex_shape/tl_2023_us_internationalboundary.shp",
]
for candidate in candidates:
path = candidate.resolve()
if path.exists():
return path
return None
def _recompute_border_metrics(self) -> None:
"""
Recompute border proximity fields from geometry instead of trusting source-table flags.
This replaces/creates:
- distance_to_texmex_km
- within_25km_texmex
- within_50km_texmex
- texmex_name (simple computed tag for border-proximate wells)
"""
wells = self.data.get("well_data")
if wells is None or wells.empty:
return
if "latitude" not in wells.columns or "longitude" not in wells.columns:
logger.warning("Skipping border metric recompute: latitude/longitude columns are missing.")
return
try:
import geopandas as gpd
except ImportError:
logger.warning("Skipping border metric recompute: geopandas is not installed.")
return
shp_path = self._resolve_texmex_shapefile()
if shp_path is None:
logger.warning("Skipping border metric recompute: Texas-Mexico shapefile was not found.")
return
df = wells.copy()
df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")
valid_mask = df["latitude"].notna() & df["longitude"].notna()
if not valid_mask.any():
logger.warning("Skipping border metric recompute: no valid latitude/longitude rows found.")
return
texmex = gpd.read_file(shp_path)
if texmex.empty:
logger.warning("Skipping border metric recompute: Texas-Mexico shapefile is empty.")
return
# Prefer rows explicitly tied to Texas boundary segments if metadata exists.
if "NAME" in texmex.columns:
texas_border = texmex[texmex["NAME"].astype(str).str.contains("Texas", case=False, na=False)].copy()
if texas_border.empty:
texas_border = texmex
else:
texas_border = texmex
# Work in a projected CRS so geometric distances are in meters.
if texas_border.crs is None:
texas_border = texas_border.set_crs("EPSG:4326", allow_override=True)
texas_border = texas_border.to_crs(self.BORDER_DISTANCE_CRS)
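        # union_all() (the GeoPandas replacement for the deprecated unary_union)
        # collapses every border segment into one geometry, so each well needs
        # only a single nearest-distance computation.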
border_union = texas_border.geometry.union_all()
points = gpd.GeoDataFrame(
df.loc[valid_mask, [self.ID_COLUMN, "longitude", "latitude"]].copy(),
geometry=gpd.points_from_xy(
df.loc[valid_mask, "longitude"],
df.loc[valid_mask, "latitude"],
),
crs="EPSG:4326",
).to_crs(self.BORDER_DISTANCE_CRS)
distances_m = points.geometry.distance(border_union)
distances_km = distances_m / 1000.0
points["distance_to_texmex_km"] = distances_km
points["within_25km_texmex"] = (distances_km <= 25.0).astype(int)
points["within_50km_texmex"] = (distances_km <= 50.0).astype(int)
points["texmex_name"] = points["within_50km_texmex"].map(
{1: "Texas-Mexico Border (computed)", 0: pd.NA}
)
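        # Reset the border fields to neutral defaults, then overlay the computed
        # values; wells without usable coordinates keep the defaults.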
df["distance_to_texmex_km"] = pd.NA
df["within_25km_texmex"] = 0
df["within_50km_texmex"] = 0
df["texmex_name"] = pd.NA
df = df.merge(
points[
[
self.ID_COLUMN,
"distance_to_texmex_km",
"within_25km_texmex",
"within_50km_texmex",
"texmex_name",
]
],
on=self.ID_COLUMN,
how="left",
suffixes=("", "_computed"),
)
# Prefer computed values where available.
for col in ["distance_to_texmex_km", "within_25km_texmex", "within_50km_texmex", "texmex_name"]:
computed_col = f"{col}_computed"
if computed_col in df.columns:
df[col] = df[computed_col].combine_first(df[col])
df = df.drop(columns=[computed_col])
df["within_25km_texmex"] = pd.to_numeric(df["within_25km_texmex"], errors="coerce").fillna(0).astype(int)
df["within_50km_texmex"] = pd.to_numeric(df["within_50km_texmex"], errors="coerce").fillna(0).astype(int)
df["distance_to_texmex_km"] = pd.to_numeric(df["distance_to_texmex_km"], errors="coerce")
self.data["well_data"] = df
logger.info(
"Recomputed border metrics for %s wells (%s within 25km, %s within 50km)",
len(df),
int(df["within_25km_texmex"].sum()),
int(df["within_50km_texmex"].sum()),
)
def _load_inspections(self) -> pd.DataFrame:
table = self.config.inspections_source
columns = self._get_columns(table)
alias = "i"
select_parts = []
base_candidates = [
"id",
"district",
"county",
"inspection_date",
"inspection_type",
"operator_name",
"field_name",
"compliance",
"file_date",
"created_at",
]
for column in base_candidates:
picked = self._pick_column(columns, [column])
if picked:
select_parts.append(f'{alias}."{picked}" AS {column}')
api_norm_col = self._pick_column(columns, ["api_norm"])
if not api_norm_col:
raise DataLoadError(f"{table} does not expose api_norm")
select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}')
where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL'
query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}'
df = self._execute_query(query)
for col in ["inspection_date", "file_date", "created_at"]:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors="coerce")
        df = df[df[self.ID_COLUMN].notna()].copy()
if "inspection_date" in df.columns:
df = df.sort_values([self.ID_COLUMN, "inspection_date"])
df["days_since_last_inspection"] = (
df.groupby(self.ID_COLUMN)["inspection_date"].diff().dt.days
)
logger.info("Loaded %s inspections from %s", len(df), table)
return df.reset_index(drop=True)
def _load_violations(self) -> pd.DataFrame:
table = self.config.violations_source
columns = self._get_columns(table)
alias = "v"
select_parts = []
row_id_candidates = ["id", "violation_id", "violationid", "objectid", "row_id"]
base_candidates = [
"operator_name",
"p5_operator_no",
"district",
"oil_lease_gas_well_id",
"lease_fac_name",
"well_no",
"drilling_permit_no",
"field_name",
"violated_rule",
"violated_rule_desc",
"major_viol_ind",
"compliant_on_reinsp",
"last_enf_action",
"last_enf_action_date",
"violation_disc_date",
"file_date",
"created_at",
]
for row_id in row_id_candidates:
picked = self._pick_column(columns, [row_id])
if picked:
select_parts.append(f'{alias}."{picked}" AS {row_id}')
break
for column in base_candidates:
picked = self._pick_column(columns, [column])
if picked:
select_parts.append(f'{alias}."{picked}" AS {column}')
api_norm_col = self._pick_column(columns, ["api_norm"])
if not api_norm_col:
raise DataLoadError(f"{table} does not expose api_norm")
select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}')
where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL'
query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}'
df = self._execute_query(query)
for col in ["violation_disc_date", "last_enf_action_date", "file_date", "created_at"]:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors="coerce")
        df = df[df[self.ID_COLUMN].notna()].reset_index(drop=True)
if not df.empty:
row_id_col = next(
(c for c in ["id", "violation_id", "violationid", "objectid", "row_id"] if c in df.columns),
None,
)
if row_id_col is None:
df["violation_row_id"] = range(1, len(df) + 1)
else:
df = df.rename(columns={row_id_col: "violation_row_id"})
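            # transform("count") broadcasts each well's violation count back onto
            # every row, unlike agg, which would collapse the groups.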
df["total_violations"] = df.groupby(self.ID_COLUMN)["violation_row_id"].transform("count")
df["violation_number"] = df.groupby(self.ID_COLUMN).cumcount() + 1
logger.info("Loaded %s violations from %s", len(df), table)
return df
def _create_performance_metrics(self) -> None:
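        # Roll inspections and violations up to one row per well (api_norm) so
        # they can later be merged onto well_data.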
insp = self.data.get("inspections", pd.DataFrame()).copy()
viol = self.data.get("violations", pd.DataFrame()).copy()
if insp.empty and viol.empty:
return
if not insp.empty:
count_col = "id" if "id" in insp.columns else "inspection_date"
insp_metrics = insp.groupby(self.ID_COLUMN).agg(
total_inspections=(count_col, "count")
)
if "compliance" in insp.columns:
insp_metrics["compliance_rate"] = (
insp.groupby(self.ID_COLUMN)["compliance"]
.apply(lambda x: (x == "Yes").mean() * 100)
)
if "days_since_last_inspection" in insp.columns:
insp_metrics["avg_days_between_inspections"] = (
insp.groupby(self.ID_COLUMN)["days_since_last_inspection"].mean()
)
else:
insp_metrics = pd.DataFrame()
if not viol.empty:
count_col = (
"violation_row_id"
if "violation_row_id" in viol.columns
else ("id" if "id" in viol.columns else self.ID_COLUMN)
)
viol_metrics = viol.groupby(self.ID_COLUMN).agg(
total_violations=(count_col, "count"),
)
if "major_viol_ind" in viol.columns:
viol_metrics["major_violations"] = (
viol.groupby(self.ID_COLUMN)["major_viol_ind"]
.apply(lambda x: (x == "Y").sum())
)
if "compliant_on_reinsp" in viol.columns:
viol_metrics["reinspection_compliance_rate"] = (
viol.groupby(self.ID_COLUMN)["compliant_on_reinsp"]
.apply(lambda x: (x == "Y").mean() * 100)
)
else:
viol_metrics = pd.DataFrame()
        # Joining on mismatched indexes when one side is empty would lose the
        # api_norm key, so only join when both frames have rows.
        if insp_metrics.empty:
            metrics = viol_metrics
        elif viol_metrics.empty:
            metrics = insp_metrics
        else:
            metrics = insp_metrics.join(viol_metrics, how="outer")
        metrics = metrics.fillna(0).reset_index()
        self.data["performance_metrics"] = metrics
# --------------------------------------------------------------------------------------
# Analytic helpers
# --------------------------------------------------------------------------------------
def analyze_inspection_patterns(self) -> Dict[str, Any]:
insp_df = self.data.get("inspections", pd.DataFrame())
if insp_df.empty:
return {}
result: Dict[str, Any] = {
"overall_statistics": {
"total_inspections": int(len(insp_df)),
"unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()),
}
}
if "compliance" in insp_df.columns:
result["overall_statistics"]["overall_compliance_rate"] = (
(insp_df["compliance"] == "Yes").mean() * 100
)
if "days_since_last_inspection" in insp_df.columns:
result["overall_statistics"]["avg_days_between_inspections"] = insp_df[
"days_since_last_inspection"
].mean()
result["overall_statistics"]["median_days_between_inspections"] = insp_df[
"days_since_last_inspection"
].median()
if "inspection_date" in insp_df.columns:
insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce")
insp_df["year"] = insp_df["inspection_date"].dt.year
result["temporal_patterns"] = {
"inspections_by_year": insp_df.groupby("year").size().dropna().to_dict()
}
if "compliance" in insp_df.columns:
result["temporal_patterns"]["compliance_by_year"] = (
insp_df.groupby("year")["compliance"]
.apply(lambda x: (x == "Yes").mean() * 100)
.dropna()
.to_dict()
)
if "district" in insp_df.columns:
district_counts = insp_df.groupby("district").size().to_dict()
district_compliance = {}
if "compliance" in insp_df.columns:
district_compliance = (
insp_df.groupby("district")["compliance"]
.apply(lambda x: (x == "Yes").mean() * 100)
.to_dict()
)
result["district_performance"] = {
"inspections_by_district": district_counts,
"compliance_by_district": district_compliance,
}
return result
def analyze_violations(self) -> Dict[str, Any]:
viol_df = self.data.get("violations", pd.DataFrame())
if viol_df.empty:
return {}
result: Dict[str, Any] = {
"overall_statistics": {
"total_violations": int(len(viol_df)),
"unique_wells_with_violations": int(viol_df[self.ID_COLUMN].nunique()),
},
"violation_types": {},
"enforcement_effectiveness": {},
}
if "major_viol_ind" in viol_df.columns:
result["overall_statistics"]["major_violations"] = int(
(viol_df["major_viol_ind"] == "Y").sum()
)
if "compliant_on_reinsp" in viol_df.columns:
result["overall_statistics"]["compliance_on_reinspection_rate"] = (
(viol_df["compliant_on_reinsp"] == "Y").mean() * 100
)
if "violated_rule" in viol_df.columns:
result["violation_types"]["common_violations"] = (
viol_df["violated_rule"].value_counts().head(10).to_dict()
)
if "major_viol_ind" in viol_df.columns:
result["violation_types"]["major_violation_types"] = (
viol_df[viol_df["major_viol_ind"] == "Y"]["violated_rule"]
.value_counts()
.head(5)
.to_dict()
)
if {"violated_rule", "compliant_on_reinsp"} <= set(viol_df.columns):
result["enforcement_effectiveness"]["resolution_rate_by_type"] = (
viol_df.groupby("violated_rule")["compliant_on_reinsp"]
.apply(lambda x: (x == "Y").mean() * 100)
.to_dict()
)
return result
def analyze_regulatory_chain(self) -> Dict[str, Any]:
insp_df = self.data.get("inspections", pd.DataFrame()).copy()
viol_df = self.data.get("violations", pd.DataFrame()).copy()
if insp_df.empty or viol_df.empty:
return {}
if "inspection_date" not in insp_df.columns or "violation_disc_date" not in viol_df.columns:
return {}
insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce")
viol_df["violation_disc_date"] = pd.to_datetime(viol_df["violation_disc_date"], errors="coerce")
viol_df["last_enf_action_date"] = pd.to_datetime(
viol_df.get("last_enf_action_date"), errors="coerce"
)
insp_df = insp_df.dropna(subset=[self.ID_COLUMN, "inspection_date"])
viol_df = viol_df.dropna(subset=[self.ID_COLUMN, "violation_disc_date"])
if insp_df.empty or viol_df.empty:
return {}
insp_sorted = (
insp_df.sort_values(["inspection_date", self.ID_COLUMN])
.reset_index(drop=True)
)
viol_sorted = (
viol_df.sort_values(["violation_disc_date", self.ID_COLUMN])
.reset_index(drop=True)
)
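        # merge_asof(direction="backward") pairs each violation with the most
        # recent inspection of the same well on or before its discovery date;
        # both inputs must be sorted on the date keys, hence the sorts above.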
matched_df = pd.merge_asof(
viol_sorted,
insp_sorted,
left_on="violation_disc_date",
right_on="inspection_date",
by=self.ID_COLUMN,
direction="backward",
suffixes=("_viol", "_insp"),
).dropna(subset=["inspection_date"])
if matched_df.empty:
return {}
total_inspections = len(insp_df)
inspection_id_col = "id_insp" if "id_insp" in matched_df.columns else "inspection_date"
inspections_with_violations = matched_df[inspection_id_col].nunique()
violation_rate = (inspections_with_violations / total_inspections) * 100 if total_inspections else 0
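        # Decompose the regulatory timeline into spans measured in days:
        # inspection -> violation discovery -> last enforcement action.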
time_spans = matched_df.dropna(
subset=["inspection_date", "violation_disc_date", "last_enf_action_date"]
).copy()
time_spans["insp_to_viol"] = (
time_spans["violation_disc_date"] - time_spans["inspection_date"]
).dt.days
time_spans["viol_to_enforce"] = (
time_spans["last_enf_action_date"] - time_spans["violation_disc_date"]
).dt.days
time_spans["total_span"] = time_spans["insp_to_viol"] + time_spans["viol_to_enforce"]
        has_enf = "last_enf_action" in matched_df.columns
        enforcement_patterns = {
            "action_types": viol_df["last_enf_action"].value_counts().to_dict()
            if "last_enf_action" in viol_df.columns
            else {},
            "enforcement_rate": (
                matched_df["last_enf_action"].notna().mean() * 100 if has_enf else 0.0
            ),
        }
if not time_spans.empty:
enforcement_patterns["avg_days_to_enforcement"] = time_spans["viol_to_enforce"].mean()
return {
"summary": {
"total_inspections": total_inspections,
"violation_rate": violation_rate,
"unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()),
},
"conversion_funnel": {
"inspections_with_violations": inspections_with_violations,
"violations_with_enforcement": int(matched_df["last_enf_action"].notna().sum()),
},
"enforcement_patterns": enforcement_patterns,
"time_spans": time_spans.describe().to_dict() if not time_spans.empty else {},
}
def analyze_environmental_justice(self) -> Dict[str, Any]:
well_df = self.data.get("well_data", pd.DataFrame()).copy()
if well_df.empty or "census_tract_geoid" not in well_df.columns:
return {}
metrics = self.data.get("performance_metrics")
if metrics is not None and not metrics.empty:
well_df = well_df.merge(metrics, on=self.ID_COLUMN, how="left")
tract_df = well_df.dropna(subset=["census_tract_geoid"]).copy()
if tract_df.empty:
return {}
agg_map: Dict[str, str] = {self.ID_COLUMN: "count"}
mean_cols = [
"ej_composite_score",
"pct_minority",
"pct_hispanic",
"poverty_rate",
"median_household_income",
"avg_days_between_inspections",
"reinspection_compliance_rate",
"compliance_rate",
]
for col in mean_cols:
if col in tract_df.columns:
agg_map[col] = "mean"
if "total_inspections" in tract_df.columns:
agg_map["total_inspections"] = "mean"
if "total_violations" in tract_df.columns:
agg_map["total_violations"] = "mean"
if "major_violations" in tract_df.columns:
agg_map["major_violations"] = "sum"
tract_summary = (
tract_df.groupby("census_tract_geoid")
.agg(agg_map)
.rename(columns={self.ID_COLUMN: "wells_in_tract"})
.reset_index()
)
rename_map = {
"total_inspections": "avg_inspections",
"total_violations": "avg_violations",
"compliance_rate": "avg_compliance_rate",
}
tract_summary = tract_summary.rename(columns=rename_map)
demographic_vars = [
col
for col in [
"ej_composite_score",
"pct_minority",
"pct_hispanic",
"poverty_rate",
"median_household_income",
]
if col in tract_summary.columns
]
performance_vars = [
col
for col in [
"avg_inspections",
"avg_violations",
"major_violations",
"avg_compliance_rate",
"avg_days_between_inspections",
"reinspection_compliance_rate",
"wells_in_tract",
]
if col in tract_summary.columns
]
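        # Spearman rank correlation captures monotone association without
        # assuming linearity, which suits skewed tract-level rates and counts.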
correlations: Dict[str, Dict[str, float]] = {dv: {} for dv in demographic_vars}
for d in demographic_vars:
for p in performance_vars:
correlations[d][p] = tract_summary[d].corr(tract_summary[p], method="spearman")
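        # Compare tracts above the median of a demographic indicator against
        # those at or below it, across every performance metric.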
def split_high_low(column: str) -> Dict[str, Dict[str, float]]:
result: Dict[str, Dict[str, float]] = {}
if column not in tract_summary.columns:
return result
median_value = tract_summary[column].median()
high = tract_summary[tract_summary[column] > median_value]
low = tract_summary[tract_summary[column] <= median_value]
for metric in performance_vars:
result[metric] = {
"high": high[metric].mean() if not high.empty else float("nan"),
"low": low[metric].mean() if not low.empty else float("nan"),
}
return result
return {
"summary": {
"total_tracts": int(len(tract_summary)),
"total_wells": int(tract_summary["wells_in_tract"].sum()),
"avg_wells_per_tract": tract_summary["wells_in_tract"].mean(),
"avg_ej_score": tract_summary.get("ej_composite_score", pd.Series(dtype=float)).mean(),
},
"correlations": correlations,
"high_vulnerability_vs_low": split_high_low("ej_composite_score"),
"high_poverty_vs_low": split_high_low("poverty_rate"),
}
# --------------------------------------------------------------------------------------
# Public helpers
# --------------------------------------------------------------------------------------
def get_analysis(self) -> Dict[str, Any]:
return {
"inspection_analysis": self.analyze_inspection_patterns(),
"violation_analysis": self.analyze_violations(),
"regulatory_chain": self.analyze_regulatory_chain(),
"environmental_justice": self.analyze_environmental_justice(),
}
@staticmethod
def _format_stat(stat_value: Any) -> str:
if stat_value is None:
return "n/a"
if isinstance(stat_value, (int, float)):
if pd.isna(stat_value):
return "n/a"
return f"{stat_value:,.2f}"
return str(stat_value)
def print_analysis(self) -> None:
analysis = self.get_analysis()
def print_block(title: str, stats: Dict[str, Any]) -> None:
if not stats:
return
print(f"\n{title}")
for stat_key, stat_value in stats.items():
print(f" {stat_key.replace('_', ' ').title()}: {self._format_stat(stat_value)}")
print_block("INSPECTION ANALYSIS", analysis.get("inspection_analysis", {}).get("overall_statistics", {}))
print_block("VIOLATION ANALYSIS", analysis.get("violation_analysis", {}).get("overall_statistics", {}))
print_block("REGULATORY CHAIN", analysis.get("regulatory_chain", {}).get("summary", {}))
print_block("ENVIRONMENTAL JUSTICE", analysis.get("environmental_justice", {}).get("summary", {}))
violation_types = analysis.get("violation_analysis", {}).get("violation_types", {})
if violation_types:
print("\nTop Violations:")
for rule, count in violation_types.get("common_violations", {}).items():
print(f" {rule}: {self._format_stat(count)}")
def get_summary_stats(self) -> Dict[str, Any]:
stats: Dict[str, Any] = {}
wells = self.data.get("well_data", pd.DataFrame())
if not wells.empty:
stats["total_wells"] = len(wells)
stats["unique_census_tracts"] = wells["census_tract_geoid"].nunique(
dropna=True
) if "census_tract_geoid" in wells.columns else None
insp = self.data.get("inspections", pd.DataFrame())
if not insp.empty:
stats["total_inspections"] = len(insp)
viol = self.data.get("violations", pd.DataFrame())
if not viol.empty:
stats["total_violations"] = len(viol)
return stats
def export_analysis(self, path: Path | str) -> None:
output_path = Path(path)
output_path.parent.mkdir(parents=True, exist_ok=True)
analysis = self.get_analysis()
output_path.write_text(json.dumps(analysis, indent=2, default=str), encoding="utf-8")
logger.info("Analysis exported to %s", output_path)
if __name__ == "__main__":
try:
analyzer = WellAnalyzer()
print("Summary Stats:")
for key, value in analyzer.get_summary_stats().items():
print(f" {key.replace('_', ' ').title()}: {value}")
analyzer.print_analysis()
analyzer.export_analysis(Path("analysis_output.json"))
except WellAnalyzerError as exc:
logger.error("Well Analyzer failed: %s", exc, exc_info=True)