updated analysis with offshore considerations
781
analysis/archive/well_analyzer.py
Normal file
@@ -0,0 +1,781 @@
from __future__ import annotations

import json
import logging
import os
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import Engine, create_engine, text
from sqlalchemy.exc import SQLAlchemyError

# Configure logging early so that downstream modules inherit the settings.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Pandas throws a lot of noisy warnings when casting timestamps out of Postgres.
warnings.filterwarnings("ignore", category=UserWarning)


class WellAnalyzerError(Exception):
    """Base exception class for all analyzer failures."""


class ConfigError(WellAnalyzerError):
    """Raised when we cannot build a working configuration."""


class DataLoadError(WellAnalyzerError):
    """Raised when data cannot be loaded from the warehouse."""


class AnalysisError(WellAnalyzerError):
    """Raised when a downstream analytic task fails."""


@dataclass
class Config:
    """Runtime configuration for the analyzer."""

    engine: Engine
    chunk_size: int = 10_000
    cache_dir: Path = Path("./cache")
    well_source: str = ""
    inspections_source: str = ""
    violations_source: str = ""


def _load_engine_from_env() -> Engine:
    """Build a SQLAlchemy engine using PG* environment variables."""

    load_dotenv(override=False)

    host = os.getenv("PGHOST", "localhost")
    port = os.getenv("PGPORT", "5432")
    user = os.getenv("PGUSER", "postgres")
    password = quote_plus(os.getenv("PGPASSWORD", ""))
    database = os.getenv("PGDATABASE", "texas_data")

    url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
    logger.info("Connecting to Postgres at %s:%s/%s", host, port, database)
    try:
        return create_engine(url)
    except SQLAlchemyError as exc:
        # Report the target without the full URL so credentials never land in logs.
        raise ConfigError(f"Failed to create engine for {host}:{port}/{database}: {exc}") from exc


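# A sample .env for local runs; every value below is a placeholder, not a real
# credential. Only variables that differ from the defaults read above are needed.
#
#   PGHOST=localhost
#   PGPORT=5432
#   PGUSER=postgres
#   PGPASSWORD=changeme
#   PGDATABASE=texas_data
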
class WellAnalyzer:
    """
    Analyze wells, inspections, violations, and environmental-justice indicators held in
    the rebuilt PostGIS warehouse. The class auto-detects the rebuilt tables that now
    exist in the texas-rebuild-postgis project, so it works against both fresh rebuilds
    and future refreshes without hand-editing SQL strings.
    """

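    # Typical usage (sketch; assumes the PG* variables point at the rebuilt warehouse
    # and that the inspections and violations tables exist):
    #
    #   analyzer = WellAnalyzer()
    #   analyzer.print_analysis()
    #   analyzer.export_analysis("analysis_output.json")
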
    ID_COLUMN = "api_norm"
    # Checked in order; the first table/view that exists wins. These are the same names
    # the ConfigError in __init__ reports.
    WELL_SOURCE_CANDIDATES = [
        "well_enriched_all_plus",
        "well_enriched_all",
        "well_with_demographics_table",
        "well_shape_tract",
    ]

    WELL_COLUMN_MAP: Dict[str, List[str]] = {
        "census_tract_geoid": ["census_tract_geoid", "geoid", "tract_geoid"],
        "tract_name": ["tract_name", "name"],
        "ruca_category": ["ruca_category"],
        "ruca_code_2020": ["ruca_code_2020"],
        "ruca_primary_description": ["ruca_primary_description"],
        "ruca_secondary_description": ["ruca_secondary_description"],
        "ej_composite_score": ["ej_composite_score"],
        "pct_minority": ["pct_minority"],
        "pct_hispanic": ["pct_hispanic"],
        "poverty_rate": ["poverty_rate"],
        "unemployment_rate": ["unemployment_rate"],
        "less_than_hs_pct": ["less_than_hs_pct"],
        "linguistic_isolation_rate": ["linguistic_isolation_rate"],
        "renter_cost_burden_rate": ["renter_cost_burden_rate"],
        "disability_rate": ["disability_rate"],
        "pct_under5": ["pct_under5"],
        "pct_65plus": ["pct_65plus"],
        "median_household_income": ["median_household_income"],
        "latitude": ["latitude", "lat"],
        "longitude": ["longitude", "lon", "lng"],
        "basin_label": ["basin_label", "basin_name"],
        "play_label": ["play_label", "play_name"],
        "texmex_name": ["texmex_name"],
        "distance_to_texmex_km": ["distance_to_texmex_km"],
        "within_25km_texmex": ["within_25km_texmex"],
        "within_50km_texmex": ["within_50km_texmex"],
    }
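
    # Aliases resolve first-match: if a source exposes "geoid" but not
    # "census_tract_geoid", the generated SELECT emits w."geoid" AS census_tract_geoid.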

    def __init__(
        self,
        engine: Optional[Engine] = None,
        *,
        chunk_size: int = 10_000,
        cache_dir: Optional[Path] = None,
        well_source: Optional[str] = None,
    ) -> None:
        if engine is None:
            engine = _load_engine_from_env()

        self.config = Config(
            engine=engine,
            chunk_size=chunk_size,
            cache_dir=(cache_dir or Path("./cache")),
        )

        self.config.well_source = well_source or self._detect_table(self.WELL_SOURCE_CANDIDATES)
        if not self.config.well_source:
            raise ConfigError(
                "Could not find a well table/view. "
                "Expected one of well_enriched_all_plus, well_enriched_all, "
                "well_with_demographics_table, well_shape_tract."
            )

        self.config.inspections_source = self._detect_table(["inspections"])
        self.config.violations_source = self._detect_table(["violations"])
        if not self.config.inspections_source or not self.config.violations_source:
            raise ConfigError("Both inspections and violations tables must exist in the database.")

        self.data: Dict[str, pd.DataFrame] = {}
        self._initialize_data()

    # --------------------------------------------------------------------------------------
    # Helper methods for metadata detection and SQL building
    # --------------------------------------------------------------------------------------

    def _detect_table(self, candidates: List[str]) -> Optional[str]:
        for candidate in candidates:
            found = self._table_exists(candidate)
            if found:
                return found
        return None

    def _table_exists(self, table: str) -> Optional[str]:
        names_to_try = []
        if "." in table:
            names_to_try.append(table)
        else:
            names_to_try.append(f"public.{table}")
            names_to_try.append(table)

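        # to_regclass() returns the relation's OID when it exists and NULL otherwise,
        # so a missing table yields False here instead of raising.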
        query = text("SELECT to_regclass(:name) IS NOT NULL AS exists")
        with self.config.engine.begin() as conn:
            for name in names_to_try:
                exists = conn.execute(query, {"name": name}).scalar()
                if exists:
                    return name
        return None

    def _split_table_name(self, qualified: str) -> Dict[str, Optional[str]]:
        if "." in qualified:
            schema, table = qualified.split(".", 1)
            return {"schema": schema, "table": table}
        return {"schema": None, "table": qualified}

    def _get_columns(self, table: str) -> Dict[str, str]:
        pieces = self._split_table_name(table)
        sql = [
            "SELECT column_name",
            "FROM information_schema.columns",
            "WHERE table_name = :table",
        ]
        params = {"table": pieces["table"]}
        if pieces["schema"]:
            sql.append("AND table_schema = :schema")
            params["schema"] = pieces["schema"]
        sql.append("ORDER BY ordinal_position")
        with self.config.engine.begin() as conn:
            rows = conn.execute(text(" ".join(sql)), params).fetchall()
        return {row[0].lower(): row[0] for row in rows}

    def _pick_column(self, columns: Dict[str, str], names: List[str]) -> Optional[str]:
        for name in names:
            if name.lower() in columns:
                return columns[name.lower()]
        return None

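    # Example call (illustrative query and bind parameter, not one used in this module):
    #   df = self._execute_query("SELECT * FROM inspections WHERE district = :d", {"d": "08"})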
    def _execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        try:
            df_chunks: List[pd.DataFrame] = []
            for chunk in pd.read_sql(
                text(query),
                self.config.engine,
                params=params,
                chunksize=self.config.chunk_size,
            ):
                df_chunks.append(chunk)
            return pd.concat(df_chunks, ignore_index=True) if df_chunks else pd.DataFrame()
        except SQLAlchemyError as exc:
            logger.error("Query failed: %s", query, exc_info=True)
            raise DataLoadError(f"Failed executing query: {exc}") from exc

    # --------------------------------------------------------------------------------------
    # Data loading
    # --------------------------------------------------------------------------------------

    def _initialize_data(self) -> None:
        self.data["well_data"] = self._load_wells()
        self.data["inspections"] = self._load_inspections()
        self.data["violations"] = self._load_violations()
        if not self.data["well_data"].empty and (
            not self.data["inspections"].empty or not self.data["violations"].empty
        ):
            self._create_performance_metrics()

    def _load_wells(self) -> pd.DataFrame:
        table = self.config.well_source
        columns = self._get_columns(table)
        alias = "w"
        api_norm_col = self._pick_column(columns, ["api_norm"])
        if not api_norm_col:
            raise DataLoadError(f"{table} does not expose api_norm")
        select_parts = [f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}']

        for target, candidates in self.WELL_COLUMN_MAP.items():
            column = self._pick_column(columns, candidates)
            if column:
                select_parts.append(f'{alias}."{column}" AS {target}')

        query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias}'
        df = self._execute_query(query)

        # Drop NULL ids before casting to str; astype(str) would turn NaN into "nan"
        # and defeat the notna() filter.
        df = df[df[self.ID_COLUMN].notna()].copy()
        df[self.ID_COLUMN] = df[self.ID_COLUMN].astype(str).str.strip()
        df = df.drop_duplicates(subset=[self.ID_COLUMN]).reset_index(drop=True)
        logger.info("Loaded %s wells from %s", len(df), table)
        return df

    def _load_inspections(self) -> pd.DataFrame:
        table = self.config.inspections_source
        columns = self._get_columns(table)
        alias = "i"
        select_parts = []

        base_candidates = [
            "id",
            "district",
            "county",
            "inspection_date",
            "inspection_type",
            "operator_name",
            "field_name",
            "compliance",
            "file_date",
            "created_at",
        ]

        for column in base_candidates:
            picked = self._pick_column(columns, [column])
            if picked:
                select_parts.append(f'{alias}."{picked}" AS {column}')

        api_norm_col = self._pick_column(columns, ["api_norm"])
        if not api_norm_col:
            raise DataLoadError(f"{table} does not expose api_norm")
        select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}')
        where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL'

        query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}'
        df = self._execute_query(query)

        for col in ["inspection_date", "file_date", "created_at"]:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors="coerce")

        df = df[df[self.ID_COLUMN].notna()].copy()
        if "inspection_date" in df.columns:
            df = df.sort_values([self.ID_COLUMN, "inspection_date"])
            # diff() within each date-sorted well group gives the gap to the previous
            # inspection; each well's first inspection stays NaN.
            df["days_since_last_inspection"] = (
                df.groupby(self.ID_COLUMN)["inspection_date"].diff().dt.days
            )

        logger.info("Loaded %s inspections from %s", len(df), table)
        return df.reset_index(drop=True)

    def _load_violations(self) -> pd.DataFrame:
        table = self.config.violations_source
        columns = self._get_columns(table)
        alias = "v"
        select_parts = []
        row_id_candidates = ["id", "violation_id", "violationid", "objectid", "row_id"]

        base_candidates = [
            "operator_name",
            "p5_operator_no",
            "district",
            "oil_lease_gas_well_id",
            "lease_fac_name",
            "well_no",
            "drilling_permit_no",
            "field_name",
            "violated_rule",
            "violated_rule_desc",
            "major_viol_ind",
            "compliant_on_reinsp",
            "last_enf_action",
            "last_enf_action_date",
            "violation_disc_date",
            "file_date",
            "created_at",
        ]

        for row_id in row_id_candidates:
            picked = self._pick_column(columns, [row_id])
            if picked:
                select_parts.append(f'{alias}."{picked}" AS {row_id}')
                break

        for column in base_candidates:
            picked = self._pick_column(columns, [column])
            if picked:
                select_parts.append(f'{alias}."{picked}" AS {column}')

        api_norm_col = self._pick_column(columns, ["api_norm"])
        if not api_norm_col:
            raise DataLoadError(f"{table} does not expose api_norm")
        select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}')
        where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL'

        query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}'
        df = self._execute_query(query)

        for col in ["violation_disc_date", "last_enf_action_date", "file_date", "created_at"]:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors="coerce")

        df = df[df[self.ID_COLUMN].notna()].reset_index(drop=True)

        if not df.empty:
            row_id_col = next(
                (c for c in row_id_candidates if c in df.columns),
                None,
            )
            if row_id_col is None:
                df["violation_row_id"] = range(1, len(df) + 1)
            else:
                df = df.rename(columns={row_id_col: "violation_row_id"})
            df["total_violations"] = df.groupby(self.ID_COLUMN)["violation_row_id"].transform("count")
            df["violation_number"] = df.groupby(self.ID_COLUMN).cumcount() + 1

        logger.info("Loaded %s violations from %s", len(df), table)
        return df

    def _create_performance_metrics(self) -> None:
        insp = self.data.get("inspections", pd.DataFrame()).copy()
        viol = self.data.get("violations", pd.DataFrame()).copy()
        if insp.empty and viol.empty:
            return

        if not insp.empty:
            count_col = "id" if "id" in insp.columns else "inspection_date"
            insp_metrics = insp.groupby(self.ID_COLUMN).agg(
                total_inspections=(count_col, "count")
            )
            if "compliance" in insp.columns:
                insp_metrics["compliance_rate"] = (
                    insp.groupby(self.ID_COLUMN)["compliance"]
                    .apply(lambda x: (x == "Yes").mean() * 100)
                )
            if "days_since_last_inspection" in insp.columns:
                insp_metrics["avg_days_between_inspections"] = (
                    insp.groupby(self.ID_COLUMN)["days_since_last_inspection"].mean()
                )
        else:
            insp_metrics = pd.DataFrame()

        if not viol.empty:
            count_col = (
                "violation_row_id"
                if "violation_row_id" in viol.columns
                else ("id" if "id" in viol.columns else self.ID_COLUMN)
            )
            viol_metrics = viol.groupby(self.ID_COLUMN).agg(
                total_violations=(count_col, "count"),
            )
            if "major_viol_ind" in viol.columns:
                viol_metrics["major_violations"] = (
                    viol.groupby(self.ID_COLUMN)["major_viol_ind"]
                    .apply(lambda x: (x == "Y").sum())
                )
            if "compliant_on_reinsp" in viol.columns:
                viol_metrics["reinspection_compliance_rate"] = (
                    viol.groupby(self.ID_COLUMN)["compliant_on_reinsp"]
                    .apply(lambda x: (x == "Y").mean() * 100)
                )
        else:
            viol_metrics = pd.DataFrame()

        metrics = insp_metrics.join(viol_metrics, how="outer").fillna(0)
        metrics = metrics.reset_index()
        self.data["performance_metrics"] = metrics
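
    # The resulting frame has one row per api_norm; depending on which source columns
    # exist it carries total_inspections, compliance_rate, avg_days_between_inspections,
    # total_violations, major_violations, and reinspection_compliance_rate.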

    # --------------------------------------------------------------------------------------
    # Analytic helpers
    # --------------------------------------------------------------------------------------

    def analyze_inspection_patterns(self) -> Dict[str, Any]:
        # Work on a copy so the year/date columns added below do not mutate the
        # cached frame in self.data.
        insp_df = self.data.get("inspections", pd.DataFrame()).copy()
        if insp_df.empty:
            return {}

        result: Dict[str, Any] = {
            "overall_statistics": {
                "total_inspections": int(len(insp_df)),
                "unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()),
            }
        }

        if "compliance" in insp_df.columns:
            result["overall_statistics"]["overall_compliance_rate"] = (
                (insp_df["compliance"] == "Yes").mean() * 100
            )
        if "days_since_last_inspection" in insp_df.columns:
            result["overall_statistics"]["avg_days_between_inspections"] = insp_df[
                "days_since_last_inspection"
            ].mean()
            result["overall_statistics"]["median_days_between_inspections"] = insp_df[
                "days_since_last_inspection"
            ].median()

        if "inspection_date" in insp_df.columns:
            insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce")
            insp_df["year"] = insp_df["inspection_date"].dt.year
            result["temporal_patterns"] = {
                "inspections_by_year": insp_df.groupby("year").size().to_dict()
            }
            if "compliance" in insp_df.columns:
                result["temporal_patterns"]["compliance_by_year"] = (
                    insp_df.groupby("year")["compliance"]
                    .apply(lambda x: (x == "Yes").mean() * 100)
                    .dropna()
                    .to_dict()
                )

        if "district" in insp_df.columns:
            district_counts = insp_df.groupby("district").size().to_dict()
            district_compliance = {}
            if "compliance" in insp_df.columns:
                district_compliance = (
                    insp_df.groupby("district")["compliance"]
                    .apply(lambda x: (x == "Yes").mean() * 100)
                    .to_dict()
                )
            result["district_performance"] = {
                "inspections_by_district": district_counts,
                "compliance_by_district": district_compliance,
            }

        return result

    def analyze_violations(self) -> Dict[str, Any]:
        viol_df = self.data.get("violations", pd.DataFrame())
        if viol_df.empty:
            return {}

        result: Dict[str, Any] = {
            "overall_statistics": {
                "total_violations": int(len(viol_df)),
                "unique_wells_with_violations": int(viol_df[self.ID_COLUMN].nunique()),
            },
            "violation_types": {},
            "enforcement_effectiveness": {},
        }

        if "major_viol_ind" in viol_df.columns:
            result["overall_statistics"]["major_violations"] = int(
                (viol_df["major_viol_ind"] == "Y").sum()
            )
        if "compliant_on_reinsp" in viol_df.columns:
            result["overall_statistics"]["compliance_on_reinspection_rate"] = (
                (viol_df["compliant_on_reinsp"] == "Y").mean() * 100
            )

        if "violated_rule" in viol_df.columns:
            result["violation_types"]["common_violations"] = (
                viol_df["violated_rule"].value_counts().head(10).to_dict()
            )
            if "major_viol_ind" in viol_df.columns:
                result["violation_types"]["major_violation_types"] = (
                    viol_df[viol_df["major_viol_ind"] == "Y"]["violated_rule"]
                    .value_counts()
                    .head(5)
                    .to_dict()
                )

        if {"violated_rule", "compliant_on_reinsp"} <= set(viol_df.columns):
            result["enforcement_effectiveness"]["resolution_rate_by_type"] = (
                viol_df.groupby("violated_rule")["compliant_on_reinsp"]
                .apply(lambda x: (x == "Y").mean() * 100)
                .to_dict()
            )

        return result

    def analyze_regulatory_chain(self) -> Dict[str, Any]:
        insp_df = self.data.get("inspections", pd.DataFrame()).copy()
        viol_df = self.data.get("violations", pd.DataFrame()).copy()
        if insp_df.empty or viol_df.empty:
            return {}

        if "inspection_date" not in insp_df.columns or "violation_disc_date" not in viol_df.columns:
            return {}

        insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce")
        viol_df["violation_disc_date"] = pd.to_datetime(viol_df["violation_disc_date"], errors="coerce")
        viol_df["last_enf_action_date"] = pd.to_datetime(
            viol_df.get("last_enf_action_date"), errors="coerce"
        )

        insp_df = insp_df.dropna(subset=[self.ID_COLUMN, "inspection_date"])
        viol_df = viol_df.dropna(subset=[self.ID_COLUMN, "violation_disc_date"])
        if insp_df.empty or viol_df.empty:
            return {}

        insp_sorted = (
            insp_df.sort_values(["inspection_date", self.ID_COLUMN])
            .reset_index(drop=True)
        )
        viol_sorted = (
            viol_df.sort_values(["violation_disc_date", self.ID_COLUMN])
            .reset_index(drop=True)
        )
        matched_df = pd.merge_asof(
            viol_sorted,
            insp_sorted,
            left_on="violation_disc_date",
            right_on="inspection_date",
            by=self.ID_COLUMN,
            direction="backward",
            suffixes=("_viol", "_insp"),
        ).dropna(subset=["inspection_date"])
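        # merge_asof(direction="backward") pairs each violation with the most recent
        # inspection of the same well on or before the discovery date: e.g. a violation
        # discovered 2021-03-10 on a well inspected 2021-01-05 and 2021-04-20 matches
        # the 2021-01-05 inspection (illustrative dates only).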

        if matched_df.empty:
            return {}
        total_inspections = len(insp_df)
        # The inspections "id" column only gets the "_insp" suffix when the violations
        # frame carries a clashing "id"; check both names before falling back to dates.
        if "id_insp" in matched_df.columns:
            inspection_id_col = "id_insp"
        elif "id" in matched_df.columns:
            inspection_id_col = "id"
        else:
            inspection_id_col = "inspection_date"
        inspections_with_violations = matched_df[inspection_id_col].nunique()
        violation_rate = (inspections_with_violations / total_inspections) * 100 if total_inspections else 0

        time_spans = matched_df.dropna(
            subset=["inspection_date", "violation_disc_date", "last_enf_action_date"]
        ).copy()
        time_spans["insp_to_viol"] = (
            time_spans["violation_disc_date"] - time_spans["inspection_date"]
        ).dt.days
        time_spans["viol_to_enforce"] = (
            time_spans["last_enf_action_date"] - time_spans["violation_disc_date"]
        ).dt.days
        time_spans["total_span"] = time_spans["insp_to_viol"] + time_spans["viol_to_enforce"]

        has_enf_action = "last_enf_action" in matched_df.columns
        enforcement_patterns = {
            "action_types": viol_df["last_enf_action"].value_counts().to_dict()
            if "last_enf_action" in viol_df.columns
            else {},
            "enforcement_rate": (
                matched_df["last_enf_action"].notna().sum() / len(matched_df) * 100
                if has_enf_action
                else 0.0
            ),
        }
        if not time_spans.empty:
            enforcement_patterns["avg_days_to_enforcement"] = time_spans["viol_to_enforce"].mean()

        return {
            "summary": {
                "total_inspections": total_inspections,
                "violation_rate": violation_rate,
                "unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()),
            },
            "conversion_funnel": {
                "inspections_with_violations": inspections_with_violations,
                "violations_with_enforcement": (
                    int(matched_df["last_enf_action"].notna().sum()) if has_enf_action else 0
                ),
            },
            "enforcement_patterns": enforcement_patterns,
            "time_spans": (
                time_spans[["insp_to_viol", "viol_to_enforce", "total_span"]].describe().to_dict()
                if not time_spans.empty
                else {}
            ),
        }

    def analyze_environmental_justice(self) -> Dict[str, Any]:
        well_df = self.data.get("well_data", pd.DataFrame()).copy()
        if well_df.empty or "census_tract_geoid" not in well_df.columns:
            return {}

        metrics = self.data.get("performance_metrics")
        if metrics is not None and not metrics.empty:
            well_df = well_df.merge(metrics, on=self.ID_COLUMN, how="left")

        tract_df = well_df.dropna(subset=["census_tract_geoid"]).copy()
        if tract_df.empty:
            return {}

        agg_map: Dict[str, str] = {self.ID_COLUMN: "count"}
        mean_cols = [
            "ej_composite_score",
            "pct_minority",
            "pct_hispanic",
            "poverty_rate",
            "median_household_income",
            "avg_days_between_inspections",
            "reinspection_compliance_rate",
            "compliance_rate",
        ]
        for col in mean_cols:
            if col in tract_df.columns:
                agg_map[col] = "mean"

        if "total_inspections" in tract_df.columns:
            agg_map["total_inspections"] = "mean"
        if "total_violations" in tract_df.columns:
            agg_map["total_violations"] = "mean"
        if "major_violations" in tract_df.columns:
            agg_map["major_violations"] = "sum"

        tract_summary = (
            tract_df.groupby("census_tract_geoid")
            .agg(agg_map)
            .rename(columns={self.ID_COLUMN: "wells_in_tract"})
            .reset_index()
        )
        rename_map = {
            "total_inspections": "avg_inspections",
            "total_violations": "avg_violations",
            "compliance_rate": "avg_compliance_rate",
        }
        tract_summary = tract_summary.rename(columns=rename_map)

        demographic_vars = [
            col
            for col in [
                "ej_composite_score",
                "pct_minority",
                "pct_hispanic",
                "poverty_rate",
                "median_household_income",
            ]
            if col in tract_summary.columns
        ]
        performance_vars = [
            col
            for col in [
                "avg_inspections",
                "avg_violations",
                "major_violations",
                "avg_compliance_rate",
                "avg_days_between_inspections",
                "reinspection_compliance_rate",
                "wells_in_tract",
            ]
            if col in tract_summary.columns
        ]

        correlations: Dict[str, Dict[str, float]] = {dv: {} for dv in demographic_vars}
        for d in demographic_vars:
            for p in performance_vars:
                correlations[d][p] = tract_summary[d].corr(tract_summary[p], method="spearman")
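        # method="spearman" ranks both series before correlating, so the coefficients
        # capture monotone association rather than a linear fit.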

        def split_high_low(column: str) -> Dict[str, Dict[str, float]]:
            result: Dict[str, Dict[str, float]] = {}
            if column not in tract_summary.columns:
                return result
            median_value = tract_summary[column].median()
            high = tract_summary[tract_summary[column] > median_value]
            low = tract_summary[tract_summary[column] <= median_value]
            for metric in performance_vars:
                result[metric] = {
                    "high": high[metric].mean() if not high.empty else float("nan"),
                    "low": low[metric].mean() if not low.empty else float("nan"),
                }
            return result

        return {
            "summary": {
                "total_tracts": int(len(tract_summary)),
                "total_wells": int(tract_summary["wells_in_tract"].sum()),
                "avg_wells_per_tract": tract_summary["wells_in_tract"].mean(),
                "avg_ej_score": tract_summary.get("ej_composite_score", pd.Series(dtype=float)).mean(),
            },
            "correlations": correlations,
            "high_vulnerability_vs_low": split_high_low("ej_composite_score"),
            "high_poverty_vs_low": split_high_low("poverty_rate"),
        }

    # --------------------------------------------------------------------------------------
    # Public helpers
    # --------------------------------------------------------------------------------------

    def get_analysis(self) -> Dict[str, Any]:
        return {
            "inspection_analysis": self.analyze_inspection_patterns(),
            "violation_analysis": self.analyze_violations(),
            "regulatory_chain": self.analyze_regulatory_chain(),
            "environmental_justice": self.analyze_environmental_justice(),
        }

    @staticmethod
    def _format_stat(stat_value: Any) -> str:
        if stat_value is None:
            return "n/a"
        # bool is a subclass of int; report True/False rather than 1.00/0.00.
        if isinstance(stat_value, bool):
            return str(stat_value)
        if isinstance(stat_value, (int, float)):
            if pd.isna(stat_value):
                return "n/a"
            return f"{stat_value:,.2f}"
        return str(stat_value)

    def print_analysis(self) -> None:
        analysis = self.get_analysis()

        def print_block(title: str, stats: Dict[str, Any]) -> None:
            if not stats:
                return
            print(f"\n{title}")
            for stat_key, stat_value in stats.items():
                print(f"  {stat_key.replace('_', ' ').title()}: {self._format_stat(stat_value)}")

        print_block("INSPECTION ANALYSIS", analysis.get("inspection_analysis", {}).get("overall_statistics", {}))
        print_block("VIOLATION ANALYSIS", analysis.get("violation_analysis", {}).get("overall_statistics", {}))
        print_block("REGULATORY CHAIN", analysis.get("regulatory_chain", {}).get("summary", {}))
        print_block("ENVIRONMENTAL JUSTICE", analysis.get("environmental_justice", {}).get("summary", {}))

        violation_types = analysis.get("violation_analysis", {}).get("violation_types", {})
        if violation_types:
            print("\nTop Violations:")
            for rule, count in violation_types.get("common_violations", {}).items():
                print(f"  {rule}: {self._format_stat(count)}")

    def get_summary_stats(self) -> Dict[str, Any]:
        stats: Dict[str, Any] = {}
        wells = self.data.get("well_data", pd.DataFrame())
        if not wells.empty:
            stats["total_wells"] = len(wells)
            stats["unique_census_tracts"] = (
                wells["census_tract_geoid"].nunique(dropna=True)
                if "census_tract_geoid" in wells.columns
                else None
            )
        insp = self.data.get("inspections", pd.DataFrame())
        if not insp.empty:
            stats["total_inspections"] = len(insp)
        viol = self.data.get("violations", pd.DataFrame())
        if not viol.empty:
            stats["total_violations"] = len(viol)
        return stats

    def export_analysis(self, path: Path | str) -> None:
        output_path = Path(path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        analysis = self.get_analysis()
        output_path.write_text(json.dumps(analysis, indent=2, default=str), encoding="utf-8")
        logger.info("Analysis exported to %s", output_path)
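
    # The exported JSON mirrors get_analysis(); a trimmed, illustrative shape:
    #   {
    #     "inspection_analysis": {"overall_statistics": {...}, ...},
    #     "violation_analysis": {...},
    #     "regulatory_chain": {...},
    #     "environmental_justice": {...}
    #   }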


if __name__ == "__main__":
    try:
        analyzer = WellAnalyzer()
        print("Summary Stats:")
        for key, value in analyzer.get_summary_stats().items():
            print(f"  {key.replace('_', ' ').title()}: {value}")
        analyzer.print_analysis()
        analyzer.export_analysis(Path("analysis_output.json"))
    except WellAnalyzerError as exc:
        logger.error("Well Analyzer failed: %s", exc, exc_info=True)