782 lines
31 KiB
Python
782 lines
31 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import warnings
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
from urllib.parse import quote_plus
|
|
|
|
import pandas as pd
|
|
from dotenv import load_dotenv
|
|
from sqlalchemy import Engine, create_engine, text
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
# Configure logging early so that downstream modules inherit the settings.
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Pandas throws a lot of noisy warnings when casting timestamps out of Postgres.
|
|
warnings.filterwarnings("ignore", category=UserWarning)
|
|
|
|
|
|
class WellAnalyzerError(Exception):
|
|
"""Base exception class for all analyzer failures."""
|
|
|
|
|
|
class ConfigError(WellAnalyzerError):
|
|
"""Raised when we cannot build a working configuration."""
|
|
|
|
|
|
class DataLoadError(WellAnalyzerError):
|
|
"""Raised when data cannot be loaded from the warehouse."""
|
|
|
|
|
|
class AnalysisError(WellAnalyzerError):
|
|
"""Raised when a downstream analytic task fails."""
|
|
|
|
|
|
@dataclass
|
|
class Config:
|
|
"""Runtime configuration for the analyzer."""
|
|
|
|
engine: Engine
|
|
chunk_size: int = 10_000
|
|
cache_dir: Path = Path("./cache")
|
|
well_source: str = ""
|
|
inspections_source: str = ""
|
|
violations_source: str = ""
|
|
|
|
|
|
def _load_engine_from_env() -> Engine:
|
|
"""Build a SQLAlchemy engine using PG* environment variables."""
|
|
|
|
load_dotenv(override=False)
|
|
|
|
host = os.getenv("PGHOST", "localhost")
|
|
port = os.getenv("PGPORT", "5432")
|
|
user = os.getenv("PGUSER", "postgres")
|
|
password = quote_plus(os.getenv("PGPASSWORD", ""))
|
|
database = os.getenv("PGDATABASE", "texas_data")
|
|
|
|
url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
|
|
logger.info("Connecting to Postgres", extra={"host": host, "database": database})
|
|
try:
|
|
return create_engine(url)
|
|
except SQLAlchemyError as exc:
|
|
raise ConfigError(f"Failed to create engine for {url}: {exc}") from exc
|
|
|
|
|
|
class WellAnalyzer:
|
|
"""
|
|
Analyze wells, inspections, violations, and environmental-justice indicators held in the
|
|
rebuilt PostGIS warehouse. The class auto-detects the rebuilt tables that now exist
|
|
in the texas-rebuild-postgis project so it works against both fresh rebuilds and future
|
|
refreshes without hand-editing SQL strings.
|
|
"""
|
|
|
|
ID_COLUMN = "api_norm"
|
|
WELL_SOURCE_CANDIDATES = [
|
|
"well_shape_tract",
|
|
]
|
|
|
|
WELL_COLUMN_MAP: Dict[str, List[str]] = {
|
|
"census_tract_geoid": ["census_tract_geoid", "geoid", "tract_geoid"],
|
|
"tract_name": ["tract_name", "name"],
|
|
"ruca_category": ["ruca_category"],
|
|
"ruca_code_2020": ["ruca_code_2020"],
|
|
"ruca_primary_description": ["ruca_primary_description"],
|
|
"ruca_secondary_description": ["ruca_secondary_description"],
|
|
"ej_composite_score": ["ej_composite_score"],
|
|
"pct_minority": ["pct_minority"],
|
|
"pct_hispanic": ["pct_hispanic"],
|
|
"poverty_rate": ["poverty_rate"],
|
|
"unemployment_rate": ["unemployment_rate"],
|
|
"less_than_hs_pct": ["less_than_hs_pct"],
|
|
"linguistic_isolation_rate": ["linguistic_isolation_rate"],
|
|
"renter_cost_burden_rate": ["renter_cost_burden_rate"],
|
|
"disability_rate": ["disability_rate"],
|
|
"pct_under5": ["pct_under5"],
|
|
"pct_65plus": ["pct_65plus"],
|
|
"median_household_income": ["median_household_income"],
|
|
"latitude": ["latitude", "lat"],
|
|
"longitude": ["longitude", "lon", "lng"],
|
|
"basin_label": ["basin_label", "basin_name"],
|
|
"play_label": ["play_label", "play_name"],
|
|
"texmex_name": ["texmex_name"],
|
|
"distance_to_texmex_km": ["distance_to_texmex_km"],
|
|
"within_25km_texmex": ["within_25km_texmex"],
|
|
"within_50km_texmex": ["within_50km_texmex"],
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
engine: Optional[Engine] = None,
|
|
*,
|
|
chunk_size: int = 10_000,
|
|
cache_dir: Optional[Path] = None,
|
|
well_source: Optional[str] = None,
|
|
) -> None:
|
|
if engine is None:
|
|
engine = _load_engine_from_env()
|
|
|
|
self.config = Config(
|
|
engine=engine,
|
|
chunk_size=chunk_size,
|
|
cache_dir=(cache_dir or Path("./cache")),
|
|
)
|
|
|
|
self.config.well_source = well_source or self._detect_table(self.WELL_SOURCE_CANDIDATES)
|
|
if not self.config.well_source:
|
|
raise ConfigError(
|
|
"Could not find a well table/view. "
|
|
"Expected one of well_enriched_all_plus, well_enriched_all, well_with_demographics_table, well_shape_tract."
|
|
)
|
|
|
|
self.config.inspections_source = self._detect_table(["inspections"])
|
|
self.config.violations_source = self._detect_table(["violations"])
|
|
if not self.config.inspections_source or not self.config.violations_source:
|
|
raise ConfigError("Both inspections and violations tables must exist in the database.")
|
|
|
|
self.data: Dict[str, pd.DataFrame] = {}
|
|
self._initialize_data()
|
|
|
|
# --------------------------------------------------------------------------------------
|
|
# Helper methods for metadata detection and SQL building
|
|
# --------------------------------------------------------------------------------------
|
|
|
|
def _detect_table(self, candidates: List[str]) -> Optional[str]:
|
|
for candidate in candidates:
|
|
found = self._table_exists(candidate)
|
|
if found:
|
|
return found
|
|
return None
|
|
|
|
def _table_exists(self, table: str) -> Optional[str]:
|
|
names_to_try = []
|
|
if "." in table:
|
|
names_to_try.append(table)
|
|
else:
|
|
names_to_try.append(f"public.{table}")
|
|
names_to_try.append(table)
|
|
|
|
query = text("SELECT to_regclass(:name) IS NOT NULL AS exists")
|
|
with self.config.engine.begin() as conn:
|
|
for name in names_to_try:
|
|
exists = conn.execute(query, {"name": name}).scalar()
|
|
if exists:
|
|
return name
|
|
return None
|
|
|
|
def _split_table_name(self, qualified: str) -> Dict[str, Optional[str]]:
|
|
if "." in qualified:
|
|
schema, table = qualified.split(".", 1)
|
|
return {"schema": schema, "table": table}
|
|
return {"schema": None, "table": qualified}
|
|
|
|
def _get_columns(self, table: str) -> Dict[str, str]:
|
|
pieces = self._split_table_name(table)
|
|
sql = [
|
|
"SELECT column_name",
|
|
"FROM information_schema.columns",
|
|
"WHERE table_name = :table",
|
|
]
|
|
params = {"table": pieces["table"]}
|
|
if pieces["schema"]:
|
|
sql.append("AND table_schema = :schema")
|
|
params["schema"] = pieces["schema"]
|
|
sql.append("ORDER BY ordinal_position")
|
|
with self.config.engine.begin() as conn:
|
|
rows = conn.execute(text(" ".join(sql)), params).fetchall()
|
|
return {row[0].lower(): row[0] for row in rows}
|
|
|
|
def _pick_column(self, columns: Dict[str, str], names: List[str]) -> Optional[str]:
|
|
for name in names:
|
|
if name.lower() in columns:
|
|
return columns[name.lower()]
|
|
return None
|
|
|
|
def _execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
|
|
try:
|
|
df_chunks: List[pd.DataFrame] = []
|
|
for chunk in pd.read_sql(
|
|
text(query),
|
|
self.config.engine,
|
|
params=params,
|
|
chunksize=self.config.chunk_size,
|
|
):
|
|
df_chunks.append(chunk)
|
|
return pd.concat(df_chunks, ignore_index=True) if df_chunks else pd.DataFrame()
|
|
except SQLAlchemyError as exc:
|
|
logger.error("Query failed: %s", query, exc_info=True)
|
|
raise DataLoadError(f"Failed executing query: {exc}") from exc
|
|
|
|
# --------------------------------------------------------------------------------------
|
|
# Data loading
|
|
# --------------------------------------------------------------------------------------
|
|
|
|
def _initialize_data(self) -> None:
|
|
self.data["well_data"] = self._load_wells()
|
|
self.data["inspections"] = self._load_inspections()
|
|
self.data["violations"] = self._load_violations()
|
|
if not self.data["well_data"].empty and (
|
|
not self.data["inspections"].empty or not self.data["violations"].empty
|
|
):
|
|
self._create_performance_metrics()
|
|
|
|
def _load_wells(self) -> pd.DataFrame:
|
|
table = self.config.well_source
|
|
columns = self._get_columns(table)
|
|
alias = "w"
|
|
api_norm_col = self._pick_column(columns, ["api_norm"])
|
|
if not api_norm_col:
|
|
raise DataLoadError(f"{table} does not expose api_norm")
|
|
select_parts = [f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}']
|
|
|
|
for target, candidates in self.WELL_COLUMN_MAP.items():
|
|
column = self._pick_column(columns, candidates)
|
|
if column:
|
|
select_parts.append(f'{alias}."{column}" AS {target}')
|
|
|
|
query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias}'
|
|
df = self._execute_query(query)
|
|
|
|
df[self.ID_COLUMN] = df[self.ID_COLUMN].astype(str).str.strip()
|
|
df = df[df[self.ID_COLUMN].notna()]
|
|
df = df.drop_duplicates(subset=[self.ID_COLUMN]).reset_index(drop=True)
|
|
logger.info("Loaded %s wells from %s", len(df), table)
|
|
return df
|
|
|
|
def _load_inspections(self) -> pd.DataFrame:
|
|
table = self.config.inspections_source
|
|
columns = self._get_columns(table)
|
|
alias = "i"
|
|
select_parts = []
|
|
|
|
base_candidates = [
|
|
"id",
|
|
"district",
|
|
"county",
|
|
"inspection_date",
|
|
"inspection_type",
|
|
"operator_name",
|
|
"field_name",
|
|
"compliance",
|
|
"file_date",
|
|
"created_at",
|
|
]
|
|
|
|
for column in base_candidates:
|
|
picked = self._pick_column(columns, [column])
|
|
if picked:
|
|
select_parts.append(f'{alias}."{picked}" AS {column}')
|
|
|
|
api_norm_col = self._pick_column(columns, ["api_norm"])
|
|
if not api_norm_col:
|
|
raise DataLoadError(f"{table} does not expose api_norm")
|
|
select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}')
|
|
where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL'
|
|
|
|
query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}'
|
|
df = self._execute_query(query)
|
|
|
|
for col in ["inspection_date", "file_date", "created_at"]:
|
|
if col in df.columns:
|
|
df[col] = pd.to_datetime(df[col], errors="coerce")
|
|
|
|
df = df[df[self.ID_COLUMN].notna()].copy()
|
|
df = df[df[self.ID_COLUMN].notna()].copy()
|
|
if "inspection_date" in df.columns:
|
|
df = df.sort_values([self.ID_COLUMN, "inspection_date"])
|
|
df["days_since_last_inspection"] = (
|
|
df.groupby(self.ID_COLUMN)["inspection_date"].diff().dt.days
|
|
)
|
|
|
|
logger.info("Loaded %s inspections from %s", len(df), table)
|
|
return df.reset_index(drop=True)
|
|
|
|
def _load_violations(self) -> pd.DataFrame:
|
|
table = self.config.violations_source
|
|
columns = self._get_columns(table)
|
|
alias = "v"
|
|
select_parts = []
|
|
row_id_candidates = ["id", "violation_id", "violationid", "objectid", "row_id"]
|
|
|
|
base_candidates = [
|
|
"operator_name",
|
|
"p5_operator_no",
|
|
"district",
|
|
"oil_lease_gas_well_id",
|
|
"lease_fac_name",
|
|
"well_no",
|
|
"drilling_permit_no",
|
|
"field_name",
|
|
"violated_rule",
|
|
"violated_rule_desc",
|
|
"major_viol_ind",
|
|
"compliant_on_reinsp",
|
|
"last_enf_action",
|
|
"last_enf_action_date",
|
|
"violation_disc_date",
|
|
"file_date",
|
|
"created_at",
|
|
]
|
|
|
|
for row_id in row_id_candidates:
|
|
picked = self._pick_column(columns, [row_id])
|
|
if picked:
|
|
select_parts.append(f'{alias}."{picked}" AS {row_id}')
|
|
break
|
|
|
|
for column in base_candidates:
|
|
picked = self._pick_column(columns, [column])
|
|
if picked:
|
|
select_parts.append(f'{alias}."{picked}" AS {column}')
|
|
|
|
api_norm_col = self._pick_column(columns, ["api_norm"])
|
|
if not api_norm_col:
|
|
raise DataLoadError(f"{table} does not expose api_norm")
|
|
select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}')
|
|
where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL'
|
|
|
|
query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}'
|
|
df = self._execute_query(query)
|
|
|
|
for col in ["violation_disc_date", "last_enf_action_date", "file_date", "created_at"]:
|
|
if col in df.columns:
|
|
df[col] = pd.to_datetime(df[col], errors="coerce")
|
|
|
|
df = df[df[self.ID_COLUMN].notna()].reset_index(drop=True)
|
|
df = df[df[self.ID_COLUMN].notna()].reset_index(drop=True)
|
|
|
|
if not df.empty:
|
|
row_id_col = next(
|
|
(c for c in ["id", "violation_id", "violationid", "objectid", "row_id"] if c in df.columns),
|
|
None,
|
|
)
|
|
if row_id_col is None:
|
|
df["violation_row_id"] = range(1, len(df) + 1)
|
|
else:
|
|
df = df.rename(columns={row_id_col: "violation_row_id"})
|
|
df["total_violations"] = df.groupby(self.ID_COLUMN)["violation_row_id"].transform("count")
|
|
df["violation_number"] = df.groupby(self.ID_COLUMN).cumcount() + 1
|
|
|
|
logger.info("Loaded %s violations from %s", len(df), table)
|
|
return df
|
|
|
|
def _create_performance_metrics(self) -> None:
|
|
insp = self.data.get("inspections", pd.DataFrame()).copy()
|
|
viol = self.data.get("violations", pd.DataFrame()).copy()
|
|
if insp.empty and viol.empty:
|
|
return
|
|
|
|
if not insp.empty:
|
|
count_col = "id" if "id" in insp.columns else "inspection_date"
|
|
insp_metrics = insp.groupby(self.ID_COLUMN).agg(
|
|
total_inspections=(count_col, "count")
|
|
)
|
|
if "compliance" in insp.columns:
|
|
insp_metrics["compliance_rate"] = (
|
|
insp.groupby(self.ID_COLUMN)["compliance"]
|
|
.apply(lambda x: (x == "Yes").mean() * 100)
|
|
)
|
|
if "days_since_last_inspection" in insp.columns:
|
|
insp_metrics["avg_days_between_inspections"] = (
|
|
insp.groupby(self.ID_COLUMN)["days_since_last_inspection"].mean()
|
|
)
|
|
else:
|
|
insp_metrics = pd.DataFrame()
|
|
|
|
if not viol.empty:
|
|
count_col = (
|
|
"violation_row_id"
|
|
if "violation_row_id" in viol.columns
|
|
else ("id" if "id" in viol.columns else self.ID_COLUMN)
|
|
)
|
|
viol_metrics = viol.groupby(self.ID_COLUMN).agg(
|
|
total_violations=(count_col, "count"),
|
|
)
|
|
if "major_viol_ind" in viol.columns:
|
|
viol_metrics["major_violations"] = (
|
|
viol.groupby(self.ID_COLUMN)["major_viol_ind"]
|
|
.apply(lambda x: (x == "Y").sum())
|
|
)
|
|
if "compliant_on_reinsp" in viol.columns:
|
|
viol_metrics["reinspection_compliance_rate"] = (
|
|
viol.groupby(self.ID_COLUMN)["compliant_on_reinsp"]
|
|
.apply(lambda x: (x == "Y").mean() * 100)
|
|
)
|
|
else:
|
|
viol_metrics = pd.DataFrame()
|
|
|
|
metrics = insp_metrics.join(viol_metrics, how="outer").fillna(0)
|
|
metrics = metrics.reset_index()
|
|
self.data["performance_metrics"] = metrics
|
|
|
|
# --------------------------------------------------------------------------------------
|
|
# Analytic helpers
|
|
# --------------------------------------------------------------------------------------
|
|
|
|
def analyze_inspection_patterns(self) -> Dict[str, Any]:
|
|
insp_df = self.data.get("inspections", pd.DataFrame())
|
|
if insp_df.empty:
|
|
return {}
|
|
|
|
result: Dict[str, Any] = {
|
|
"overall_statistics": {
|
|
"total_inspections": int(len(insp_df)),
|
|
"unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()),
|
|
}
|
|
}
|
|
|
|
if "compliance" in insp_df.columns:
|
|
result["overall_statistics"]["overall_compliance_rate"] = (
|
|
(insp_df["compliance"] == "Yes").mean() * 100
|
|
)
|
|
if "days_since_last_inspection" in insp_df.columns:
|
|
result["overall_statistics"]["avg_days_between_inspections"] = insp_df[
|
|
"days_since_last_inspection"
|
|
].mean()
|
|
result["overall_statistics"]["median_days_between_inspections"] = insp_df[
|
|
"days_since_last_inspection"
|
|
].median()
|
|
|
|
if "inspection_date" in insp_df.columns:
|
|
insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce")
|
|
insp_df["year"] = insp_df["inspection_date"].dt.year
|
|
result["temporal_patterns"] = {
|
|
"inspections_by_year": insp_df.groupby("year").size().dropna().to_dict()
|
|
}
|
|
if "compliance" in insp_df.columns:
|
|
result["temporal_patterns"]["compliance_by_year"] = (
|
|
insp_df.groupby("year")["compliance"]
|
|
.apply(lambda x: (x == "Yes").mean() * 100)
|
|
.dropna()
|
|
.to_dict()
|
|
)
|
|
|
|
if "district" in insp_df.columns:
|
|
district_counts = insp_df.groupby("district").size().to_dict()
|
|
district_compliance = {}
|
|
if "compliance" in insp_df.columns:
|
|
district_compliance = (
|
|
insp_df.groupby("district")["compliance"]
|
|
.apply(lambda x: (x == "Yes").mean() * 100)
|
|
.to_dict()
|
|
)
|
|
result["district_performance"] = {
|
|
"inspections_by_district": district_counts,
|
|
"compliance_by_district": district_compliance,
|
|
}
|
|
|
|
return result
|
|
|
|
def analyze_violations(self) -> Dict[str, Any]:
|
|
viol_df = self.data.get("violations", pd.DataFrame())
|
|
if viol_df.empty:
|
|
return {}
|
|
|
|
result: Dict[str, Any] = {
|
|
"overall_statistics": {
|
|
"total_violations": int(len(viol_df)),
|
|
"unique_wells_with_violations": int(viol_df[self.ID_COLUMN].nunique()),
|
|
},
|
|
"violation_types": {},
|
|
"enforcement_effectiveness": {},
|
|
}
|
|
|
|
if "major_viol_ind" in viol_df.columns:
|
|
result["overall_statistics"]["major_violations"] = int(
|
|
(viol_df["major_viol_ind"] == "Y").sum()
|
|
)
|
|
if "compliant_on_reinsp" in viol_df.columns:
|
|
result["overall_statistics"]["compliance_on_reinspection_rate"] = (
|
|
(viol_df["compliant_on_reinsp"] == "Y").mean() * 100
|
|
)
|
|
|
|
if "violated_rule" in viol_df.columns:
|
|
result["violation_types"]["common_violations"] = (
|
|
viol_df["violated_rule"].value_counts().head(10).to_dict()
|
|
)
|
|
if "major_viol_ind" in viol_df.columns:
|
|
result["violation_types"]["major_violation_types"] = (
|
|
viol_df[viol_df["major_viol_ind"] == "Y"]["violated_rule"]
|
|
.value_counts()
|
|
.head(5)
|
|
.to_dict()
|
|
)
|
|
|
|
if {"violated_rule", "compliant_on_reinsp"} <= set(viol_df.columns):
|
|
result["enforcement_effectiveness"]["resolution_rate_by_type"] = (
|
|
viol_df.groupby("violated_rule")["compliant_on_reinsp"]
|
|
.apply(lambda x: (x == "Y").mean() * 100)
|
|
.to_dict()
|
|
)
|
|
|
|
return result
|
|
|
|
def analyze_regulatory_chain(self) -> Dict[str, Any]:
|
|
insp_df = self.data.get("inspections", pd.DataFrame()).copy()
|
|
viol_df = self.data.get("violations", pd.DataFrame()).copy()
|
|
if insp_df.empty or viol_df.empty:
|
|
return {}
|
|
|
|
if "inspection_date" not in insp_df.columns or "violation_disc_date" not in viol_df.columns:
|
|
return {}
|
|
|
|
insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce")
|
|
viol_df["violation_disc_date"] = pd.to_datetime(viol_df["violation_disc_date"], errors="coerce")
|
|
viol_df["last_enf_action_date"] = pd.to_datetime(
|
|
viol_df.get("last_enf_action_date"), errors="coerce"
|
|
)
|
|
|
|
insp_df = insp_df.dropna(subset=[self.ID_COLUMN, "inspection_date"])
|
|
viol_df = viol_df.dropna(subset=[self.ID_COLUMN, "violation_disc_date"])
|
|
if insp_df.empty or viol_df.empty:
|
|
return {}
|
|
|
|
insp_sorted = (
|
|
insp_df.sort_values(["inspection_date", self.ID_COLUMN])
|
|
.reset_index(drop=True)
|
|
)
|
|
viol_sorted = (
|
|
viol_df.sort_values(["violation_disc_date", self.ID_COLUMN])
|
|
.reset_index(drop=True)
|
|
)
|
|
matched_df = pd.merge_asof(
|
|
viol_sorted,
|
|
insp_sorted,
|
|
left_on="violation_disc_date",
|
|
right_on="inspection_date",
|
|
by=self.ID_COLUMN,
|
|
direction="backward",
|
|
suffixes=("_viol", "_insp"),
|
|
).dropna(subset=["inspection_date"])
|
|
|
|
if matched_df.empty:
|
|
return {}
|
|
total_inspections = len(insp_df)
|
|
inspection_id_col = "id_insp" if "id_insp" in matched_df.columns else "inspection_date"
|
|
inspections_with_violations = matched_df[inspection_id_col].nunique()
|
|
violation_rate = (inspections_with_violations / total_inspections) * 100 if total_inspections else 0
|
|
|
|
time_spans = matched_df.dropna(
|
|
subset=["inspection_date", "violation_disc_date", "last_enf_action_date"]
|
|
).copy()
|
|
time_spans["insp_to_viol"] = (
|
|
time_spans["violation_disc_date"] - time_spans["inspection_date"]
|
|
).dt.days
|
|
time_spans["viol_to_enforce"] = (
|
|
time_spans["last_enf_action_date"] - time_spans["violation_disc_date"]
|
|
).dt.days
|
|
time_spans["total_span"] = time_spans["insp_to_viol"] + time_spans["viol_to_enforce"]
|
|
|
|
enforcement_patterns = {
|
|
"action_types": viol_df["last_enf_action"].value_counts().to_dict()
|
|
if "last_enf_action" in viol_df.columns
|
|
else {},
|
|
"enforcement_rate": (
|
|
matched_df["last_enf_action"].notna().sum() / len(matched_df) * 100
|
|
),
|
|
}
|
|
if not time_spans.empty:
|
|
enforcement_patterns["avg_days_to_enforcement"] = time_spans["viol_to_enforce"].mean()
|
|
|
|
return {
|
|
"summary": {
|
|
"total_inspections": total_inspections,
|
|
"violation_rate": violation_rate,
|
|
"unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()),
|
|
},
|
|
"conversion_funnel": {
|
|
"inspections_with_violations": inspections_with_violations,
|
|
"violations_with_enforcement": int(matched_df["last_enf_action"].notna().sum()),
|
|
},
|
|
"enforcement_patterns": enforcement_patterns,
|
|
"time_spans": time_spans.describe().to_dict() if not time_spans.empty else {},
|
|
}
|
|
|
|
def analyze_environmental_justice(self) -> Dict[str, Any]:
|
|
well_df = self.data.get("well_data", pd.DataFrame()).copy()
|
|
if well_df.empty or "census_tract_geoid" not in well_df.columns:
|
|
return {}
|
|
|
|
metrics = self.data.get("performance_metrics")
|
|
if metrics is not None and not metrics.empty:
|
|
well_df = well_df.merge(metrics, on=self.ID_COLUMN, how="left")
|
|
|
|
tract_df = well_df.dropna(subset=["census_tract_geoid"]).copy()
|
|
if tract_df.empty:
|
|
return {}
|
|
|
|
agg_map: Dict[str, str] = {self.ID_COLUMN: "count"}
|
|
mean_cols = [
|
|
"ej_composite_score",
|
|
"pct_minority",
|
|
"pct_hispanic",
|
|
"poverty_rate",
|
|
"median_household_income",
|
|
"avg_days_between_inspections",
|
|
"reinspection_compliance_rate",
|
|
"compliance_rate",
|
|
]
|
|
for col in mean_cols:
|
|
if col in tract_df.columns:
|
|
agg_map[col] = "mean"
|
|
|
|
if "total_inspections" in tract_df.columns:
|
|
agg_map["total_inspections"] = "mean"
|
|
if "total_violations" in tract_df.columns:
|
|
agg_map["total_violations"] = "mean"
|
|
if "major_violations" in tract_df.columns:
|
|
agg_map["major_violations"] = "sum"
|
|
|
|
tract_summary = (
|
|
tract_df.groupby("census_tract_geoid")
|
|
.agg(agg_map)
|
|
.rename(columns={self.ID_COLUMN: "wells_in_tract"})
|
|
.reset_index()
|
|
)
|
|
rename_map = {
|
|
"total_inspections": "avg_inspections",
|
|
"total_violations": "avg_violations",
|
|
"compliance_rate": "avg_compliance_rate",
|
|
}
|
|
tract_summary = tract_summary.rename(columns=rename_map)
|
|
|
|
demographic_vars = [
|
|
col
|
|
for col in [
|
|
"ej_composite_score",
|
|
"pct_minority",
|
|
"pct_hispanic",
|
|
"poverty_rate",
|
|
"median_household_income",
|
|
]
|
|
if col in tract_summary.columns
|
|
]
|
|
performance_vars = [
|
|
col
|
|
for col in [
|
|
"avg_inspections",
|
|
"avg_violations",
|
|
"major_violations",
|
|
"avg_compliance_rate",
|
|
"avg_days_between_inspections",
|
|
"reinspection_compliance_rate",
|
|
"wells_in_tract",
|
|
]
|
|
if col in tract_summary.columns
|
|
]
|
|
|
|
correlations: Dict[str, Dict[str, float]] = {dv: {} for dv in demographic_vars}
|
|
for d in demographic_vars:
|
|
for p in performance_vars:
|
|
correlations[d][p] = tract_summary[d].corr(tract_summary[p], method="spearman")
|
|
|
|
def split_high_low(column: str) -> Dict[str, Dict[str, float]]:
|
|
result: Dict[str, Dict[str, float]] = {}
|
|
if column not in tract_summary.columns:
|
|
return result
|
|
median_value = tract_summary[column].median()
|
|
high = tract_summary[tract_summary[column] > median_value]
|
|
low = tract_summary[tract_summary[column] <= median_value]
|
|
for metric in performance_vars:
|
|
result[metric] = {
|
|
"high": high[metric].mean() if not high.empty else float("nan"),
|
|
"low": low[metric].mean() if not low.empty else float("nan"),
|
|
}
|
|
return result
|
|
|
|
return {
|
|
"summary": {
|
|
"total_tracts": int(len(tract_summary)),
|
|
"total_wells": int(tract_summary["wells_in_tract"].sum()),
|
|
"avg_wells_per_tract": tract_summary["wells_in_tract"].mean(),
|
|
"avg_ej_score": tract_summary.get("ej_composite_score", pd.Series(dtype=float)).mean(),
|
|
},
|
|
"correlations": correlations,
|
|
"high_vulnerability_vs_low": split_high_low("ej_composite_score"),
|
|
"high_poverty_vs_low": split_high_low("poverty_rate"),
|
|
}
|
|
|
|
# --------------------------------------------------------------------------------------
|
|
# Public helpers
|
|
# --------------------------------------------------------------------------------------
|
|
|
|
def get_analysis(self) -> Dict[str, Any]:
|
|
return {
|
|
"inspection_analysis": self.analyze_inspection_patterns(),
|
|
"violation_analysis": self.analyze_violations(),
|
|
"regulatory_chain": self.analyze_regulatory_chain(),
|
|
"environmental_justice": self.analyze_environmental_justice(),
|
|
}
|
|
|
|
@staticmethod
|
|
def _format_stat(stat_value: Any) -> str:
|
|
if stat_value is None:
|
|
return "n/a"
|
|
if isinstance(stat_value, (int, float)):
|
|
if pd.isna(stat_value):
|
|
return "n/a"
|
|
return f"{stat_value:,.2f}"
|
|
return str(stat_value)
|
|
|
|
def print_analysis(self) -> None:
|
|
analysis = self.get_analysis()
|
|
|
|
def print_block(title: str, stats: Dict[str, Any]) -> None:
|
|
if not stats:
|
|
return
|
|
print(f"\n{title}")
|
|
for stat_key, stat_value in stats.items():
|
|
print(f" {stat_key.replace('_', ' ').title()}: {self._format_stat(stat_value)}")
|
|
|
|
print_block("INSPECTION ANALYSIS", analysis.get("inspection_analysis", {}).get("overall_statistics", {}))
|
|
print_block("VIOLATION ANALYSIS", analysis.get("violation_analysis", {}).get("overall_statistics", {}))
|
|
print_block("REGULATORY CHAIN", analysis.get("regulatory_chain", {}).get("summary", {}))
|
|
print_block("ENVIRONMENTAL JUSTICE", analysis.get("environmental_justice", {}).get("summary", {}))
|
|
|
|
violation_types = analysis.get("violation_analysis", {}).get("violation_types", {})
|
|
if violation_types:
|
|
print("\nTop Violations:")
|
|
for rule, count in violation_types.get("common_violations", {}).items():
|
|
print(f" {rule}: {self._format_stat(count)}")
|
|
|
|
def get_summary_stats(self) -> Dict[str, Any]:
|
|
stats: Dict[str, Any] = {}
|
|
wells = self.data.get("well_data", pd.DataFrame())
|
|
if not wells.empty:
|
|
stats["total_wells"] = len(wells)
|
|
stats["unique_census_tracts"] = wells["census_tract_geoid"].nunique(
|
|
dropna=True
|
|
) if "census_tract_geoid" in wells.columns else None
|
|
insp = self.data.get("inspections", pd.DataFrame())
|
|
if not insp.empty:
|
|
stats["total_inspections"] = len(insp)
|
|
viol = self.data.get("violations", pd.DataFrame())
|
|
if not viol.empty:
|
|
stats["total_violations"] = len(viol)
|
|
return stats
|
|
|
|
def export_analysis(self, path: Path | str) -> None:
|
|
output_path = Path(path)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
analysis = self.get_analysis()
|
|
output_path.write_text(json.dumps(analysis, indent=2, default=str), encoding="utf-8")
|
|
logger.info("Analysis exported to %s", output_path)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
analyzer = WellAnalyzer()
|
|
print("Summary Stats:")
|
|
for key, value in analyzer.get_summary_stats().items():
|
|
print(f" {key.replace('_', ' ').title()}: {value}")
|
|
analyzer.print_analysis()
|
|
analyzer.export_analysis(Path("analysis_output.json"))
|
|
except WellAnalyzerError as exc:
|
|
logger.error("Well Analyzer failed: %s", exc, exc_info=True)
|