import argparse
import json
import logging
from pathlib import Path

import numpy as np
import pandas as pd

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("cci_data_prep")

# California counties grouped into analysis regions (used to build 'ca_region').
CA_REGIONS = {
    'Bay Area': ['Alameda', 'Contra Costa', 'Marin', 'Napa', 'San Francisco',
                 'San Mateo', 'Santa Clara', 'Solano', 'Sonoma'],
    'Sacramento Region': ['El Dorado', 'Placer', 'Sacramento', 'Sutter', 'Yolo', 'Yuba'],
    'San Joaquin Valley': ['Fresno', 'Kern', 'Kings', 'Madera', 'Merced',
                           'San Joaquin', 'Stanislaus', 'Tulare'],
    'Southern California': ['Imperial', 'Los Angeles', 'Orange', 'Riverside',
                            'San Bernardino', 'San Diego', 'Ventura'],
    'Central Coast': ['Monterey', 'San Benito', 'San Luis Obispo',
                      'Santa Barbara', 'Santa Cruz'],
    'Northern California': ['Butte', 'Colusa', 'Del Norte', 'Glenn', 'Humboldt',
                            'Lake', 'Lassen', 'Mendocino', 'Modoc', 'Nevada',
                            'Plumas', 'Shasta', 'Sierra', 'Siskiyou', 'Tehama',
                            'Trinity'],
    'Sierra Nevada': ['Alpine', 'Amador', 'Calaveras', 'Inyo', 'Mariposa',
                      'Mono', 'Tuolumne'],
}


def _find_column(df, *keywords):
    """Return the first column whose lowercase name contains every keyword, else None.

    Column names in the raw CCI export vary slightly between releases, so
    columns are located by keyword rather than exact name.
    """
    for col in df.columns:
        lower = col.lower()
        if all(kw in lower for kw in keywords):
            return col
    return None


def _assign_ca_regions(df, program_counties):
    """Map each project's county to a California region (in place).

    Programs spanning counties in more than one region are relabeled
    'Multi-Region'.

    Parameters:
        df (pd.DataFrame): Cleaned data; must contain 'county' and 'program_name'.
        program_counties (pd.Series): Unique-county count per program name.
    """
    logger.info("Assigning California regions")
    county_to_region = {
        county: region
        for region, counties in CA_REGIONS.items()
        for county in counties
    }
    df['ca_region'] = df['county'].map(county_to_region)

    # Programs active in several counties may also span several regions.
    multi_county_programs = program_counties[program_counties > 1].index
    for program in multi_county_programs:
        program_mask = df['program_name'] == program
        if df.loc[program_mask, 'ca_region'].nunique() > 1:
            df.loc[program_mask, 'ca_region'] = 'Multi-Region'


def clean_and_prepare_cci_data(input_path, output_path=None):
    """
    Clean and prepare the CCI data for analysis, fixing specific issues identified.

    Adds indicator/derived columns: is_carb, is_ev_voucher, funding_year,
    ghg_efficiency (+capped/log variants), dac_benefit_percentage,
    num_agencies_in_program, is_multi_agency, num_counties, regional_scope,
    ca_region, period.

    Parameters:
        input_path (str): Path to the original CCI data file
        output_path (str, optional): Path to save the cleaned data

    Returns:
        pd.DataFrame: The cleaned and prepared data
    """
    logger.info(f"Loading data from {input_path}")

    # Try different encodings if needed (some CCI exports are latin-1).
    try:
        df = pd.read_csv(input_path)
    except UnicodeDecodeError:
        logger.info("Trying different encoding (latin-1)")
        df = pd.read_csv(input_path, encoding='latin-1')
    logger.info(f"Successfully loaded {len(df)} rows with {len(df.columns)} columns")

    # 1. Fix column names - standardize to lowercase with underscores
    df.columns = [col.strip().lower().replace(' ', '_') for col in df.columns]

    # 2. Identify and mark EV vouchers/rebates
    logger.info("Identifying EV vouchers and rebates")
    missing = [col for col in ('agency_name', 'program_name') if col not in df.columns]
    if missing:
        logger.error(f"Missing required columns: {missing}")
        return df

    # Identify CARB's Low Carbon Transportation projects
    carb_mask = df['agency_name'].str.contains('Air Resources Board', case=False, na=False)
    lct_mask = df['program_name'].str.contains('Low Carbon Transportation', case=False, na=False)
    df['is_carb'] = carb_mask

    # 3. Mark EV projects using multiple methods (union of three heuristics).
    ev_mask = pd.Series(False, index=df.index)

    # (a) Sub-program name mentions an EV-related keyword.
    if 'sub_program_name' in df.columns:
        ev_indicators = ['Clean Cars 4 All', 'CVRP', 'Clean Vehicle', 'EV',
                         'Electric Vehicle', 'Hybrid', 'Rebate', 'Voucher',
                         'ZEV', 'Zero Emission']
        ev_subprogram_mask = df['sub_program_name'].str.contains(
            '|'.join(ev_indicators), case=False, na=False)
        ev_mask = ev_mask | (carb_mask & lct_mask & ev_subprogram_mask)

    # (b) CARB project that issued rebates.
    if 'number_of_rebates_issued' in df.columns:
        ev_mask = ev_mask | (carb_mask & (df['number_of_rebates_issued'] > 0))

    # (c) Small funding amounts typical of individual vouchers.
    funding_col = _find_column(df, 'total_program', 'funding')
    if funding_col:
        small_funding_mask = ((df[funding_col] > 0) & (df[funding_col] < 10000)
                              & carb_mask & lct_mask)
        ev_mask = ev_mask | small_funding_mask

    df['is_ev_voucher'] = ev_mask
    logger.info(f"Identified {ev_mask.sum()} EV vouchers/rebates")

    # 4. Create funding year if needed.
    # Extract year from fiscal year format (e.g., "2019-20" -> 2019).
    # expand=False yields a Series (not a 1-col DataFrame); rows with no
    # parseable year become pd.NA under the nullable Int64 dtype.
    if 'funding_year' not in df.columns and 'fiscal_year_funding_project' in df.columns:
        try:
            df['funding_year'] = (df['fiscal_year_funding_project']
                                  .str.extract(r'(\d{4})', expand=False)
                                  .astype('Int64'))
            logger.info("Created funding_year column from fiscal year data")
        except Exception as e:
            logger.error(f"Error creating funding_year: {e}")

    # 5. Calculate GHG efficiency ($ spent per ton CO2e reduced).
    ghg_col = _find_column(df, 'total_project', 'ghg')
    if funding_col and ghg_col:
        df['ghg_efficiency'] = np.where(
            df[ghg_col] > 0,
            df[funding_col] / df[ghg_col],
            np.nan,
        )
        logger.info("Calculated GHG efficiency ($ per ton CO2e)")

    # 6. Calculate DAC (disadvantaged community) benefit percentage.
    dac_funding_col = _find_column(df, 'funding_benefiting', 'disadvantaged')
    if dac_funding_col and funding_col:
        df['dac_benefit_percentage'] = np.where(
            df[funding_col] > 0,
            100 * df[dac_funding_col] / df[funding_col],
            0,
        )
        logger.info("Calculated DAC benefit percentage")

    # 7. Identify multi-agency programs.
    logger.info("Identifying multi-agency programs")
    program_agencies = df.groupby('program_name')['agency_name'].nunique()
    df['num_agencies_in_program'] = df['program_name'].map(program_agencies)
    df['is_multi_agency'] = df['num_agencies_in_program'] > 1
    logger.info(f"Found {df['is_multi_agency'].sum()} projects in multi-agency programs")

    # 8. Identify regional scope, then 9. assign California regions.
    if 'county' in df.columns:
        logger.info("Determining regional scope of projects")
        program_counties = df.groupby('program_name')['county'].nunique()
        df['num_counties'] = df['program_name'].map(program_counties)
        df['regional_scope'] = pd.cut(
            df['num_counties'],
            bins=[0, 1, 3, 10, np.inf],
            labels=['Single County', 'Limited Regional', 'Regional', 'Multi-Regional'],
        )
        _assign_ca_regions(df, program_counties)

    # 10. Create temporal period indicator (pre/post 2020).
    # NA-safe: funding_year may be nullable Int64 with pd.NA entries (see
    # step 4); `pd.NA >= 2020` is NA, so it is filled as False and those
    # rows fall into 'Pre-2020' rather than raising.
    if 'funding_year' in df.columns:
        logger.info("Creating temporal period indicator (pre/post 2020)")
        post_mask = (df['funding_year'] >= 2020).fillna(False).astype(bool)
        df['period'] = np.where(post_mask, 'Post-2020', 'Pre-2020')

    # 11. Handle outliers in GHG efficiency and DAC benefit.
    if 'ghg_efficiency' in df.columns:
        # Cap extreme values at the 95th percentile, then log-transform
        # (log1p) for analysis since the distribution is heavily skewed.
        upper_limit = df['ghg_efficiency'].quantile(0.95)
        df['ghg_efficiency_capped'] = df['ghg_efficiency'].clip(upper=upper_limit)
        df['ghg_efficiency_log'] = np.log1p(df['ghg_efficiency_capped'])
        logger.info(f"Handled outliers in GHG efficiency (capped at ${upper_limit:.2f} per ton)")

    if 'dac_benefit_percentage' in df.columns:
        # Handle values > 100% (can occur when the DAC column exceeds totals).
        df['dac_benefit_percentage'] = df['dac_benefit_percentage'].clip(upper=100)
        logger.info("Capped DAC benefit percentage at 100%")

    # Save cleaned data if output path provided.
    if output_path:
        output_file = Path(output_path)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        logger.info(f"Saving cleaned data to {output_file}")
        df.to_csv(output_file, index=False)

    return df


def json_serializable(obj):
    """Convert NumPy scalar/array types to Python standard types for JSON serialization.

    Unknown types are returned unchanged (json.dump will then raise for them).
    """
    if isinstance(obj, (np.integer, np.int64)):
        return int(obj)
    elif isinstance(obj, (np.floating, np.float64)):
        return float(obj)
    elif isinstance(obj, np.bool_):
        # np.bool_ is not a subclass of np.integer and is not natively
        # serializable by the json module.
        return bool(obj)
    elif isinstance(obj, (np.ndarray,)):
        return obj.tolist()
    else:
        return obj


def _write_text_summary(summary, text_file):
    """Write a human-readable text rendering of the summary dict to text_file."""
    with open(text_file, 'w') as f:
        f.write("CALIFORNIA CLIMATE INVESTMENTS (CCI) DATA SUMMARY\n")
        f.write("================================================\n\n")
        f.write("DATASET OVERVIEW\n")
        f.write(f"Total Projects: {summary['total_projects']:,}\n")
        f.write(f"Total Agencies: {summary['total_agencies']}\n")
        f.write(f"Total Programs: {summary['total_programs']}\n")
        if 'total_subprograms' in summary:
            f.write(f"Total Subprograms: {summary['total_subprograms']}\n")

        f.write("\nCARB VS NON-CARB BREAKDOWN\n")
        if 'carb_projects' in summary:
            f.write(f"CARB Projects: {summary['carb_projects']:,} ({summary['carb_percentage']:.1f}%)\n")
            f.write(f"Non-CARB Projects: {summary['non_carb_projects']:,} ({100-summary['carb_percentage']:.1f}%)\n")
        if 'carb_funding' in summary:
            f.write(f"CARB Funding: ${summary['carb_funding']:,.2f} ({summary['carb_funding_percentage']:.1f}%)\n")
            f.write(f"Non-CARB Funding: ${summary['non_carb_funding']:,.2f} ({100-summary['carb_funding_percentage']:.1f}%)\n")
            f.write(f"Average CARB Project: ${summary['avg_carb_funding']:,.2f}\n")
            f.write(f"Average Non-CARB Project: ${summary['avg_non_carb_funding']:,.2f}\n")
        if 'carb_ghg_reduction' in summary:
            f.write(f"CARB GHG Reductions: {summary['carb_ghg_reduction']:,.2f} tons ({summary['carb_ghg_percentage']:.1f}%)\n")
            f.write(f"Non-CARB GHG Reductions: {summary['non_carb_ghg_reduction']:,.2f} tons ({100-summary['carb_ghg_percentage']:.1f}%)\n")
        if 'median_carb_efficiency' in summary and 'median_non_carb_efficiency' in summary:
            f.write(f"CARB Efficiency: ${summary['median_carb_efficiency']:,.2f} per ton CO2e\n")
            f.write(f"Non-CARB Efficiency: ${summary['median_non_carb_efficiency']:,.2f} per ton CO2e\n")

        f.write("\nEV VOUCHERS BREAKDOWN\n")
        if 'ev_vouchers' in summary:
            f.write(f"EV Vouchers: {summary['ev_vouchers']:,} ({summary['ev_percentage']:.1f}% of total)\n")
            if 'ev_percentage_of_carb' in summary:
                f.write(f"Percentage of CARB Projects: {summary['ev_percentage_of_carb']:.1f}%\n")
        if 'ev_funding' in summary:
            f.write(f"EV Funding: ${summary['ev_funding']:,.2f} ({summary['ev_funding_percentage']:.1f}% of total)\n")
            f.write(f"Average Voucher Amount: ${summary['avg_ev_funding']:,.2f}\n")
        if 'ev_ghg_reduction' in summary:
            f.write(f"EV GHG Reductions: {summary['ev_ghg_reduction']:,.2f} tons ({summary['ev_ghg_percentage']:.1f}% of total)\n")
        if 'median_ev_efficiency' in summary:
            f.write(f"EV Efficiency: ${summary['median_ev_efficiency']:,.2f} per ton CO2e\n")

        f.write("\nMULTI-AGENCY COLLABORATION\n")
        if 'multi_agency_projects' in summary:
            f.write(f"Multi-Agency Projects: {summary['multi_agency_projects']:,} ({summary['multi_agency_percentage']:.1f}%)\n")
        if 'avg_agencies_per_program' in summary:
            f.write(f"Average Agencies per Program: {summary['avg_agencies_per_program']:.2f}\n")
        if 'median_multi_agency_efficiency' in summary and 'median_single_agency_efficiency' in summary:
            f.write(f"Multi-Agency Efficiency: ${summary['median_multi_agency_efficiency']:,.2f} per ton CO2e\n")
            f.write(f"Single-Agency Efficiency: ${summary['median_single_agency_efficiency']:,.2f} per ton CO2e\n")
        if 'avg_multi_agency_dac_benefit' in summary and 'avg_single_agency_dac_benefit' in summary:
            f.write(f"Multi-Agency DAC Benefit: {summary['avg_multi_agency_dac_benefit']:.2f}%\n")
            f.write(f"Single-Agency DAC Benefit: {summary['avg_single_agency_dac_benefit']:.2f}%\n")

        f.write("\nTEMPORAL TRENDS (PRE/POST 2020)\n")
        if 'pre_2020_projects' in summary and 'post_2020_projects' in summary:
            f.write(f"Pre-2020 Projects: {summary['pre_2020_projects']:,}\n")
            f.write(f"Post-2020 Projects: {summary['post_2020_projects']:,}\n")
        if 'agency_change_percentage' in summary:
            f.write(f"Change in Average Agencies: {summary['agency_change_percentage']:+.1f}%\n")
        if 'funding_change_percentage' in summary:
            f.write(f"Change in Average Funding: {summary['funding_change_percentage']:+.1f}%\n")
        if 'dac_change_percentage' in summary:
            f.write(f"Change in DAC Benefit: {summary['dac_change_percentage']:+.1f}%\n")

        f.write("\nREGIONAL ANALYSIS\n")
        if 'region_counts' in summary:
            for region, count in sorted(summary['region_counts'].items(),
                                        key=lambda x: x[1], reverse=True):
                f.write(f"{region}: {count:,} projects ({summary['region_percentages'][region]:.1f}%)\n")
            f.write("\nEfficiency by Region ($ per ton CO2e):\n")
            if 'region_efficiency' in summary:
                for region, efficiency in sorted(summary['region_efficiency'].items(),
                                                 key=lambda x: x[1]):
                    f.write(f"{region}: ${efficiency:,.2f}\n")
            f.write("\nDAC Benefit by Region:\n")
            if 'region_dac_benefit' in summary:
                for region, dac in sorted(summary['region_dac_benefit'].items(),
                                          key=lambda x: x[1], reverse=True):
                    f.write(f"{region}: {dac:.2f}%\n")


def generate_data_summary(df, output_path=None):
    """
    Generate a summary of the cleaned CCI data.

    Parameters:
        df (pd.DataFrame): The cleaned CCI data
        output_path (str, optional): Path to save the summary (JSON; a
            human-readable .txt sibling is written alongside it)

    Returns:
        dict: Summary statistics
    """
    summary = {}

    # 1. Basic dataset stats
    summary['total_projects'] = len(df)
    summary['total_agencies'] = df['agency_name'].nunique()
    summary['total_programs'] = df['program_name'].nunique()
    if 'sub_program_name' in df.columns:
        summary['total_subprograms'] = df['sub_program_name'].nunique()

    # 2. CARB vs Non-CARB breakdown
    carb_df = non_carb_df = ev_df = None
    if 'is_carb' in df.columns:
        carb_df = df[df['is_carb']]
        non_carb_df = df[~df['is_carb']]
        summary['carb_projects'] = len(carb_df)
        summary['non_carb_projects'] = len(non_carb_df)
        summary['carb_percentage'] = len(carb_df) / len(df) * 100

    # 3. EV vouchers breakdown
    if 'is_ev_voucher' in df.columns:
        ev_df = df[df['is_ev_voucher']]
        summary['ev_vouchers'] = len(ev_df)
        summary['ev_percentage'] = len(ev_df) / len(df) * 100
        if 'is_carb' in df.columns:
            summary['ev_percentage_of_carb'] = (
                len(ev_df) / len(carb_df) * 100 if len(carb_df) > 0 else 0)

    # 4. Funding statistics
    funding_col = _find_column(df, 'total_program', 'funding')
    if funding_col:
        summary['total_funding'] = df[funding_col].sum()
        summary['avg_funding_per_project'] = df[funding_col].mean()
        if 'is_carb' in df.columns:
            summary['carb_funding'] = carb_df[funding_col].sum()
            summary['non_carb_funding'] = non_carb_df[funding_col].sum()
            summary['carb_funding_percentage'] = carb_df[funding_col].sum() / df[funding_col].sum() * 100
            summary['avg_carb_funding'] = carb_df[funding_col].mean()
            summary['avg_non_carb_funding'] = non_carb_df[funding_col].mean()
        if 'is_ev_voucher' in df.columns:
            summary['ev_funding'] = ev_df[funding_col].sum()
            summary['ev_funding_percentage'] = ev_df[funding_col].sum() / df[funding_col].sum() * 100
            summary['avg_ev_funding'] = ev_df[funding_col].mean()

    # 5. GHG reduction statistics
    ghg_col = _find_column(df, 'total_project', 'ghg')
    if ghg_col:
        summary['total_ghg_reduction'] = df[ghg_col].sum()
        summary['avg_ghg_reduction_per_project'] = df[ghg_col].mean()
        if 'is_carb' in df.columns:
            summary['carb_ghg_reduction'] = carb_df[ghg_col].sum()
            summary['non_carb_ghg_reduction'] = non_carb_df[ghg_col].sum()
            summary['carb_ghg_percentage'] = carb_df[ghg_col].sum() / df[ghg_col].sum() * 100
        if 'is_ev_voucher' in df.columns:
            summary['ev_ghg_reduction'] = ev_df[ghg_col].sum()
            summary['ev_ghg_percentage'] = ev_df[ghg_col].sum() / df[ghg_col].sum() * 100

    # 6. Efficiency statistics — use medians due to heavy right skew.
    if 'ghg_efficiency' in df.columns:
        valid_efficiency = df[df['ghg_efficiency'].notna() & (df['ghg_efficiency'] > 0)]
        if len(valid_efficiency) > 0:
            summary['median_ghg_efficiency'] = valid_efficiency['ghg_efficiency'].median()
        if 'is_carb' in df.columns:
            valid_carb = carb_df[carb_df['ghg_efficiency'].notna() & (carb_df['ghg_efficiency'] > 0)]
            valid_non_carb = non_carb_df[non_carb_df['ghg_efficiency'].notna() & (non_carb_df['ghg_efficiency'] > 0)]
            if len(valid_carb) > 0:
                summary['median_carb_efficiency'] = valid_carb['ghg_efficiency'].median()
            if len(valid_non_carb) > 0:
                summary['median_non_carb_efficiency'] = valid_non_carb['ghg_efficiency'].median()
        if 'is_ev_voucher' in df.columns:
            valid_ev = ev_df[ev_df['ghg_efficiency'].notna() & (ev_df['ghg_efficiency'] > 0)]
            if len(valid_ev) > 0:
                summary['median_ev_efficiency'] = valid_ev['ghg_efficiency'].median()

    # 7. DAC benefit statistics
    if 'dac_benefit_percentage' in df.columns:
        summary['avg_dac_benefit'] = df['dac_benefit_percentage'].mean()
        if 'is_carb' in df.columns:
            summary['avg_carb_dac_benefit'] = carb_df['dac_benefit_percentage'].mean()
            summary['avg_non_carb_dac_benefit'] = non_carb_df['dac_benefit_percentage'].mean()
        if 'is_ev_voucher' in df.columns:
            summary['avg_ev_dac_benefit'] = ev_df['dac_benefit_percentage'].mean()

    # 8. Multi-agency statistics
    if 'is_multi_agency' in df.columns:
        multi_df = df[df['is_multi_agency']]
        single_df = df[~df['is_multi_agency']]
        summary['multi_agency_projects'] = len(multi_df)
        summary['multi_agency_percentage'] = len(multi_df) / len(df) * 100
        if 'num_agencies_in_program' in df.columns:
            summary['avg_agencies_per_program'] = df['num_agencies_in_program'].mean()
        if 'ghg_efficiency' in df.columns:
            valid_multi = multi_df[multi_df['ghg_efficiency'].notna() & (multi_df['ghg_efficiency'] > 0)]
            valid_single = single_df[single_df['ghg_efficiency'].notna() & (single_df['ghg_efficiency'] > 0)]
            if len(valid_multi) > 0:
                summary['median_multi_agency_efficiency'] = valid_multi['ghg_efficiency'].median()
            if len(valid_single) > 0:
                summary['median_single_agency_efficiency'] = valid_single['ghg_efficiency'].median()
        if 'dac_benefit_percentage' in df.columns:
            summary['avg_multi_agency_dac_benefit'] = multi_df['dac_benefit_percentage'].mean()
            summary['avg_single_agency_dac_benefit'] = single_df['dac_benefit_percentage'].mean()

    # 9. Temporal statistics (guard divisions against zero pre-2020 means)
    if 'period' in df.columns:
        pre_df = df[df['period'] == 'Pre-2020']
        post_df = df[df['period'] == 'Post-2020']
        summary['pre_2020_projects'] = len(pre_df)
        summary['post_2020_projects'] = len(post_df)
        if 'num_agencies_in_program' in df.columns:
            summary['pre_2020_avg_agencies'] = pre_df['num_agencies_in_program'].mean()
            summary['post_2020_avg_agencies'] = post_df['num_agencies_in_program'].mean()
            summary['agency_change_percentage'] = (
                (post_df['num_agencies_in_program'].mean() - pre_df['num_agencies_in_program'].mean())
                / pre_df['num_agencies_in_program'].mean() * 100
            ) if pre_df['num_agencies_in_program'].mean() > 0 else 0
        if funding_col:
            summary['pre_2020_avg_funding'] = pre_df[funding_col].mean()
            summary['post_2020_avg_funding'] = post_df[funding_col].mean()
            summary['funding_change_percentage'] = (
                (post_df[funding_col].mean() - pre_df[funding_col].mean())
                / pre_df[funding_col].mean() * 100
            ) if pre_df[funding_col].mean() > 0 else 0
        if 'dac_benefit_percentage' in df.columns:
            summary['pre_2020_avg_dac_benefit'] = pre_df['dac_benefit_percentage'].mean()
            summary['post_2020_avg_dac_benefit'] = post_df['dac_benefit_percentage'].mean()
            summary['dac_change_percentage'] = (
                (post_df['dac_benefit_percentage'].mean() - pre_df['dac_benefit_percentage'].mean())
                / pre_df['dac_benefit_percentage'].mean() * 100
            ) if pre_df['dac_benefit_percentage'].mean() > 0 else 0

    # 10. Regional statistics
    if 'ca_region' in df.columns:
        region_counts = df['ca_region'].value_counts()
        region_percentages = df['ca_region'].value_counts(normalize=True) * 100
        summary['region_counts'] = region_counts.to_dict()
        summary['region_percentages'] = region_percentages.to_dict()
        if 'ghg_efficiency' in df.columns:
            summary['region_efficiency'] = df.groupby('ca_region')['ghg_efficiency'].median().to_dict()
        if 'dac_benefit_percentage' in df.columns:
            summary['region_dac_benefit'] = df.groupby('ca_region')['dac_benefit_percentage'].mean().to_dict()

    # Save summary (JSON + readable text) if output path provided.
    if output_path:
        output_file = Path(output_path)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w') as f:
            json.dump(summary, f, indent=2, default=json_serializable)
        logger.info(f"Saved data summary to {output_file}")

        text_file = output_file.with_suffix('.txt')
        _write_text_summary(summary, text_file)
        logger.info(f"Saved readable summary to {text_file}")

    return summary


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Clean and prepare CCI data for analysis')
    parser.add_argument('--input_path', type=str, required=True,
                        help='Path to the input CCI data file')
    parser.add_argument('--output_path', type=str, help='Path to save the cleaned data')
    parser.add_argument('--summary_path', type=str, help='Path to save the data summary')
    args = parser.parse_args()

    # Clean and prepare the data
    cleaned_df = clean_and_prepare_cci_data(args.input_path, args.output_path)

    # Generate summary
    if args.summary_path:
        generate_data_summary(cleaned_df, args.summary_path)