# california-equity-git/cci_analyzer.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Tuple, Union
from datetime import datetime, timedelta
import os
import json
import logging
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
import geopandas as gpd
from sqlalchemy import create_engine, text, Engine
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
import statsmodels.formula.api as smf
from scipy.stats import pearsonr, spearmanr
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Suppress noisy third-party warnings (mostly pandas FutureWarnings)
warnings.filterwarnings('ignore')
# Custom exceptions
class CCIAnalyzerError(Exception):
"""Base exception class for CCIAnalyzer."""
pass
class DataLoadError(CCIAnalyzerError):
"""Raised when data loading fails."""
pass
class ConfigError(CCIAnalyzerError):
"""Raised when configuration is invalid."""
pass
class AnalysisError(CCIAnalyzerError):
"""Raised when analysis fails."""
pass
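# Illustrative sketch (not executed; file names are hypothetical): the
# exception hierarchy lets callers catch every analyzer failure with one
# handler while still distinguishing load-time from analysis-time errors.
#
#     try:
#         analyzer = CCIAnalyzer("data/cci_projects.csv")
#     except DataLoadError as e:
#         print(f"Bad input data: {e}")
#     except CCIAnalyzerError as e:  # ConfigError, AnalysisError, etc.
#         print(f"Analyzer failed: {e}")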
@dataclass
class ProjectData:
"""Data class for CCI project information."""
project_id: str
reporting_cycle: int
agency_name: str
program_name: str
sub_program_name: Optional[str]
record_type: str
census_tract: Optional[int]
lat_long: Optional[str]
senate_district: Optional[int]
assembly_district: Optional[int]
county: str
total_project_cost: float
total_ggrf_funding: float
project_life_years: int
total_project_ghg_reductions: float
annual_project_ghg_reductions: float
fiscal_year_funding: str
is_benefit_dac: bool
total_ggrf_dac_funding: Optional[float]
funding_benefiting_dac: Optional[float]
date_operational: Optional[datetime]
project_completion_date: Optional[datetime]
is_ab1550_buffer_region: Optional[bool]
is_benefit_dac1550: Optional[bool]
is_low_income_communities: Optional[bool]
@dataclass
class Config:
"""Configuration settings for CCIAnalyzer."""
data_path: Path
output_path: Path = Path("./output")
shapefile_path: Optional[Path] = None
ej_data_path: Optional[Path] = None
cache_dir: Path = Path("./cache")
chunk_size: int = 10000
start_year: int = 2015
end_year: int = 2024
temporal_breakpoint: int = 2020 # Year to use as pre/post analysis breakpoint
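# Minimal Config sketch (illustrative only; CCIAnalyzer builds its own Config
# in __init__, so most callers never instantiate this directly; the CSV path
# below is a hypothetical placeholder):
#
#     cfg = Config(data_path=Path("data/cci_projects.csv"),
#                  start_year=2015, end_year=2024, temporal_breakpoint=2020)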
class CCIAnalyzer:
"""
Analyzes California Climate Investments (CCI) program data with focus on:
1. Inter-agency collaboration and its impacts on GHG reduction and equity
2. Temporal trends in funding allocation and collaborative outputs
3. Spatial patterns in program implementation
Attributes:
config (Config): Configuration settings
data (Dict): Dictionary containing loaded datasets
"""
def __init__(self,
data_path: Union[str, Path],
output_path: Optional[Union[str, Path]] = None,
shapefile_path: Optional[Union[str, Path]] = None,
ej_data_path: Optional[Union[str, Path]] = None,
start_year: int = 2015,
end_year: int = 2024,
temporal_breakpoint: int = 2020,
chunk_size: int = 10000):
"""
Initialize CCIAnalyzer with paths to data sources and configuration.
Args:
data_path: Path to the main CCI program data CSV file
output_path: Directory to save analysis outputs
shapefile_path: Path to geographic shapefiles (optional)
ej_data_path: Path to environmental justice data (optional)
start_year: Starting year for analysis
end_year: Ending year for analysis
temporal_breakpoint: Year to use as pre/post analysis breakpoint
chunk_size: Size of chunks for reading large datasets
"""
        try:
            # Normalize all incoming paths to pathlib.Path objects
            # (Path() is a no-op on values that are already Paths)
            data_path = Path(data_path)
            output_path = Path(output_path) if output_path else Path("./output")
            if shapefile_path:
                shapefile_path = Path(shapefile_path)
            if ej_data_path:
                ej_data_path = Path(ej_data_path)
# Initialize configuration
self.config = Config(
data_path=data_path,
output_path=output_path,
shapefile_path=shapefile_path,
ej_data_path=ej_data_path,
cache_dir=Path('./cache'),
chunk_size=chunk_size,
start_year=start_year,
end_year=end_year,
temporal_breakpoint=temporal_breakpoint
)
# Create output and cache directories
self.config.output_path.mkdir(parents=True, exist_ok=True)
self.config.cache_dir.mkdir(parents=True, exist_ok=True)
# Initialize data dictionary
self.data: Dict[str, pd.DataFrame] = {}
self.geo_data: Dict[str, gpd.GeoDataFrame] = {}
logger.info("Initializing CCIAnalyzer...")
self._initialize_data()
except Exception as e:
logger.error(f"Initialization error: {e}", exc_info=True)
raise ConfigError(f"Failed to initialize CCIAnalyzer: {str(e)}")
def _initialize_data(self):
"""Load and process required data."""
try:
# Load core dataset
self.data['cci_projects'] = self._load_cci_data()
# Process the data to create derived datasets
self._process_cci_data()
# Load geographic data if available
if self.config.shapefile_path:
self._load_geographic_data()
# Load environmental justice data if available
if self.config.ej_data_path:
self._load_ej_data()
# Create analysis-ready datasets
self._create_analysis_datasets()
logger.info("Data initialization complete")
except Exception as e:
logger.error(f"Error during data initialization: {e}", exc_info=True)
raise DataLoadError(f"Failed to initialize data: {str(e)}")
def _load_cci_data(self) -> pd.DataFrame:
"""
Load CCI program data from CSV with proper handling of data types and chunking for large files.
Returns:
DataFrame containing CCI project data
"""
try:
logger.info(f"Loading CCI data from {self.config.data_path}...")
# Define data types for efficient loading
dtypes = {
'Project IDNumber': str,
'Reporting Cycle Name': 'Int64',
'Agency Name': str,
'Program Name': str,
'Sub Program Name': str,
'Record Type': str,
'Census Tract': 'Int64',
'Lat Long': str,
'Senate District': 'Int64',
'Assembly District': 'Int64',
'County': str,
'Total Project Cost': 'float64',
'Total Program GGRFFunding': 'float64',
'Project Life Years': 'Int64',
'Total Project GHGReductions': 'float64',
'Annual Project GHGReductions': 'float64',
'Project Count': 'Int64',
'Fiscal Year Funding Project': str,
                'Is Benefit Disadvantaged Communities': 'boolean',  # nullable bool tolerates blanks
                'Disadvantaged Community Criteria': str,
                'Total GGRFDisadvantaged Community Funding': 'float64',
                'Funding Benefiting Disadvantaged Communities': 'float64',
                'Funding Within Disadvantage Communities': 'float64',
                # 'Date Operational' and 'Project Completion Date' are handled by
                # parse_dates below; listing them in dtype as well makes recent
                # pandas versions raise a ValueError
                'Is AB1550Buffer Region': 'boolean',
                'Is Benefit DAC1550Communities': 'boolean',
                'Is Low Income Communities': 'boolean'
            }
# Parse dates
date_cols = ['Date Operational', 'Project Completion Date']
            # Read in chunks so very large files do not exhaust memory
            chunk_iter = pd.read_csv(
                self.config.data_path,
                dtype=dtypes,
                parse_dates=date_cols,
                chunksize=self.config.chunk_size
            )
            # Combine chunks
            df = pd.concat(chunk_iter, ignore_index=True)
# Basic data cleaning
df = self._clean_cci_data(df)
logger.info(f"Loaded {len(df)} CCI projects")
return df
except Exception as e:
logger.error(f"Error loading CCI data: {e}", exc_info=True)
raise DataLoadError(f"Failed to load CCI data: {str(e)}")
def _clean_cci_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Clean and prepare CCI data.
Args:
df: Raw CCI data DataFrame
Returns:
Cleaned DataFrame
"""
try:
# Rename columns for consistency and readability
column_mapping = {
'Project IDNumber': 'project_id',
'Reporting Cycle Name': 'reporting_cycle',
'Agency Name': 'agency_name',
'Program Name': 'program_name',
'Sub Program Name': 'sub_program_name',
'Record Type': 'record_type',
'Census Tract': 'census_tract',
'Lat Long': 'lat_long',
'Senate District': 'senate_district',
'Assembly District': 'assembly_district',
'County': 'county',
'Total Project Cost': 'total_project_cost',
'Total Program GGRFFunding': 'total_ggrf_funding',
'Project Life Years': 'project_life_years',
'Total Project GHGReductions': 'total_project_ghg_reductions',
'Annual Project GHGReductions': 'annual_project_ghg_reductions',
'Project Count': 'project_count',
'Fiscal Year Funding Project': 'fiscal_year_funding',
'Is Benefit Disadvantaged Communities': 'is_benefit_dac',
'Disadvantaged Community Criteria': 'dac_criteria',
'Total GGRFDisadvantaged Community Funding': 'total_ggrf_dac_funding',
'Funding Benefiting Disadvantaged Communities': 'funding_benefiting_dac',
'Funding Within Disadvantage Communities': 'funding_within_dac',
'Date Operational': 'date_operational',
'Project Completion Date': 'project_completion_date',
'Is AB1550Buffer Region': 'is_ab1550_buffer_region',
'Is Benefit DAC1550Communities': 'is_benefit_dac1550',
'Is Low Income Communities': 'is_low_income_communities'
}
# Rename columns
df = df.rename(columns=column_mapping)
            # Extract latitude and longitude from the combined field; the column
            # name implies "lat, long" ordering, so index 0 is taken as latitude
            if 'lat_long' in df.columns:
                coords = df['lat_long'].str.split(',', expand=True)
                df['latitude'] = pd.to_numeric(coords[0].str.strip(), errors='coerce')
                df['longitude'] = pd.to_numeric(coords[1].str.strip(), errors='coerce')
            # Convert fiscal year to calendar year
            if 'fiscal_year_funding' in df.columns:
                # Extract the starting year from fiscal year strings like "2019-20";
                # to_numeric(errors='coerce') tolerates malformed values, whereas a
                # direct str -> Int64 astype would raise
                df['funding_year'] = pd.to_numeric(
                    df['fiscal_year_funding'].str.split('-').str[0], errors='coerce'
                ).astype('Int64')
# Calculate GHG efficiency ($ spent per ton of GHG reduced)
df['ghg_efficiency'] = np.where(
df['total_project_ghg_reductions'] > 0,
df['total_ggrf_funding'] / df['total_project_ghg_reductions'],
np.nan
)
# Calculate DAC benefit percentage
df['dac_benefit_percentage'] = np.where(
df['total_ggrf_funding'] > 0,
100 * df['funding_benefiting_dac'] / df['total_ggrf_funding'],
0
)
# Create pre/post temporal breakpoint indicator
if 'funding_year' in df.columns:
df['post_breakpoint'] = df['funding_year'] >= self.config.temporal_breakpoint
# Drop rows with missing critical values
df = df.dropna(subset=['project_id', 'agency_name', 'program_name'])
return df
except Exception as e:
logger.error(f"Error cleaning CCI data: {e}", exc_info=True)
raise DataLoadError(f"Failed to clean CCI data: {str(e)}")
def _process_cci_data(self):
"""Process the CCI data to create derived analytical datasets."""
try:
if 'cci_projects' not in self.data or self.data['cci_projects'].empty:
raise DataLoadError("CCI project data not loaded")
df = self.data['cci_projects']
# Create agency collaboration dataset
self._create_agency_collaboration_data(df)
# Create regional dataset
self._create_regional_data(df)
# Create temporal dataset
self._create_temporal_data(df)
except Exception as e:
logger.error(f"Error processing CCI data: {e}", exc_info=True)
raise DataLoadError(f"Failed to process CCI data: {str(e)}")
def _create_agency_collaboration_data(self, df: pd.DataFrame):
"""
Create agency collaboration metrics from CCI data.
Args:
df: CCI projects DataFrame
"""
try:
# Group projects by program to identify unique agencies per program
program_agencies = df.groupby('program_name')['agency_name'].unique().apply(list)
# Calculate number of agencies per program
program_agency_counts = program_agencies.apply(len)
# Create program-level collaboration metrics
program_collab = pd.DataFrame({
'program_name': program_agency_counts.index,
'num_partner_agencies': program_agency_counts.values
})
# Add collaboration size categories
program_collab['collab_size_category'] = pd.cut(
program_collab['num_partner_agencies'],
bins=[0, 1, 3, 5, float('inf')],
labels=['Single agency', 'Small collab (2-3)', 'Medium collab (4-5)', 'Large collab (6+)']
)
# Store the result
self.data['agency_collaboration'] = program_collab
# Add collaboration metrics back to the main dataset
program_collab_dict = dict(zip(program_collab['program_name'], program_collab['num_partner_agencies']))
df['num_partner_agencies'] = df['program_name'].map(program_collab_dict)
# Update the main dataset
self.data['cci_projects'] = df
logger.info(f"Created agency collaboration data for {len(program_collab)} programs")
except Exception as e:
logger.error(f"Error creating agency collaboration data: {e}", exc_info=True)
raise DataLoadError(f"Failed to create agency collaboration data: {str(e)}")
def _create_regional_data(self, df: pd.DataFrame):
"""
Create regional dataset for spatial analysis.
Args:
df: CCI projects DataFrame
"""
try:
# Define regions based on counties
# California regions mapping (simplified example)
ca_regions = {
'Bay Area': ['Alameda', 'Contra Costa', 'Marin', 'Napa', 'San Francisco',
'San Mateo', 'Santa Clara', 'Solano', 'Sonoma'],
'Sacramento Region': ['Sacramento', 'Yolo', 'Placer', 'El Dorado', 'Sutter', 'Yuba'],
'San Joaquin Valley': ['Fresno', 'Kern', 'Kings', 'Madera', 'Merced',
'San Joaquin', 'Stanislaus', 'Tulare'],
'Southern California': ['Los Angeles', 'Orange', 'Riverside', 'San Bernardino',
'Ventura', 'Imperial', 'San Diego'],
'Central Coast': ['Monterey', 'San Benito', 'San Luis Obispo', 'Santa Barbara', 'Santa Cruz'],
'Northern California': ['Butte', 'Colusa', 'Del Norte', 'Glenn', 'Humboldt', 'Lake',
'Lassen', 'Mendocino', 'Modoc', 'Nevada', 'Plumas', 'Shasta',
'Sierra', 'Siskiyou', 'Tehama', 'Trinity']
}
# Create reverse mapping: county -> region
county_to_region = {}
for region, counties in ca_regions.items():
for county in counties:
county_to_region[county] = region
# Apply region mapping
df['region'] = df['county'].map(county_to_region)
# Handle multi-county projects
multi_county_mask = df['county'].str.contains(',', na=False)
df.loc[multi_county_mask, 'region'] = 'Multi-Region'
# Fill missing regions
df['region'] = df['region'].fillna('Other/Unknown')
# Create regional summary
regional_summary = df.groupby('region').agg({
'project_id': 'count',
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'num_partner_agencies': 'mean',
'ghg_efficiency': 'mean',
'dac_benefit_percentage': 'mean'
}).reset_index()
regional_summary = regional_summary.rename(columns={
'project_id': 'num_projects',
'num_partner_agencies': 'avg_partners'
})
# Calculate total GHG reduction and DAC benefit percentages by region
regional_summary['pct_total_funding'] = 100 * regional_summary['total_ggrf_funding'] / regional_summary['total_ggrf_funding'].sum()
regional_summary['pct_total_ghg'] = 100 * regional_summary['total_project_ghg_reductions'] / regional_summary['total_project_ghg_reductions'].sum()
regional_summary['pct_dac_funding'] = 100 * regional_summary['funding_benefiting_dac'] / regional_summary['total_ggrf_funding']
# Store the results
self.data['cci_projects'] = df # Update with region info
self.data['regional_summary'] = regional_summary
logger.info(f"Created regional data with {len(regional_summary)} regions")
except Exception as e:
logger.error(f"Error creating regional data: {e}", exc_info=True)
raise DataLoadError(f"Failed to create regional data: {str(e)}")
def _create_temporal_data(self, df: pd.DataFrame):
"""
Create temporal dataset for time series analysis.
Args:
df: CCI projects DataFrame
"""
try:
# Ensure we have a funding year column
if 'funding_year' not in df.columns:
raise DataLoadError("Funding year column not available for temporal analysis")
# Create year-level summaries
yearly_summary = df.groupby('funding_year').agg({
'project_id': 'count',
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'num_partner_agencies': 'mean',
'ghg_efficiency': 'mean',
'dac_benefit_percentage': 'mean'
}).reset_index()
yearly_summary = yearly_summary.rename(columns={
'project_id': 'num_projects',
'num_partner_agencies': 'avg_partners'
})
# Calculate percentages and rates
yearly_summary['pct_dac_funding'] = 100 * yearly_summary['funding_benefiting_dac'] / yearly_summary['total_ggrf_funding']
yearly_summary['avg_project_funding'] = yearly_summary['total_ggrf_funding'] / yearly_summary['num_projects']
# Create pre/post breakpoint summaries
breakpoint_summary = df.groupby('post_breakpoint').agg({
'project_id': 'count',
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'num_partner_agencies': 'mean',
'ghg_efficiency': 'mean',
'dac_benefit_percentage': 'mean',
'total_ggrf_funding': ['mean', 'median']
}).reset_index()
# Flatten multi-level columns after aggregation
breakpoint_summary.columns = ['_'.join(col).strip('_') for col in breakpoint_summary.columns.values]
# Store the results
self.data['yearly_summary'] = yearly_summary
self.data['breakpoint_summary'] = breakpoint_summary
logger.info(f"Created temporal data with {len(yearly_summary)} yearly records")
except Exception as e:
logger.error(f"Error creating temporal data: {e}", exc_info=True)
raise DataLoadError(f"Failed to create temporal data: {str(e)}")
def _load_geographic_data(self):
"""Load geographic shapefiles if available."""
try:
if not self.config.shapefile_path or not self.config.shapefile_path.exists():
logger.warning("Shapefile path not provided or does not exist")
return
# Load county boundaries
county_shapefile = self.config.shapefile_path / "counties.shp"
if county_shapefile.exists():
self.geo_data['counties'] = gpd.read_file(county_shapefile)
logger.info(f"Loaded {len(self.geo_data['counties'])} county boundaries")
# Load census tract boundaries
tract_shapefile = self.config.shapefile_path / "census_tracts.shp"
if tract_shapefile.exists():
self.geo_data['census_tracts'] = gpd.read_file(tract_shapefile)
logger.info(f"Loaded {len(self.geo_data['census_tracts'])} census tract boundaries")
except Exception as e:
logger.error(f"Error loading geographic data: {e}", exc_info=True)
logger.warning("Continuing without geographic data")
def _load_ej_data(self):
"""Load environmental justice data if available."""
try:
if not self.config.ej_data_path or not self.config.ej_data_path.exists():
logger.warning("Environmental justice data path not provided or does not exist")
return
# Load EJ data by census tract
ej_data_file = self.config.ej_data_path / "ej_indicators.csv"
if ej_data_file.exists():
ej_data = pd.read_csv(ej_data_file)
self.data['ej_indicators'] = ej_data
logger.info(f"Loaded environmental justice data for {len(ej_data)} census tracts")
except Exception as e:
logger.error(f"Error loading environmental justice data: {e}", exc_info=True)
logger.warning("Continuing without environmental justice data")
def _create_analysis_datasets(self):
"""Create integrated datasets for analysis."""
try:
# Create collaboration outcomes dataset
self._create_collaboration_outcomes_dataset()
# Create regional collaboration dataset
self._create_regional_collaboration_dataset()
# Create temporal trends dataset
self._create_temporal_trends_dataset()
except Exception as e:
logger.error(f"Error creating analysis datasets: {e}", exc_info=True)
raise DataLoadError(f"Failed to create analysis datasets: {str(e)}")
def _create_collaboration_outcomes_dataset(self):
"""Create dataset linking collaboration metrics to outcomes."""
try:
if 'cci_projects' not in self.data:
raise DataLoadError("CCI project data not loaded")
df = self.data['cci_projects']
# Group by program and calculate metrics
collab_outcomes = df.groupby(['program_name', 'num_partner_agencies']).agg({
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'project_id': 'count',
'ghg_efficiency': 'mean',
'dac_benefit_percentage': 'mean'
}).reset_index()
collab_outcomes = collab_outcomes.rename(columns={
'project_id': 'num_projects'
})
# Calculate percentages
collab_outcomes['pct_dac_funding'] = 100 * collab_outcomes['funding_benefiting_dac'] / collab_outcomes['total_ggrf_funding']
# Add collaboration size categories
collab_outcomes['collab_size_category'] = pd.cut(
collab_outcomes['num_partner_agencies'],
bins=[0, 1, 3, 5, float('inf')],
labels=['Single agency', 'Small collab (2-3)', 'Medium collab (4-5)', 'Large collab (6+)']
)
# Store the result
self.data['collaboration_outcomes'] = collab_outcomes
logger.info(f"Created collaboration outcomes dataset with {len(collab_outcomes)} records")
except Exception as e:
logger.error(f"Error creating collaboration outcomes dataset: {e}", exc_info=True)
raise DataLoadError(f"Failed to create collaboration outcomes dataset: {str(e)}")
def _create_regional_collaboration_dataset(self):
"""Create dataset linking regional and collaboration patterns."""
try:
if 'cci_projects' not in self.data:
raise DataLoadError("CCI project data not loaded")
df = self.data['cci_projects']
# Group by region and collaboration size
regional_collab = df.groupby(['region', 'num_partner_agencies']).agg({
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'project_id': 'count',
'ghg_efficiency': 'mean',
'dac_benefit_percentage': 'mean'
}).reset_index()
regional_collab = regional_collab.rename(columns={
'project_id': 'num_projects'
})
# Calculate percentages
regional_collab['pct_dac_funding'] = 100 * regional_collab['funding_benefiting_dac'] / regional_collab['total_ggrf_funding']
# Add collaboration size categories
regional_collab['collab_size_category'] = pd.cut(
regional_collab['num_partner_agencies'],
bins=[0, 1, 3, 5, float('inf')],
labels=['Single agency', 'Small collab (2-3)', 'Medium collab (4-5)', 'Large collab (6+)']
)
# Store the result
self.data['regional_collaboration'] = regional_collab
logger.info(f"Created regional collaboration dataset with {len(regional_collab)} records")
except Exception as e:
logger.error(f"Error creating regional collaboration dataset: {e}", exc_info=True)
raise DataLoadError(f"Failed to create regional collaboration dataset: {str(e)}")
def _create_temporal_trends_dataset(self):
"""Create dataset for temporal trend analysis."""
try:
if 'cci_projects' not in self.data:
raise DataLoadError("CCI project data not loaded")
df = self.data['cci_projects']
# Group by year and collaboration size
temporal_collab = df.groupby(['funding_year', 'num_partner_agencies']).agg({
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'project_id': 'count',
'ghg_efficiency': 'mean',
'dac_benefit_percentage': 'mean'
}).reset_index()
temporal_collab = temporal_collab.rename(columns={
'project_id': 'num_projects'
})
# Calculate percentages
temporal_collab['pct_dac_funding'] = 100 * temporal_collab['funding_benefiting_dac'] / temporal_collab['total_ggrf_funding']
# Add collaboration size categories
temporal_collab['collab_size_category'] = pd.cut(
temporal_collab['num_partner_agencies'],
bins=[0, 1, 3, 5, float('inf')],
labels=['Single agency', 'Small collab (2-3)', 'Medium collab (4-5)', 'Large collab (6+)']
)
# Store the result
self.data['temporal_collaboration'] = temporal_collab
logger.info(f"Created temporal collaboration dataset with {len(temporal_collab)} records")
except Exception as e:
logger.error(f"Error creating temporal trends dataset: {e}", exc_info=True)
raise DataLoadError(f"Failed to create temporal trends dataset: {str(e)}")
# Analysis methods for Research Question 1: Collaboration structure and scale
def analyze_collaboration_structure(self) -> Dict[str, Any]:
"""
Analyze how the structure and scale of inter-agency collaboration shapes outcomes.
Returns:
Dictionary containing collaboration structure analysis results
"""
try:
if 'collaboration_outcomes' not in self.data:
raise AnalysisError("Collaboration outcomes dataset not available")
df = self.data['collaboration_outcomes']
# Calculate correlations between collaboration size and outcomes
corr_metrics = ['num_partner_agencies', 'ghg_efficiency', 'pct_dac_funding']
correlations = df[corr_metrics].corr(method='spearman')
# Aggregate by collaboration size category
collab_size_analysis = df.groupby('collab_size_category').agg({
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'num_projects': 'sum',
'ghg_efficiency': 'mean',
'dac_benefit_percentage': 'mean',
'program_name': 'nunique'
}).reset_index()
# Calculate percentages and metrics
collab_size_analysis['pct_total_funding'] = 100 * collab_size_analysis['total_ggrf_funding'] / collab_size_analysis['total_ggrf_funding'].sum()
collab_size_analysis['pct_total_ghg'] = 100 * collab_size_analysis['total_project_ghg_reductions'] / collab_size_analysis['total_project_ghg_reductions'].sum()
collab_size_analysis['pct_dac_funding'] = 100 * collab_size_analysis['funding_benefiting_dac'] / collab_size_analysis['total_ggrf_funding']
            # Regression analysis of collaboration size on outcomes;
            # missing='drop' excludes NaN rows rather than letting OLS raise
            X = sm.add_constant(df['num_partner_agencies'])
            # GHG efficiency regression
            y_ghg = df['ghg_efficiency']
            model_ghg = sm.OLS(y_ghg, X, missing='drop').fit()
            # DAC benefit regression
            y_dac = df['pct_dac_funding']
            model_dac = sm.OLS(y_dac, X, missing='drop').fit()
# Prepare results
results = {
'correlations': correlations.to_dict(),
'collaboration_size_summary': collab_size_analysis.to_dict('records'),
'ghg_efficiency_regression': {
'coefficients': model_ghg.params.to_dict(),
'p_values': model_ghg.pvalues.to_dict(),
'r_squared': model_ghg.rsquared,
'adj_r_squared': model_ghg.rsquared_adj
},
'dac_benefit_regression': {
'coefficients': model_dac.params.to_dict(),
'p_values': model_dac.pvalues.to_dict(),
'r_squared': model_dac.rsquared,
'adj_r_squared': model_dac.rsquared_adj
}
}
logger.info("Completed collaboration structure analysis")
return results
except Exception as e:
logger.error(f"Error analyzing collaboration structure: {e}", exc_info=True)
raise AnalysisError(f"Failed to analyze collaboration structure: {str(e)}")
def analyze_regional_patterns(self) -> Dict[str, Any]:
"""
Analyze regional patterns in collaboration and outcomes.
Returns:
Dictionary containing regional analysis results
"""
try:
if 'regional_collaboration' not in self.data:
raise AnalysisError("Regional collaboration dataset not available")
df = self.data['regional_collaboration']
# Regional collaboration summary
regional_summary = df.groupby('region').agg({
'num_partner_agencies': 'mean',
'total_ggrf_funding': 'sum',
'total_project_ghg_reductions': 'sum',
'funding_benefiting_dac': 'sum',
'ghg_efficiency': 'mean',
'pct_dac_funding': 'mean',
'num_projects': 'sum'
}).reset_index()
# Calculate percentages
regional_summary['pct_total_funding'] = 100 * regional_summary['total_ggrf_funding'] / regional_summary['total_ggrf_funding'].sum()
regional_summary['pct_total_ghg'] = 100 * regional_summary['total_project_ghg_reductions'] / regional_summary['total_project_ghg_reductions'].sum()
# Analyze Multi-Region projects separately
multi_region = df[df['region'] == 'Multi-Region']
single_region = df[df['region'] != 'Multi-Region']
multi_vs_single = {
'multi_region': {
'avg_partners': multi_region['num_partner_agencies'].mean(),
'avg_ghg_efficiency': multi_region['ghg_efficiency'].mean(),
'avg_dac_benefit': multi_region['pct_dac_funding'].mean(),
'total_funding': multi_region['total_ggrf_funding'].sum(),
'total_ghg_reductions': multi_region['total_project_ghg_reductions'].sum(),
'num_projects': multi_region['num_projects'].sum()
},
'single_region': {
'avg_partners': single_region['num_partner_agencies'].mean(),
'avg_ghg_efficiency': single_region['ghg_efficiency'].mean(),
'avg_dac_benefit': single_region['pct_dac_funding'].mean(),
'total_funding': single_region['total_ggrf_funding'].sum(),
'total_ghg_reductions': single_region['total_project_ghg_reductions'].sum(),
'num_projects': single_region['num_projects'].sum()
}
}
# Per-region regressions of collaboration size on outcomes
region_models = {}
for region in df['region'].unique():
region_df = df[df['region'] == region]
if len(region_df) > 5: # Only run regression if enough data points
try:
                        X = sm.add_constant(region_df['num_partner_agencies'])
                        # Skip regions whose GHG outcome is entirely missing;
                        # the original flow left model_ghg undefined here and
                        # relied on a NameError being caught below
                        if region_df['ghg_efficiency'].isna().all():
                            continue
                        # GHG efficiency regression
                        model_ghg = sm.OLS(region_df['ghg_efficiency'], X, missing='drop').fit()
                        # DAC benefit regression
                        model_dac = sm.OLS(region_df['pct_dac_funding'], X, missing='drop').fit()
region_models[region] = {
'ghg_efficiency': {
'coefficient': model_ghg.params['num_partner_agencies'],
'p_value': model_ghg.pvalues['num_partner_agencies'],
'r_squared': model_ghg.rsquared
},
'dac_benefit': {
'coefficient': model_dac.params['num_partner_agencies'],
'p_value': model_dac.pvalues['num_partner_agencies'],
'r_squared': model_dac.rsquared
}
}
except Exception as e:
logger.warning(f"Could not run regression for region {region}: {e}")
# Prepare results
results = {
'regional_summary': regional_summary.to_dict('records'),
'multi_vs_single_region': multi_vs_single,
'region_models': region_models
}
logger.info("Completed regional patterns analysis")
return results
except Exception as e:
logger.error(f"Error analyzing regional patterns: {e}", exc_info=True)
raise AnalysisError(f"Failed to analyze regional patterns: {str(e)}")
# Analysis methods for Research Question 2: Temporal trends
def analyze_temporal_trends(self) -> Dict[str, Any]:
"""
Analyze temporal trends in funding, collaboration, and outcomes.
Returns:
Dictionary containing temporal trends analysis results
"""
try:
if 'temporal_collaboration' not in self.data or 'yearly_summary' not in self.data:
raise AnalysisError("Temporal datasets not available")
temp_df = self.data['temporal_collaboration']
yearly_df = self.data['yearly_summary']
# Analyze yearly trends
yearly_trends = yearly_df.to_dict('records')
# Calculate year-over-year changes
yearly_df['funding_change'] = yearly_df['total_ggrf_funding'].pct_change() * 100
yearly_df['ghg_reduction_change'] = yearly_df['total_project_ghg_reductions'].pct_change() * 100
yearly_df['dac_funding_change'] = yearly_df['funding_benefiting_dac'].pct_change() * 100
yearly_df['partners_change'] = yearly_df['avg_partners'].pct_change() * 100
yoy_changes = yearly_df[['funding_year', 'funding_change', 'ghg_reduction_change',
'dac_funding_change', 'partners_change']].dropna()
# Pre/post breakpoint analysis
if 'breakpoint_summary' in self.data:
breakpoint_df = self.data['breakpoint_summary']
breakpoint_analysis = breakpoint_df.to_dict('records')
# Calculate percentage changes between pre and post periods
pre_row = breakpoint_df[breakpoint_df['post_breakpoint'] == 0].iloc[0] if len(breakpoint_df[breakpoint_df['post_breakpoint'] == 0]) > 0 else None
post_row = breakpoint_df[breakpoint_df['post_breakpoint'] == 1].iloc[0] if len(breakpoint_df[breakpoint_df['post_breakpoint'] == 1]) > 0 else None
if pre_row is not None and post_row is not None:
pct_changes = {
'avg_project_funding_change': ((post_row['total_ggrf_funding_mean'] / pre_row['total_ggrf_funding_mean']) - 1) * 100,
'avg_partners_change': ((post_row['num_partner_agencies_mean'] / pre_row['num_partner_agencies_mean']) - 1) * 100,
'ghg_efficiency_change': ((post_row['ghg_efficiency_mean'] / pre_row['ghg_efficiency_mean']) - 1) * 100,
'dac_benefit_change': ((post_row['dac_benefit_percentage_mean'] / pre_row['dac_benefit_percentage_mean']) - 1) * 100
}
else:
pct_changes = {}
else:
breakpoint_analysis = {}
pct_changes = {}
# Regression: collaboration trends over time
yearly_collab_trends = temp_df.groupby('funding_year').agg({
'num_partner_agencies': 'mean',
'ghg_efficiency': 'mean',
'pct_dac_funding': 'mean'
}).reset_index()
            if len(yearly_collab_trends) > 3:  # Only run regressions if enough years
                X = sm.add_constant(yearly_collab_trends['funding_year'])
                # Partners over time regression
                y_partners = yearly_collab_trends['num_partner_agencies']
                model_partners = sm.OLS(y_partners, X, missing='drop').fit()
                # GHG efficiency over time regression
                y_ghg = yearly_collab_trends['ghg_efficiency']
                model_ghg = sm.OLS(y_ghg, X, missing='drop').fit()
                # DAC benefit over time regression
                y_dac = yearly_collab_trends['pct_dac_funding']
                model_dac = sm.OLS(y_dac, X, missing='drop').fit()
time_regressions = {
'partners_over_time': {
'coefficient': model_partners.params['funding_year'],
'p_value': model_partners.pvalues['funding_year'],
'r_squared': model_partners.rsquared
},
'ghg_efficiency_over_time': {
'coefficient': model_ghg.params['funding_year'],
'p_value': model_ghg.pvalues['funding_year'],
'r_squared': model_ghg.rsquared
},
'dac_benefit_over_time': {
'coefficient': model_dac.params['funding_year'],
'p_value': model_dac.pvalues['funding_year'],
'r_squared': model_dac.rsquared
}
}
else:
time_regressions = {}
# Prepare results
results = {
'yearly_trends': yearly_trends,
'year_over_year_changes': yoy_changes.to_dict('records'),
'breakpoint_analysis': breakpoint_analysis,
'breakpoint_pct_changes': pct_changes,
'time_regressions': time_regressions
}
logger.info("Completed temporal trends analysis")
return results
except Exception as e:
logger.error(f"Error analyzing temporal trends: {e}", exc_info=True)
raise AnalysisError(f"Failed to analyze temporal trends: {str(e)}")
# Comprehensive analysis methods that integrate multiple analyses
def get_comprehensive_analysis(self) -> Dict[str, Dict]:
"""
Get comprehensive analysis results addressing all research questions.
Returns:
Dictionary containing all analysis results
"""
try:
# Run all analyses
collab_structure = self.analyze_collaboration_structure()
regional_patterns = self.analyze_regional_patterns()
temporal_trends = self.analyze_temporal_trends()
# Integrate the results
comprehensive_results = {
'collaboration_structure': collab_structure,
'regional_patterns': regional_patterns,
'temporal_trends': temporal_trends,
'summary_findings': self._generate_summary_findings(
collab_structure, regional_patterns, temporal_trends
)
}
return comprehensive_results
        except Exception as e:
            logger.error(f"Error getting comprehensive analysis: {e}", exc_info=True)
            raise AnalysisError(f"Failed to get comprehensive analysis: {str(e)}")
def _generate_summary_findings(self,
collab_structure: Dict[str, Any],
regional_patterns: Dict[str, Any],
temporal_trends: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate key summary findings from all analyses.
Args:
collab_structure: Collaboration structure analysis results
regional_patterns: Regional patterns analysis results
temporal_trends: Temporal trends analysis results
Returns:
Dictionary containing key integrated findings
"""
try:
# Extract key metrics from collaboration structure analysis
collab_corr_ghg = collab_structure['correlations'].get('num_partner_agencies', {}).get('ghg_efficiency', 0)
collab_corr_dac = collab_structure['correlations'].get('num_partner_agencies', {}).get('pct_dac_funding', 0)
# Extract key metrics from regional patterns
multi_region_data = regional_patterns['multi_vs_single_region']['multi_region']
single_region_data = regional_patterns['multi_vs_single_region']['single_region']
# Extract key metrics from temporal trends
if temporal_trends.get('breakpoint_pct_changes'):
breakpoint_changes = temporal_trends['breakpoint_pct_changes']
post_2020_funding_change = breakpoint_changes.get('avg_project_funding_change', 0)
post_2020_partners_change = breakpoint_changes.get('avg_partners_change', 0)
post_2020_dac_change = breakpoint_changes.get('dac_benefit_change', 0)
else:
post_2020_funding_change = 0
post_2020_partners_change = 0
post_2020_dac_change = 0
# Compile key findings
key_findings = {
'collaboration_impact': {
'correlation_partners_ghg_efficiency': collab_corr_ghg,
'correlation_partners_dac_benefit': collab_corr_dac,
'ghg_efficiency_by_collab_size': {
'single_agency': collab_structure['collaboration_size_summary'][0]['ghg_efficiency']
if len(collab_structure['collaboration_size_summary']) > 0 else None,
'large_collab': collab_structure['collaboration_size_summary'][-1]['ghg_efficiency']
if len(collab_structure['collaboration_size_summary']) > 0 else None
}
},
'multi_vs_single_region': {
'multi_region_partners': multi_region_data['avg_partners'],
'single_region_partners': single_region_data['avg_partners'],
'partner_ratio': multi_region_data['avg_partners'] / single_region_data['avg_partners']
if single_region_data['avg_partners'] > 0 else float('inf'),
'multi_region_dac_benefit': multi_region_data['avg_dac_benefit'],
'single_region_dac_benefit': single_region_data['avg_dac_benefit'],
'dac_benefit_ratio': multi_region_data['avg_dac_benefit'] / single_region_data['avg_dac_benefit']
if single_region_data['avg_dac_benefit'] > 0 else float('inf')
},
'temporal_shifts': {
'post_2020_funding_change_pct': post_2020_funding_change,
'post_2020_partners_change_pct': post_2020_partners_change,
'post_2020_dac_benefit_change_pct': post_2020_dac_change
}
}
            # Get most efficient region (lowest $ per ton of GHG reduced)
if 'regional_summary' in regional_patterns and len(regional_patterns['regional_summary']) > 0:
regional_ghg_df = pd.DataFrame(regional_patterns['regional_summary'])
if 'ghg_efficiency' in regional_ghg_df.columns and not regional_ghg_df['ghg_efficiency'].isna().all():
most_efficient_region = regional_ghg_df.loc[regional_ghg_df['ghg_efficiency'].idxmin()]
key_findings['regional_efficiency'] = {
'most_efficient_region': most_efficient_region['region'],
'efficiency_value': most_efficient_region['ghg_efficiency'],
'avg_partners': most_efficient_region['num_partner_agencies']
}
return key_findings
except Exception as e:
logger.error(f"Error generating summary findings: {e}", exc_info=True)
return {'error': str(e)}
# Export and visualization methods
def export_analysis(self, filename: str) -> None:
"""
Export comprehensive analysis results to a JSON file.
Args:
filename: Path to output file
Raises:
AnalysisError: If export fails
"""
try:
analysis = self.get_comprehensive_analysis()
# Ensure directory exists
output_path = Path(filename)
output_path.parent.mkdir(parents=True, exist_ok=True)
            # Convert numpy/pandas types to plain Python types for JSON serialization
            def json_serialize(obj):
                if isinstance(obj, np.integer):
                    return int(obj)
                elif isinstance(obj, np.floating):
                    return float(obj)
                elif isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, (pd.Timestamp, datetime)):
                    return obj.isoformat()
                elif pd.isna(obj):
                    return None
                # Fall back to str() rather than raising a second TypeError
                return str(obj)
# Export with proper encoding and formatting
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(analysis, f, indent=2, default=json_serialize)
logger.info(f"Analysis exported to {filename}")
except Exception as e:
logger.error(f"Error exporting analysis: {e}", exc_info=True)
raise AnalysisError(f"Failed to export analysis to {filename}")
def print_summary_findings(self) -> None:
"""Print a formatted summary of key findings."""
try:
# Get the comprehensive analysis
analysis = self.get_comprehensive_analysis()
# Extract key findings
findings = analysis['summary_findings']
# Print summary header
print("\n" + "="*80)
print(" CALIFORNIA CLIMATE INVESTMENTS (CCI) PROGRAM ANALYSIS SUMMARY ")
print("="*80 + "\n")
# Print collaboration impact findings
print("COLLABORATION STRUCTURE FINDINGS:")
collab_impact = findings.get('collaboration_impact', {})
print(f"- Correlation between number of partners and GHG efficiency: {collab_impact.get('correlation_partners_ghg_efficiency', 'N/A'):.3f}")
print(f"- Correlation between number of partners and DAC benefits: {collab_impact.get('correlation_partners_dac_benefit', 'N/A'):.3f}")
ghg_by_collab = collab_impact.get('ghg_efficiency_by_collab_size', {})
single_agency_ghg = ghg_by_collab.get('single_agency', 'N/A')
large_collab_ghg = ghg_by_collab.get('large_collab', 'N/A')
if single_agency_ghg != 'N/A' and large_collab_ghg != 'N/A':
print(f"- GHG Efficiency in single-agency projects: ${single_agency_ghg:.2f} per ton of GHG reduced")
print(f"- GHG Efficiency in large collaborations (6+ partners): ${large_collab_ghg:.2f} per ton of GHG reduced")
print(f"- Efficiency ratio (large collab / single agency): {large_collab_ghg/single_agency_ghg:.2f}x")
# Print regional comparison findings
print("\nMULTI-REGION VS SINGLE-REGION FINDINGS:")
            region_comp = findings.get('multi_vs_single_region', {})
            print(f"- Multi-region projects average {region_comp.get('multi_region_partners', float('nan')):.1f} partners vs {region_comp.get('single_region_partners', float('nan')):.1f} partners in single-region projects")
            print(f"- Partner ratio (multi/single): {region_comp.get('partner_ratio', float('nan')):.1f}x")
            print(f"- DAC benefit in multi-region projects: {region_comp.get('multi_region_dac_benefit', float('nan')):.1f}%")
            print(f"- DAC benefit in single-region projects: {region_comp.get('single_region_dac_benefit', float('nan')):.1f}%")
            print(f"- DAC benefit ratio (multi/single): {region_comp.get('dac_benefit_ratio', float('nan')):.1f}x")
# Print regional efficiency findings
if 'regional_efficiency' in findings:
reg_eff = findings['regional_efficiency']
print(f"\nMOST EFFICIENT REGION: {reg_eff.get('most_efficient_region', 'N/A')}")
print(f"- GHG Efficiency: ${reg_eff.get('efficiency_value', 'N/A'):.3f} per ton of GHG reduced")
print(f"- Average partners: {reg_eff.get('avg_partners', 'N/A'):.1f}")
# Print temporal trend findings
print("\nTEMPORAL TRENDS (PRE vs POST 2020):")
            temp_shifts = findings.get('temporal_shifts', {})
            print(f"- Project funding size change: {temp_shifts.get('post_2020_funding_change_pct', float('nan')):.1f}%")
            print(f"- Number of partners change: {temp_shifts.get('post_2020_partners_change_pct', float('nan')):.1f}%")
            print(f"- DAC benefit change: {temp_shifts.get('post_2020_dac_benefit_change_pct', float('nan')):.1f}%")
print("\n" + "="*80)
except Exception as e:
logger.error(f"Error printing summary findings: {e}", exc_info=True)
print(f"Error generating summary: {str(e)}")
def plot_collaboration_outcomes(self, output_file: Optional[str] = None) -> None:
"""
Create visualizations of collaboration outcomes.
Args:
output_file: Path to save visualization (optional)
"""
try:
if 'collaboration_outcomes' not in self.data:
raise AnalysisError("Collaboration outcomes dataset not available")
df = self.data['collaboration_outcomes']
# Set up the figure
plt.figure(figsize=(15, 10))
# Plot 1: Collaboration Size vs. GHG Efficiency
plt.subplot(2, 2, 1)
plt.scatter(df['num_partner_agencies'], df['ghg_efficiency'], alpha=0.6)
plt.xlabel('Number of Partner Agencies')
plt.ylabel('GHG Efficiency ($ per ton of GHG reduced)')
plt.title('Collaboration Size vs. GHG Efficiency')
            # Add trend line (fit only on rows where both values are present,
            # since np.polyfit propagates NaNs)
            trend = df[['num_partner_agencies', 'ghg_efficiency']].dropna()
            if len(trend) > 1:
                z = np.polyfit(trend['num_partner_agencies'], trend['ghg_efficiency'], 1)
                p = np.poly1d(z)
                plt.plot(trend['num_partner_agencies'], p(trend['num_partner_agencies']), "r--", alpha=0.8)
# Plot 2: Collaboration Size vs. DAC Benefit
plt.subplot(2, 2, 2)
plt.scatter(df['num_partner_agencies'], df['pct_dac_funding'], alpha=0.6)
plt.xlabel('Number of Partner Agencies')
plt.ylabel('DAC Benefit Percentage')
plt.title('Collaboration Size vs. DAC Benefit')
            # Add trend line (fit only on rows where both values are present)
            trend = df[['num_partner_agencies', 'pct_dac_funding']].dropna()
            if len(trend) > 1:
                z = np.polyfit(trend['num_partner_agencies'], trend['pct_dac_funding'], 1)
                p = np.poly1d(z)
                plt.plot(trend['num_partner_agencies'], p(trend['num_partner_agencies']), "r--", alpha=0.8)
# Plot 3: Collaboration Size Categories - GHG Efficiency
plt.subplot(2, 2, 3)
category_means = df.groupby('collab_size_category')['ghg_efficiency'].mean().reindex([
'Single agency', 'Small collab (2-3)', 'Medium collab (4-5)', 'Large collab (6+)'
])
category_means.plot(kind='bar')
plt.xlabel('Collaboration Size Category')
plt.ylabel('Average GHG Efficiency')
plt.title('GHG Efficiency by Collaboration Size')
plt.xticks(rotation=45)
# Plot 4: Collaboration Size Categories - DAC Benefit
plt.subplot(2, 2, 4)
category_means = df.groupby('collab_size_category')['pct_dac_funding'].mean().reindex([
'Single agency', 'Small collab (2-3)', 'Medium collab (4-5)', 'Large collab (6+)'
])
category_means.plot(kind='bar')
plt.xlabel('Collaboration Size Category')
plt.ylabel('Average DAC Benefit Percentage')
plt.title('DAC Benefit by Collaboration Size')
plt.xticks(rotation=45)
plt.tight_layout()
# Save or display
if output_file:
plt.savefig(output_file, dpi=300, bbox_inches='tight')
logger.info(f"Collaboration outcomes visualization saved to {output_file}")
else:
plt.show()
plt.close()
except Exception as e:
logger.error(f"Error plotting collaboration outcomes: {e}", exc_info=True)
raise AnalysisError(f"Failed to create collaboration outcomes visualization: {str(e)}")
def plot_temporal_trends(self, output_file: Optional[str] = None) -> None:
"""
Create visualizations of temporal trends.
Args:
output_file: Path to save visualization (optional)
"""
try:
if 'yearly_summary' not in self.data:
raise AnalysisError("Yearly summary dataset not available")
df = self.data['yearly_summary']
# Set up the figure
plt.figure(figsize=(15, 15))
# Plot 1: Funding Over Time
plt.subplot(3, 2, 1)
plt.bar(df['funding_year'], df['total_ggrf_funding'] / 1_000_000)
plt.xlabel('Year')
plt.ylabel('Total Funding ($ Millions)')
plt.title('CCI Funding by Year')
# Add vertical line at breakpoint year
plt.axvline(x=self.config.temporal_breakpoint, color='r', linestyle='--', alpha=0.7)
# Plot 2: Number of Partners Over Time
plt.subplot(3, 2, 2)
plt.plot(df['funding_year'], df['avg_partners'], marker='o')
plt.xlabel('Year')
plt.ylabel('Average Number of Partners')
plt.title('Agency Collaboration Over Time')
# Add vertical line at breakpoint year
plt.axvline(x=self.config.temporal_breakpoint, color='r', linestyle='--', alpha=0.7)
# Plot 3: GHG Reductions Over Time
plt.subplot(3, 2, 3)
plt.bar(df['funding_year'], df['total_project_ghg_reductions'] / 1_000)
plt.xlabel('Year')
plt.ylabel('Total GHG Reductions (Thousands of Tons)')
plt.title('GHG Reductions by Year')
# Add vertical line at breakpoint year
plt.axvline(x=self.config.temporal_breakpoint, color='r', linestyle='--', alpha=0.7)
# Plot 4: DAC Benefit Over Time
plt.subplot(3, 2, 4)
plt.plot(df['funding_year'], df['pct_dac_funding'], marker='o')
plt.xlabel('Year')
plt.ylabel('DAC Benefit Percentage')
plt.title('DAC Benefits by Year')
# Add vertical line at breakpoint year
plt.axvline(x=self.config.temporal_breakpoint, color='r', linestyle='--', alpha=0.7)
# Plot 5: GHG Efficiency Over Time
plt.subplot(3, 2, 5)
plt.plot(df['funding_year'], df['ghg_efficiency'], marker='o')
plt.xlabel('Year')
plt.ylabel('GHG Efficiency ($ per ton reduced)')
plt.title('GHG Efficiency Over Time')
# Add vertical line at breakpoint year
plt.axvline(x=self.config.temporal_breakpoint, color='r', linestyle='--', alpha=0.7)
# Plot 6: Pre/Post 2020 Comparison
plt.subplot(3, 2, 6)
if 'breakpoint_summary' in self.data:
bp_df = self.data['breakpoint_summary']
pre = bp_df[bp_df['post_breakpoint'] == 0]
post = bp_df[bp_df['post_breakpoint'] == 1]
if not pre.empty and not post.empty:
metrics = ['num_partner_agencies_mean', 'ghg_efficiency_mean', 'dac_benefit_percentage_mean']
labels = ['Avg Partners', 'GHG Efficiency', 'DAC Benefit %']
x = np.arange(len(metrics))
width = 0.35
# Normalize values for comparison
pre_values = [pre[m].iloc[0] for m in metrics]
post_values = [post[m].iloc[0] for m in metrics]
# Calculate percentage change for display
pct_changes = [(post_values[i] / pre_values[i] - 1) * 100 if pre_values[i] != 0 else 0
for i in range(len(metrics))]
                    plt.bar(x - width/2, pre_values, width, label=f'Pre-{self.config.temporal_breakpoint}')
                    plt.bar(x + width/2, post_values, width, label=f'Post-{self.config.temporal_breakpoint}')
                    plt.xlabel('Metric')
                    plt.ylabel('Value')
                    plt.title(f'Pre vs Post {self.config.temporal_breakpoint} Comparison')
plt.xticks(x, labels)
plt.legend()
# Add percentage change labels
for i, pct in enumerate(pct_changes):
plt.annotate(f"{pct:.1f}%",
xy=(i, max(pre_values[i], post_values[i])),
ha='center', va='bottom')
plt.tight_layout()
# Save or display
if output_file:
plt.savefig(output_file, dpi=300, bbox_inches='tight')
logger.info(f"Temporal trends visualization saved to {output_file}")
else:
plt.show()
plt.close()
except Exception as e:
logger.error(f"Error plotting temporal trends: {e}", exc_info=True)
raise AnalysisError(f"Failed to create temporal trends visualization: {str(e)}")
def plot_regional_analysis(self, output_file: Optional[str] = None) -> None:
"""
Create visualizations of regional collaboration and outcomes.
Args:
output_file: Path to save visualization (optional)
"""
try:
if 'regional_summary' not in self.data:
raise AnalysisError("Regional summary dataset not available")
df = self.data['regional_summary']
# Set up the figure
plt.figure(figsize=(15, 15))
# Plot 1: Funding by Region
plt.subplot(3, 2, 1)
funding_by_region = df.sort_values('total_ggrf_funding', ascending=False)
plt.bar(funding_by_region['region'], funding_by_region['total_ggrf_funding'] / 1_000_000)
plt.xlabel('Region')
plt.ylabel('Total Funding ($ Millions)')
plt.title('CCI Funding by Region')
plt.xticks(rotation=45, ha='right')
# Plot 2: Average Partners by Region
plt.subplot(3, 2, 2)
partners_by_region = df.sort_values('avg_partners', ascending=False)
plt.bar(partners_by_region['region'], partners_by_region['avg_partners'])
plt.xlabel('Region')
plt.ylabel('Average Number of Partners')
plt.title('Agency Collaboration by Region')
plt.xticks(rotation=45, ha='right')
# Plot 3: GHG Efficiency by Region
plt.subplot(3, 2, 3)
ghg_eff_by_region = df.sort_values('ghg_efficiency') # Lower is better for efficiency
plt.bar(ghg_eff_by_region['region'], ghg_eff_by_region['ghg_efficiency'])
plt.xlabel('Region')
plt.ylabel('GHG Efficiency ($ per ton reduced)')
plt.title('GHG Efficiency by Region')
plt.xticks(rotation=45, ha='right')
# Plot 4: DAC Benefit by Region
plt.subplot(3, 2, 4)
dac_by_region = df.sort_values('dac_benefit_percentage', ascending=False)
plt.bar(dac_by_region['region'], dac_by_region['dac_benefit_percentage'])
plt.xlabel('Region')
plt.ylabel('DAC Benefit Percentage')
plt.title('DAC Benefits by Region')
plt.xticks(rotation=45, ha='right')
# Plot 5: Partners vs. GHG Efficiency Scatter
plt.subplot(3, 2, 5)
plt.scatter(df['avg_partners'], df['ghg_efficiency'])
# Add region labels to points
for i, row in df.iterrows():
plt.annotate(row['region'], (row['avg_partners'], row['ghg_efficiency']),
fontsize=8, ha='right')
plt.xlabel('Average Number of Partners')
plt.ylabel('GHG Efficiency ($ per ton reduced)')
plt.title('Partners vs. GHG Efficiency by Region')
# Plot 6: Partners vs. DAC Benefit Scatter
plt.subplot(3, 2, 6)
plt.scatter(df['avg_partners'], df['dac_benefit_percentage'])
# Add region labels to points
for i, row in df.iterrows():
plt.annotate(row['region'], (row['avg_partners'], row['dac_benefit_percentage']),
fontsize=8, ha='right')
plt.xlabel('Average Number of Partners')
plt.ylabel('DAC Benefit Percentage')
plt.title('Partners vs. DAC Benefit by Region')
plt.tight_layout()
# Save or display
if output_file:
plt.savefig(output_file, dpi=300, bbox_inches='tight')
logger.info(f"Regional analysis visualization saved to {output_file}")
else:
plt.show()
plt.close()
except Exception as e:
logger.error(f"Error plotting regional analysis: {e}", exc_info=True)
raise AnalysisError(f"Failed to create regional analysis visualization: {str(e)}")
def generate_findings_for_paper(self) -> str:
"""
Generate key findings formatted for inclusion in a research paper.
Returns:
String containing formatted findings
"""
try:
# Get the comprehensive analysis
analysis = self.get_comprehensive_analysis()
# Extract key findings
findings = analysis['summary_findings']
# Format the findings as text
text = """## Key Findings from California Climate Investments (CCI) Analysis
### Collaboration Structure and Scale Findings
Our analysis of the structure and scale of inter-agency collaboration in the CCI program reveals several key patterns:
"""
# Add collaboration impact findings
collab_impact = findings.get('collaboration_impact', {})
corr_ghg = collab_impact.get('correlation_partners_ghg_efficiency', 0)
corr_dac = collab_impact.get('correlation_partners_dac_benefit', 0)
text += f"* The correlation between number of partners and GHG efficiency is {corr_ghg:.3f}, "
if corr_ghg > 0.1:
text += "suggesting larger collaborations may be less efficient in reducing GHG emissions per dollar spent.\n"
elif corr_ghg < -0.1:
text += "suggesting larger collaborations may be more efficient in reducing GHG emissions per dollar spent.\n"
else:
text += "indicating minimal relationship between collaboration size and GHG reduction efficiency.\n"
text += f"* The correlation between number of partners and DAC benefits is {corr_dac:.3f}, "
if corr_dac > 0.1:
text += "suggesting larger collaborations may be more effective at delivering benefits to disadvantaged communities.\n"
elif corr_dac < -0.1:
text += "suggesting smaller collaborations may be more effective at delivering benefits to disadvantaged communities.\n"
else:
text += "indicating minimal relationship between collaboration size and benefits to disadvantaged communities.\n"
# Add regional comparison findings
text += "\n### Regional Patterns\n\n"
region_comp = findings.get('multi_vs_single_region', {})
multi_partners = region_comp.get('multi_region_partners', 0)
single_partners = region_comp.get('single_region_partners', 0)
partner_ratio = region_comp.get('partner_ratio', 0)
multi_dac = region_comp.get('multi_region_dac_benefit', 0)
single_dac = region_comp.get('single_region_dac_benefit', 0)
dac_ratio = region_comp.get('dac_benefit_ratio', 0)
text += f"* Multi-regional projects involve significantly more collaborating agencies ({multi_partners:.1f} partners on average) compared to single-region projects ({single_partners:.1f} partners), a ratio of {partner_ratio:.1f}x.\n"
text += f"* Multi-regional projects demonstrate {multi_dac:.1f}% DAC benefit compared to {single_dac:.1f}% for single-region projects, a ratio of {dac_ratio:.1f}x. "
if dac_ratio > 1.2:
text += "This suggests that cross-regional collaborations may be more effective at addressing equity concerns.\n"
elif dac_ratio < 0.8:
text += "This suggests that more localized efforts may be more effective at addressing equity concerns.\n"
else:
text += "This indicates similar equity outcomes regardless of regional scope.\n"
# Add regional efficiency findings
if 'regional_efficiency' in findings:
reg_eff = findings['regional_efficiency']
most_eff_region = reg_eff.get('most_efficient_region', '')
eff_value = reg_eff.get('efficiency_value', 0)
avg_partners = reg_eff.get('avg_partners', 0)
text += f"* The {most_eff_region} region achieved the highest GHG efficiency (${eff_value:.3f} per ton of GHG reduced) despite having fewer collaborating partners ({avg_partners:.1f} on average), suggesting that smaller, more focused collaborations may achieve better climate outcomes in some contexts.\n"
# Add temporal trend findings
text += "\n### Temporal Trends (Pre vs Post 2020)\n\n"
temp_shifts = findings.get('temporal_shifts', {})
funding_change = temp_shifts.get('post_2020_funding_change_pct', 0)
partners_change = temp_shifts.get('post_2020_partners_change_pct', 0)
dac_change = temp_shifts.get('post_2020_dac_benefit_change_pct', 0)
text += f"* Post-2020 projects show a {funding_change:.1f}% increase in average project funding size and a {partners_change:.1f}% increase in the number of participating partners.\n"
text += f"* Despite increased resources and collaboration, DAC benefits changed by {dac_change:.1f}% in the post-2020 period. "
if dac_change < -5:
text += "This substantial decline in equity outcomes despite increased collaboration suggests a potential shift in program priorities or implementation challenges.\n"
elif dac_change > 5:
text += "This substantial improvement in equity outcomes alongside increased collaboration suggests successful program maturation and better integration of equity goals.\n"
else:
text += "This relatively stable outcome suggests consistent attention to equity goals despite changing program structure.\n"
# Add conclusion
text += "\n### Summary\n\n"
text += "Our analysis reveals complex tradeoffs between collaboration structure, GHG efficiency, and equity outcomes in California's Climate Investments program. While multi-agency partnerships can enhance program reach and equity outcomes, they may face efficiency tradeoffs that must be carefully managed. The significant temporal shifts observed post-2020 highlight the dynamic nature of collaborative climate governance and the challenges of balancing multiple policy objectives.\n"
return text
except Exception as e:
logger.error(f"Error generating findings for paper: {e}", exc_info=True)
raise AnalysisError(f"Failed to generate findings for paper: {str(e)}")
def run_hierarchical_regression(self) -> Dict[str, Any]:
"""
Run hierarchical regression analysis as described in research methods.
Returns:
Dictionary containing hierarchical regression results
"""
try:
if 'cci_projects' not in self.data:
raise AnalysisError("CCI project data not available")
df = self.data['cci_projects']
# 1. Base model: Control for project size, agency type, and regional fixed effects
model1_ghg = smf.ols('ghg_efficiency ~ total_ggrf_funding + agency_name + region', data=df).fit()
model1_dac = smf.ols('dac_benefit_percentage ~ total_ggrf_funding + agency_name + region', data=df).fit()
# 2. Add partnership variables
model2_ghg = smf.ols('ghg_efficiency ~ total_ggrf_funding + agency_name + region + num_partner_agencies', data=df).fit()
model2_dac = smf.ols('dac_benefit_percentage ~ total_ggrf_funding + agency_name + region + num_partner_agencies', data=df).fit()
# 3. Add interaction between partnerships and regional characteristics
model3_formula_ghg = 'ghg_efficiency ~ total_ggrf_funding + agency_name + region + num_partner_agencies + num_partner_agencies:region'
model3_formula_dac = 'dac_benefit_percentage ~ total_ggrf_funding + agency_name + region + num_partner_agencies + num_partner_agencies:region'
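            # Note: the partnership-by-region interaction multiplies the number of
            # estimated terms by the number of regions, so with sparsely populated
            # regions the design matrix can be rank-deficient or unstable.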
# Try to fit the interaction models, but handle potential convergence issues
try:
model3_ghg = smf.ols(model3_formula_ghg, data=df).fit()
except Exception as e:
logger.warning(f"Could not fit interaction model for GHG efficiency: {e}")
model3_ghg = None
try:
model3_dac = smf.ols(model3_formula_dac, data=df).fit()
except Exception as e:
logger.warning(f"Could not fit interaction model for DAC benefits: {e}")
model3_dac = None
# Prepare results
results = {
'ghg_efficiency': {
'base_model': {
'r_squared': model1_ghg.rsquared,
'adj_r_squared': model1_ghg.rsquared_adj,
'aic': model1_ghg.aic,
'significant_predictors': [var for var, pval in model1_ghg.pvalues.items() if pval < 0.05]
},
'partnership_model': {
'r_squared': model2_ghg.rsquared,
'adj_r_squared': model2_ghg.rsquared_adj,
'aic': model2_ghg.aic,
'significant_predictors': [var for var, pval in model2_ghg.pvalues.items() if pval < 0.05],
'partnership_coefficient': model2_ghg.params.get('num_partner_agencies', None),
'partnership_p_value': model2_ghg.pvalues.get('num_partner_agencies', None)
}
},
'dac_benefit': {
'base_model': {
'r_squared': model1_dac.rsquared,
'adj_r_squared': model1_dac.rsquared_adj,
'aic': model1_dac.aic,
'significant_predictors': [var for var, pval in model1_dac.pvalues.items() if pval < 0.05]
},
'partnership_model': {
'r_squared': model2_dac.rsquared,
'adj_r_squared': model2_dac.rsquared_adj,
'aic': model2_dac.aic,
'significant_predictors': [var for var, pval in model2_dac.pvalues.items() if pval < 0.05],
'partnership_coefficient': model2_dac.params.get('num_partner_agencies', None),
'partnership_p_value': model2_dac.pvalues.get('num_partner_agencies', None)
}
}
}
# Add interaction model results if available
if model3_ghg is not None:
results['ghg_efficiency']['interaction_model'] = {
'r_squared': model3_ghg.rsquared,
'adj_r_squared': model3_ghg.rsquared_adj,
'aic': model3_ghg.aic,
'significant_predictors': [var for var, pval in model3_ghg.pvalues.items() if pval < 0.05],
'significant_interactions': [var for var, pval in model3_ghg.pvalues.items()
if pval < 0.05 and ':' in var]
}
if model3_dac is not None:
results['dac_benefit']['interaction_model'] = {
'r_squared': model3_dac.rsquared,
'adj_r_squared': model3_dac.rsquared_adj,
'aic': model3_dac.aic,
'significant_predictors': [var for var, pval in model3_dac.pvalues.items() if pval < 0.05],
'significant_interactions': [var for var, pval in model3_dac.pvalues.items()
if pval < 0.05 and ':' in var]
}
# Model comparison: R-squared improvement
results['ghg_efficiency']['r_squared_improvement'] = model2_ghg.rsquared - model1_ghg.rsquared
results['dac_benefit']['r_squared_improvement'] = model2_dac.rsquared - model1_dac.rsquared
logger.info("Completed hierarchical regression analysis")
return results
except Exception as e:
logger.error(f"Error running hierarchical regression: {e}", exc_info=True)
raise AnalysisError(f"Failed to run hierarchical regression: {str(e)}")
def prepare_propensity_score_matching(self) -> Dict[str, Any]:
"""
Perform propensity score matching to compare high vs. low collaboration projects.
Returns:
Dictionary containing propensity score matching results
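
        Example (illustrative; assumes `analyzer` is an initialized
        CCIAnalyzer with loaded project data):

            >>> psm = analyzer.prepare_propensity_score_matching()  # doctest: +SKIP
            >>> psm['outcome_comparison']['dac_benefit']['p_value']  # doctest: +SKIP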
"""
try:
if 'cci_projects' not in self.data:
raise AnalysisError("CCI project data not available")
            # Local import: the module-level scipy import only brings in
            # pearsonr/spearmanr, so pull in the stats namespace for ttest_ind
            from scipy import stats

            df = self.data['cci_projects']
            # Define high collaboration intensity (more than five partner
            # agencies; the cutoff is a threshold chosen for this analysis)
df['high_collab'] = (df['num_partner_agencies'] > 5).astype(int)
# Variables to control for in matching
control_vars = ['total_ggrf_funding', 'region', 'agency_name']
            # Prepare data for matching (cast dummies to float so sm.Logit
            # receives a numeric design matrix rather than booleans)
            model_data = pd.get_dummies(df[control_vars + ['high_collab']],
                                        columns=['region', 'agency_name'],
                                        drop_first=True, dtype=float)
# Fit propensity score model
ps_model = sm.Logit(model_data['high_collab'],
sm.add_constant(model_data.drop('high_collab', axis=1))).fit(disp=0)
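            # disp=0 silences the optimizer's iteration output; with many sparse
            # dummy columns the logit can suffer (quasi-)separation, which
            # statsmodels surfaces as a convergence or perfect-prediction issue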
# Calculate propensity scores
df['propensity_score'] = ps_model.predict()
# Create matched groups
high_collab = df[df['high_collab'] == 1]
low_collab = df[df['high_collab'] == 0]
# Simple nearest neighbor matching
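            # Greedy 1-nearest-neighbor matching *with replacement*: each
            # high-collaboration project is paired with the low-collaboration
            # project closest in propensity score, and the same control may be
            # reused across pairs. A caliper (maximum allowed score distance)
            # could be added to discard poor matches at the cost of fewer pairs.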
matched_indices = []
for idx, high_row in high_collab.iterrows():
ps = high_row['propensity_score']
                # Find the nearest neighbor by absolute propensity-score distance
                nearest_idx = (low_collab['propensity_score'] - ps).abs().idxmin()
matched_indices.append((idx, nearest_idx))
# Create matched dataset
matched_high = df.loc[[i[0] for i in matched_indices]]
matched_low = df.loc[[i[1] for i in matched_indices]]
# Compare outcomes between matched groups
high_ghg_eff = matched_high['ghg_efficiency'].mean()
low_ghg_eff = matched_low['ghg_efficiency'].mean()
high_dac = matched_high['dac_benefit_percentage'].mean()
low_dac = matched_low['dac_benefit_percentage'].mean()
            # Welch's t-tests (equal_var=False) on the matched groups, which do
            # not assume equal variances between the samples
ghg_ttest = stats.ttest_ind(
matched_high['ghg_efficiency'].dropna(),
matched_low['ghg_efficiency'].dropna(),
equal_var=False
)
dac_ttest = stats.ttest_ind(
matched_high['dac_benefit_percentage'].dropna(),
matched_low['dac_benefit_percentage'].dropna(),
equal_var=False
)
# Results
results = {
'matching_summary': {
'num_high_collab_projects': len(high_collab),
'num_low_collab_projects': len(low_collab),
'num_matched_pairs': len(matched_indices)
},
'outcome_comparison': {
'ghg_efficiency': {
'high_collab_mean': high_ghg_eff,
'low_collab_mean': low_ghg_eff,
'difference': high_ghg_eff - low_ghg_eff,
'pct_difference': 100 * (high_ghg_eff - low_ghg_eff) / low_ghg_eff if low_ghg_eff != 0 else 0,
't_stat': ghg_ttest.statistic,
'p_value': ghg_ttest.pvalue
},
'dac_benefit': {
'high_collab_mean': high_dac,
'low_collab_mean': low_dac,
'difference': high_dac - low_dac,
'pct_difference': 100 * (high_dac - low_dac) / low_dac if low_dac != 0 else 0,
't_stat': dac_ttest.statistic,
'p_value': dac_ttest.pvalue
}
}
}
logger.info(f"Completed propensity score matching with {len(matched_indices)} matched pairs")
return results
except Exception as e:
logger.error(f"Error in propensity score matching: {e}", exc_info=True)
raise AnalysisError(f"Failed to perform propensity score matching: {str(e)}")
if __name__ == "__main__":
# Example usage
try:
# Initialize the analyzer
analyzer = CCIAnalyzer(
data_path="data/cci_programs_data_reduced.csv",
output_path="./output",
start_year=2015,
end_year=2024,
temporal_breakpoint=2020
)
        # Ensure the output directory exists before writing any files
        os.makedirs("./output", exist_ok=True)

        # Run analysis and print summary
analyzer.print_summary_findings()
# Generate visualizations
analyzer.plot_collaboration_outcomes("./output/collaboration_outcomes.png")
analyzer.plot_temporal_trends("./output/temporal_trends.png")
analyzer.plot_regional_analysis("./output/regional_analysis.png")
# Export full analysis results
analyzer.export_analysis("./output/cci_analysis_results.json")
# Generate findings for paper
findings_text = analyzer.generate_findings_for_paper()
with open("./output/paper_findings.md", "w") as f:
f.write(findings_text)
print("Analysis complete. Results saved to ./output directory.")
except Exception as e:
print(f"Analysis failed: {e}")