from __future__ import annotations import json import logging import os import warnings from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional from urllib.parse import quote_plus import pandas as pd from dotenv import load_dotenv from sqlalchemy import Engine, create_engine, text from sqlalchemy.exc import SQLAlchemyError # Configure logging early so that downstream modules inherit the settings. logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # Pandas throws a lot of noisy warnings when casting timestamps out of Postgres. warnings.filterwarnings("ignore", category=UserWarning) class WellAnalyzerError(Exception): """Base exception class for all analyzer failures.""" class ConfigError(WellAnalyzerError): """Raised when we cannot build a working configuration.""" class DataLoadError(WellAnalyzerError): """Raised when data cannot be loaded from the warehouse.""" class AnalysisError(WellAnalyzerError): """Raised when a downstream analytic task fails.""" @dataclass class Config: """Runtime configuration for the analyzer.""" engine: Engine chunk_size: int = 10_000 cache_dir: Path = Path("./cache") well_source: str = "" inspections_source: str = "" violations_source: str = "" def _load_engine_from_env() -> Engine: """Build a SQLAlchemy engine using PG* environment variables.""" load_dotenv(override=False) host = os.getenv("PGHOST", "localhost") port = os.getenv("PGPORT", "5432") user = os.getenv("PGUSER", "postgres") password = quote_plus(os.getenv("PGPASSWORD", "")) database = os.getenv("PGDATABASE", "texas_data") url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" logger.info("Connecting to Postgres", extra={"host": host, "database": database}) try: return create_engine(url) except SQLAlchemyError as exc: raise ConfigError(f"Failed to create engine for {url}: {exc}") from exc class WellAnalyzer: """ Analyze wells, inspections, violations, and environmental-justice indicators held in the rebuilt PostGIS warehouse. The class auto-detects the rebuilt tables that now exist in the texas-rebuild-postgis project so it works against both fresh rebuilds and future refreshes without hand-editing SQL strings. """ ID_COLUMN = "api_norm" WELL_SOURCE_CANDIDATES = [ "well_shape_tract", ] WELL_COLUMN_MAP: Dict[str, List[str]] = { "census_tract_geoid": ["census_tract_geoid", "geoid", "tract_geoid"], "tract_name": ["tract_name", "name"], "ruca_category": ["ruca_category"], "ruca_code_2020": ["ruca_code_2020"], "ruca_primary_description": ["ruca_primary_description"], "ruca_secondary_description": ["ruca_secondary_description"], "ej_composite_score": ["ej_composite_score"], "pct_minority": ["pct_minority"], "pct_hispanic": ["pct_hispanic"], "poverty_rate": ["poverty_rate"], "unemployment_rate": ["unemployment_rate"], "less_than_hs_pct": ["less_than_hs_pct"], "linguistic_isolation_rate": ["linguistic_isolation_rate"], "renter_cost_burden_rate": ["renter_cost_burden_rate"], "disability_rate": ["disability_rate"], "pct_under5": ["pct_under5"], "pct_65plus": ["pct_65plus"], "median_household_income": ["median_household_income"], "latitude": ["latitude", "lat"], "longitude": ["longitude", "lon", "lng"], "basin_label": ["basin_label", "basin_name"], "play_label": ["play_label", "play_name"], "texmex_name": ["texmex_name"], "distance_to_texmex_km": ["distance_to_texmex_km"], "within_25km_texmex": ["within_25km_texmex"], "within_50km_texmex": ["within_50km_texmex"], } def __init__( self, engine: Optional[Engine] = None, *, chunk_size: int = 10_000, cache_dir: Optional[Path] = None, well_source: Optional[str] = None, ) -> None: if engine is None: engine = _load_engine_from_env() self.config = Config( engine=engine, chunk_size=chunk_size, cache_dir=(cache_dir or Path("./cache")), ) self.config.well_source = well_source or self._detect_table(self.WELL_SOURCE_CANDIDATES) if not self.config.well_source: raise ConfigError( "Could not find a well table/view. " "Expected one of well_enriched_all_plus, well_enriched_all, well_with_demographics_table, well_shape_tract." ) self.config.inspections_source = self._detect_table(["inspections"]) self.config.violations_source = self._detect_table(["violations"]) if not self.config.inspections_source or not self.config.violations_source: raise ConfigError("Both inspections and violations tables must exist in the database.") self.data: Dict[str, pd.DataFrame] = {} self._initialize_data() # -------------------------------------------------------------------------------------- # Helper methods for metadata detection and SQL building # -------------------------------------------------------------------------------------- def _detect_table(self, candidates: List[str]) -> Optional[str]: for candidate in candidates: found = self._table_exists(candidate) if found: return found return None def _table_exists(self, table: str) -> Optional[str]: names_to_try = [] if "." in table: names_to_try.append(table) else: names_to_try.append(f"public.{table}") names_to_try.append(table) query = text("SELECT to_regclass(:name) IS NOT NULL AS exists") with self.config.engine.begin() as conn: for name in names_to_try: exists = conn.execute(query, {"name": name}).scalar() if exists: return name return None def _split_table_name(self, qualified: str) -> Dict[str, Optional[str]]: if "." in qualified: schema, table = qualified.split(".", 1) return {"schema": schema, "table": table} return {"schema": None, "table": qualified} def _get_columns(self, table: str) -> Dict[str, str]: pieces = self._split_table_name(table) sql = [ "SELECT column_name", "FROM information_schema.columns", "WHERE table_name = :table", ] params = {"table": pieces["table"]} if pieces["schema"]: sql.append("AND table_schema = :schema") params["schema"] = pieces["schema"] sql.append("ORDER BY ordinal_position") with self.config.engine.begin() as conn: rows = conn.execute(text(" ".join(sql)), params).fetchall() return {row[0].lower(): row[0] for row in rows} def _pick_column(self, columns: Dict[str, str], names: List[str]) -> Optional[str]: for name in names: if name.lower() in columns: return columns[name.lower()] return None def _execute_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame: try: df_chunks: List[pd.DataFrame] = [] for chunk in pd.read_sql( text(query), self.config.engine, params=params, chunksize=self.config.chunk_size, ): df_chunks.append(chunk) return pd.concat(df_chunks, ignore_index=True) if df_chunks else pd.DataFrame() except SQLAlchemyError as exc: logger.error("Query failed: %s", query, exc_info=True) raise DataLoadError(f"Failed executing query: {exc}") from exc # -------------------------------------------------------------------------------------- # Data loading # -------------------------------------------------------------------------------------- def _initialize_data(self) -> None: self.data["well_data"] = self._load_wells() self.data["inspections"] = self._load_inspections() self.data["violations"] = self._load_violations() if not self.data["well_data"].empty and ( not self.data["inspections"].empty or not self.data["violations"].empty ): self._create_performance_metrics() def _load_wells(self) -> pd.DataFrame: table = self.config.well_source columns = self._get_columns(table) alias = "w" api_norm_col = self._pick_column(columns, ["api_norm"]) if not api_norm_col: raise DataLoadError(f"{table} does not expose api_norm") select_parts = [f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}'] for target, candidates in self.WELL_COLUMN_MAP.items(): column = self._pick_column(columns, candidates) if column: select_parts.append(f'{alias}."{column}" AS {target}') query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias}' df = self._execute_query(query) df[self.ID_COLUMN] = df[self.ID_COLUMN].astype(str).str.strip() df = df[df[self.ID_COLUMN].notna()] df = df.drop_duplicates(subset=[self.ID_COLUMN]).reset_index(drop=True) logger.info("Loaded %s wells from %s", len(df), table) return df def _load_inspections(self) -> pd.DataFrame: table = self.config.inspections_source columns = self._get_columns(table) alias = "i" select_parts = [] base_candidates = [ "id", "district", "county", "inspection_date", "inspection_type", "operator_name", "field_name", "compliance", "file_date", "created_at", ] for column in base_candidates: picked = self._pick_column(columns, [column]) if picked: select_parts.append(f'{alias}."{picked}" AS {column}') api_norm_col = self._pick_column(columns, ["api_norm"]) if not api_norm_col: raise DataLoadError(f"{table} does not expose api_norm") select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}') where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL' query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}' df = self._execute_query(query) for col in ["inspection_date", "file_date", "created_at"]: if col in df.columns: df[col] = pd.to_datetime(df[col], errors="coerce") df = df[df[self.ID_COLUMN].notna()].copy() df = df[df[self.ID_COLUMN].notna()].copy() if "inspection_date" in df.columns: df = df.sort_values([self.ID_COLUMN, "inspection_date"]) df["days_since_last_inspection"] = ( df.groupby(self.ID_COLUMN)["inspection_date"].diff().dt.days ) logger.info("Loaded %s inspections from %s", len(df), table) return df.reset_index(drop=True) def _load_violations(self) -> pd.DataFrame: table = self.config.violations_source columns = self._get_columns(table) alias = "v" select_parts = [] row_id_candidates = ["id", "violation_id", "violationid", "objectid", "row_id"] base_candidates = [ "operator_name", "p5_operator_no", "district", "oil_lease_gas_well_id", "lease_fac_name", "well_no", "drilling_permit_no", "field_name", "violated_rule", "violated_rule_desc", "major_viol_ind", "compliant_on_reinsp", "last_enf_action", "last_enf_action_date", "violation_disc_date", "file_date", "created_at", ] for row_id in row_id_candidates: picked = self._pick_column(columns, [row_id]) if picked: select_parts.append(f'{alias}."{picked}" AS {row_id}') break for column in base_candidates: picked = self._pick_column(columns, [column]) if picked: select_parts.append(f'{alias}."{picked}" AS {column}') api_norm_col = self._pick_column(columns, ["api_norm"]) if not api_norm_col: raise DataLoadError(f"{table} does not expose api_norm") select_parts.append(f'{alias}."{api_norm_col}" AS {self.ID_COLUMN}') where_clause = f'WHERE {alias}."{api_norm_col}" IS NOT NULL' query = f'SELECT {", ".join(select_parts)} FROM {table} AS {alias} {where_clause}' df = self._execute_query(query) for col in ["violation_disc_date", "last_enf_action_date", "file_date", "created_at"]: if col in df.columns: df[col] = pd.to_datetime(df[col], errors="coerce") df = df[df[self.ID_COLUMN].notna()].reset_index(drop=True) df = df[df[self.ID_COLUMN].notna()].reset_index(drop=True) if not df.empty: row_id_col = next( (c for c in ["id", "violation_id", "violationid", "objectid", "row_id"] if c in df.columns), None, ) if row_id_col is None: df["violation_row_id"] = range(1, len(df) + 1) else: df = df.rename(columns={row_id_col: "violation_row_id"}) df["total_violations"] = df.groupby(self.ID_COLUMN)["violation_row_id"].transform("count") df["violation_number"] = df.groupby(self.ID_COLUMN).cumcount() + 1 logger.info("Loaded %s violations from %s", len(df), table) return df def _create_performance_metrics(self) -> None: insp = self.data.get("inspections", pd.DataFrame()).copy() viol = self.data.get("violations", pd.DataFrame()).copy() if insp.empty and viol.empty: return if not insp.empty: count_col = "id" if "id" in insp.columns else "inspection_date" insp_metrics = insp.groupby(self.ID_COLUMN).agg( total_inspections=(count_col, "count") ) if "compliance" in insp.columns: insp_metrics["compliance_rate"] = ( insp.groupby(self.ID_COLUMN)["compliance"] .apply(lambda x: (x == "Yes").mean() * 100) ) if "days_since_last_inspection" in insp.columns: insp_metrics["avg_days_between_inspections"] = ( insp.groupby(self.ID_COLUMN)["days_since_last_inspection"].mean() ) else: insp_metrics = pd.DataFrame() if not viol.empty: count_col = ( "violation_row_id" if "violation_row_id" in viol.columns else ("id" if "id" in viol.columns else self.ID_COLUMN) ) viol_metrics = viol.groupby(self.ID_COLUMN).agg( total_violations=(count_col, "count"), ) if "major_viol_ind" in viol.columns: viol_metrics["major_violations"] = ( viol.groupby(self.ID_COLUMN)["major_viol_ind"] .apply(lambda x: (x == "Y").sum()) ) if "compliant_on_reinsp" in viol.columns: viol_metrics["reinspection_compliance_rate"] = ( viol.groupby(self.ID_COLUMN)["compliant_on_reinsp"] .apply(lambda x: (x == "Y").mean() * 100) ) else: viol_metrics = pd.DataFrame() metrics = insp_metrics.join(viol_metrics, how="outer").fillna(0) metrics = metrics.reset_index() self.data["performance_metrics"] = metrics # -------------------------------------------------------------------------------------- # Analytic helpers # -------------------------------------------------------------------------------------- def analyze_inspection_patterns(self) -> Dict[str, Any]: insp_df = self.data.get("inspections", pd.DataFrame()) if insp_df.empty: return {} result: Dict[str, Any] = { "overall_statistics": { "total_inspections": int(len(insp_df)), "unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()), } } if "compliance" in insp_df.columns: result["overall_statistics"]["overall_compliance_rate"] = ( (insp_df["compliance"] == "Yes").mean() * 100 ) if "days_since_last_inspection" in insp_df.columns: result["overall_statistics"]["avg_days_between_inspections"] = insp_df[ "days_since_last_inspection" ].mean() result["overall_statistics"]["median_days_between_inspections"] = insp_df[ "days_since_last_inspection" ].median() if "inspection_date" in insp_df.columns: insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce") insp_df["year"] = insp_df["inspection_date"].dt.year result["temporal_patterns"] = { "inspections_by_year": insp_df.groupby("year").size().dropna().to_dict() } if "compliance" in insp_df.columns: result["temporal_patterns"]["compliance_by_year"] = ( insp_df.groupby("year")["compliance"] .apply(lambda x: (x == "Yes").mean() * 100) .dropna() .to_dict() ) if "district" in insp_df.columns: district_counts = insp_df.groupby("district").size().to_dict() district_compliance = {} if "compliance" in insp_df.columns: district_compliance = ( insp_df.groupby("district")["compliance"] .apply(lambda x: (x == "Yes").mean() * 100) .to_dict() ) result["district_performance"] = { "inspections_by_district": district_counts, "compliance_by_district": district_compliance, } return result def analyze_violations(self) -> Dict[str, Any]: viol_df = self.data.get("violations", pd.DataFrame()) if viol_df.empty: return {} result: Dict[str, Any] = { "overall_statistics": { "total_violations": int(len(viol_df)), "unique_wells_with_violations": int(viol_df[self.ID_COLUMN].nunique()), }, "violation_types": {}, "enforcement_effectiveness": {}, } if "major_viol_ind" in viol_df.columns: result["overall_statistics"]["major_violations"] = int( (viol_df["major_viol_ind"] == "Y").sum() ) if "compliant_on_reinsp" in viol_df.columns: result["overall_statistics"]["compliance_on_reinspection_rate"] = ( (viol_df["compliant_on_reinsp"] == "Y").mean() * 100 ) if "violated_rule" in viol_df.columns: result["violation_types"]["common_violations"] = ( viol_df["violated_rule"].value_counts().head(10).to_dict() ) if "major_viol_ind" in viol_df.columns: result["violation_types"]["major_violation_types"] = ( viol_df[viol_df["major_viol_ind"] == "Y"]["violated_rule"] .value_counts() .head(5) .to_dict() ) if {"violated_rule", "compliant_on_reinsp"} <= set(viol_df.columns): result["enforcement_effectiveness"]["resolution_rate_by_type"] = ( viol_df.groupby("violated_rule")["compliant_on_reinsp"] .apply(lambda x: (x == "Y").mean() * 100) .to_dict() ) return result def analyze_regulatory_chain(self) -> Dict[str, Any]: insp_df = self.data.get("inspections", pd.DataFrame()).copy() viol_df = self.data.get("violations", pd.DataFrame()).copy() if insp_df.empty or viol_df.empty: return {} if "inspection_date" not in insp_df.columns or "violation_disc_date" not in viol_df.columns: return {} insp_df["inspection_date"] = pd.to_datetime(insp_df["inspection_date"], errors="coerce") viol_df["violation_disc_date"] = pd.to_datetime(viol_df["violation_disc_date"], errors="coerce") viol_df["last_enf_action_date"] = pd.to_datetime( viol_df.get("last_enf_action_date"), errors="coerce" ) insp_df = insp_df.dropna(subset=[self.ID_COLUMN, "inspection_date"]) viol_df = viol_df.dropna(subset=[self.ID_COLUMN, "violation_disc_date"]) if insp_df.empty or viol_df.empty: return {} insp_sorted = ( insp_df.sort_values(["inspection_date", self.ID_COLUMN]) .reset_index(drop=True) ) viol_sorted = ( viol_df.sort_values(["violation_disc_date", self.ID_COLUMN]) .reset_index(drop=True) ) matched_df = pd.merge_asof( viol_sorted, insp_sorted, left_on="violation_disc_date", right_on="inspection_date", by=self.ID_COLUMN, direction="backward", suffixes=("_viol", "_insp"), ).dropna(subset=["inspection_date"]) if matched_df.empty: return {} total_inspections = len(insp_df) inspection_id_col = "id_insp" if "id_insp" in matched_df.columns else "inspection_date" inspections_with_violations = matched_df[inspection_id_col].nunique() violation_rate = (inspections_with_violations / total_inspections) * 100 if total_inspections else 0 time_spans = matched_df.dropna( subset=["inspection_date", "violation_disc_date", "last_enf_action_date"] ).copy() time_spans["insp_to_viol"] = ( time_spans["violation_disc_date"] - time_spans["inspection_date"] ).dt.days time_spans["viol_to_enforce"] = ( time_spans["last_enf_action_date"] - time_spans["violation_disc_date"] ).dt.days time_spans["total_span"] = time_spans["insp_to_viol"] + time_spans["viol_to_enforce"] enforcement_patterns = { "action_types": viol_df["last_enf_action"].value_counts().to_dict() if "last_enf_action" in viol_df.columns else {}, "enforcement_rate": ( matched_df["last_enf_action"].notna().sum() / len(matched_df) * 100 ), } if not time_spans.empty: enforcement_patterns["avg_days_to_enforcement"] = time_spans["viol_to_enforce"].mean() return { "summary": { "total_inspections": total_inspections, "violation_rate": violation_rate, "unique_wells_inspected": int(insp_df[self.ID_COLUMN].nunique()), }, "conversion_funnel": { "inspections_with_violations": inspections_with_violations, "violations_with_enforcement": int(matched_df["last_enf_action"].notna().sum()), }, "enforcement_patterns": enforcement_patterns, "time_spans": time_spans.describe().to_dict() if not time_spans.empty else {}, } def analyze_environmental_justice(self) -> Dict[str, Any]: well_df = self.data.get("well_data", pd.DataFrame()).copy() if well_df.empty or "census_tract_geoid" not in well_df.columns: return {} metrics = self.data.get("performance_metrics") if metrics is not None and not metrics.empty: well_df = well_df.merge(metrics, on=self.ID_COLUMN, how="left") tract_df = well_df.dropna(subset=["census_tract_geoid"]).copy() if tract_df.empty: return {} agg_map: Dict[str, str] = {self.ID_COLUMN: "count"} mean_cols = [ "ej_composite_score", "pct_minority", "pct_hispanic", "poverty_rate", "median_household_income", "avg_days_between_inspections", "reinspection_compliance_rate", "compliance_rate", ] for col in mean_cols: if col in tract_df.columns: agg_map[col] = "mean" if "total_inspections" in tract_df.columns: agg_map["total_inspections"] = "mean" if "total_violations" in tract_df.columns: agg_map["total_violations"] = "mean" if "major_violations" in tract_df.columns: agg_map["major_violations"] = "sum" tract_summary = ( tract_df.groupby("census_tract_geoid") .agg(agg_map) .rename(columns={self.ID_COLUMN: "wells_in_tract"}) .reset_index() ) rename_map = { "total_inspections": "avg_inspections", "total_violations": "avg_violations", "compliance_rate": "avg_compliance_rate", } tract_summary = tract_summary.rename(columns=rename_map) demographic_vars = [ col for col in [ "ej_composite_score", "pct_minority", "pct_hispanic", "poverty_rate", "median_household_income", ] if col in tract_summary.columns ] performance_vars = [ col for col in [ "avg_inspections", "avg_violations", "major_violations", "avg_compliance_rate", "avg_days_between_inspections", "reinspection_compliance_rate", "wells_in_tract", ] if col in tract_summary.columns ] correlations: Dict[str, Dict[str, float]] = {dv: {} for dv in demographic_vars} for d in demographic_vars: for p in performance_vars: correlations[d][p] = tract_summary[d].corr(tract_summary[p], method="spearman") def split_high_low(column: str) -> Dict[str, Dict[str, float]]: result: Dict[str, Dict[str, float]] = {} if column not in tract_summary.columns: return result median_value = tract_summary[column].median() high = tract_summary[tract_summary[column] > median_value] low = tract_summary[tract_summary[column] <= median_value] for metric in performance_vars: result[metric] = { "high": high[metric].mean() if not high.empty else float("nan"), "low": low[metric].mean() if not low.empty else float("nan"), } return result return { "summary": { "total_tracts": int(len(tract_summary)), "total_wells": int(tract_summary["wells_in_tract"].sum()), "avg_wells_per_tract": tract_summary["wells_in_tract"].mean(), "avg_ej_score": tract_summary.get("ej_composite_score", pd.Series(dtype=float)).mean(), }, "correlations": correlations, "high_vulnerability_vs_low": split_high_low("ej_composite_score"), "high_poverty_vs_low": split_high_low("poverty_rate"), } # -------------------------------------------------------------------------------------- # Public helpers # -------------------------------------------------------------------------------------- def get_analysis(self) -> Dict[str, Any]: return { "inspection_analysis": self.analyze_inspection_patterns(), "violation_analysis": self.analyze_violations(), "regulatory_chain": self.analyze_regulatory_chain(), "environmental_justice": self.analyze_environmental_justice(), } @staticmethod def _format_stat(stat_value: Any) -> str: if stat_value is None: return "n/a" if isinstance(stat_value, (int, float)): if pd.isna(stat_value): return "n/a" return f"{stat_value:,.2f}" return str(stat_value) def print_analysis(self) -> None: analysis = self.get_analysis() def print_block(title: str, stats: Dict[str, Any]) -> None: if not stats: return print(f"\n{title}") for stat_key, stat_value in stats.items(): print(f" {stat_key.replace('_', ' ').title()}: {self._format_stat(stat_value)}") print_block("INSPECTION ANALYSIS", analysis.get("inspection_analysis", {}).get("overall_statistics", {})) print_block("VIOLATION ANALYSIS", analysis.get("violation_analysis", {}).get("overall_statistics", {})) print_block("REGULATORY CHAIN", analysis.get("regulatory_chain", {}).get("summary", {})) print_block("ENVIRONMENTAL JUSTICE", analysis.get("environmental_justice", {}).get("summary", {})) violation_types = analysis.get("violation_analysis", {}).get("violation_types", {}) if violation_types: print("\nTop Violations:") for rule, count in violation_types.get("common_violations", {}).items(): print(f" {rule}: {self._format_stat(count)}") def get_summary_stats(self) -> Dict[str, Any]: stats: Dict[str, Any] = {} wells = self.data.get("well_data", pd.DataFrame()) if not wells.empty: stats["total_wells"] = len(wells) stats["unique_census_tracts"] = wells["census_tract_geoid"].nunique( dropna=True ) if "census_tract_geoid" in wells.columns else None insp = self.data.get("inspections", pd.DataFrame()) if not insp.empty: stats["total_inspections"] = len(insp) viol = self.data.get("violations", pd.DataFrame()) if not viol.empty: stats["total_violations"] = len(viol) return stats def export_analysis(self, path: Path | str) -> None: output_path = Path(path) output_path.parent.mkdir(parents=True, exist_ok=True) analysis = self.get_analysis() output_path.write_text(json.dumps(analysis, indent=2, default=str), encoding="utf-8") logger.info("Analysis exported to %s", output_path) if __name__ == "__main__": try: analyzer = WellAnalyzer() print("Summary Stats:") for key, value in analyzer.get_summary_stats().items(): print(f" {key.replace('_', ' ').title()}: {value}") analyzer.print_analysis() analyzer.export_analysis(Path("analysis_output.json")) except WellAnalyzerError as exc: logger.error("Well Analyzer failed: %s", exc, exc_info=True)