Files
california-equity-git/archive/analysis/database_setup.ipynb
2025-04-09 20:26:45 -07:00

262 lines
10 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Successfully connected to the database!\n",
"Dropping existing database objects...\n",
"Processing CalEnviroScreen data...\n",
"Loading CES data to database...\n",
"Processing CCI project data...\n",
"Loading CCI data to database...\n",
"Creating analysis views...\n",
"Data loading completed successfully!\n",
"\n",
"Record counts:\n",
"CES data: 8035 records\n",
"CCI projects: 120715 records\n"
]
}
],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"import os\n",
"import geopandas as gpd\n",
"from sqlalchemy import create_engine, text\n",
"from datetime import datetime\n",
"\n",
"# Database configuration\n",
"DB_USER = os.getenv('DB_USER', 'postgres')\n",
"DB_PASSWORD = os.getenv('DB_PASSWORD', 'MandyLinkToby3')\n",
"DB_HOST = os.getenv('DB_HOST', '192.168.0.74')\n",
"DB_PORT = os.getenv('DB_PORT', '5432')\n",
"DB_NAME = 'calif_equity'\n",
"\n",
"# Set working directory\n",
"os.chdir('/home/dadams/Repos/california_equity_git')\n",
"\n",
"# Create database connection\n",
"def create_db_engine():\n",
" connection_string = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'\n",
" return create_engine(connection_string)\n",
"\n",
"def drop_existing_objects(engine):\n",
" \"\"\"Drop existing database objects in the correct order\"\"\"\n",
" with engine.connect() as connection:\n",
" # Drop views first\n",
" connection.execute(text(\"DROP VIEW IF EXISTS project_efficiency CASCADE\"))\n",
" connection.execute(text(\"DROP VIEW IF EXISTS regional_collaboration CASCADE\"))\n",
" # Then drop tables\n",
" connection.execute(text(\"DROP TABLE IF EXISTS agency_partnerships CASCADE\"))\n",
" connection.execute(text(\"DROP TABLE IF EXISTS cci_projects CASCADE\"))\n",
" connection.execute(text(\"DROP TABLE IF EXISTS ces_data CASCADE\"))\n",
" connection.commit()\n",
"\n",
"def process_ces_data(filepath):\n",
" print(\"Processing CalEnviroScreen data...\")\n",
" gdf = gpd.read_file(filepath)\n",
" \n",
" # Clean and standardize column names\n",
" gdf.columns = [col.lower().replace(' ', '_') for col in gdf.columns]\n",
" \n",
" # Convert tract ID to string and ensure it's clean\n",
" gdf['tract'] = gdf['tract'].astype(str).str.strip()\n",
" \n",
" # Select and rename relevant columns\n",
" ces_data = gdf[['tract', 'zip', 'county', 'approxloc', 'totpop19',\n",
" 'ciscore', 'ciscorep', 'ozone', 'ozonep', 'pm2_5',\n",
" 'pm2_5_p', 'drinkwat', 'drinkwatp', 'poverty',\n",
" 'povertyp', 'unempl', 'unemplp', 'housburd',\n",
" 'housburdp', 'geometry']]\n",
" \n",
" # Rename columns to match database schema\n",
" column_map = {\n",
" 'tract': 'tract_id',\n",
" 'zip': 'zip_code',\n",
" 'approxloc': 'approx_loc',\n",
" 'totpop19': 'total_pop_19',\n",
" 'ciscore': 'ci_score',\n",
" 'ciscorep': 'ci_score_pctl',\n",
" 'pm2_5': 'pm25',\n",
" 'pm2_5_p': 'pm25_pctl',\n",
" 'drinkwat': 'drinking_water',\n",
" 'drinkwatp': 'drinking_water_pctl',\n",
" 'housburd': 'housing_burden',\n",
" 'housburdp': 'housing_burden_pctl',\n",
" 'geometry': 'geom'\n",
" }\n",
" ces_data = ces_data.rename(columns=column_map)\n",
" \n",
" # Set the geometry column explicitly\n",
" ces_data = ces_data.set_geometry('geom')\n",
" \n",
" return ces_data\n",
"\n",
"def process_cci_data(filepath):\n",
" print(\"Processing CCI project data...\")\n",
" df = pd.read_csv(filepath, low_memory=False)\n",
" \n",
" # Clean column names\n",
" df.columns = [col.lower().replace(' ', '_') for col in df.columns]\n",
" \n",
" # Convert date columns\n",
" df['date_operational'] = pd.to_datetime(df['date_operational'])\n",
" \n",
" # Filter date range\n",
" df = df[\n",
" (df['date_operational'] >= '2015-01-01') &\n",
" (df['date_operational'] <= '2024-12-31')\n",
" ]\n",
" \n",
" # Process project partners\n",
" df['project_partners'] = df['project_partners'].fillna('')\n",
" \n",
" # Select and prepare relevant columns\n",
" cci_data = df[[\n",
" 'project_idnumber', 'reporting_cycle_name', 'agency_name',\n",
" 'program_name', 'program_description', 'project_name',\n",
" 'project_type', 'project_description', 'date_operational',\n",
" 'census_tract', 'county', 'total_program_ggrffunding',\n",
" 'total_project_ghgreductions', 'is_benefit_disadvantaged_communities',\n",
" 'project_partners'\n",
" ]]\n",
" \n",
" # Rename columns to match schema\n",
" column_map = {\n",
" 'project_idnumber': 'project_id',\n",
" 'reporting_cycle_name': 'reporting_cycle',\n",
" 'total_program_ggrffunding': 'total_funding',\n",
" 'total_project_ghgreductions': 'ghg_reduction',\n",
" 'is_benefit_disadvantaged_communities': 'dac_benefit'\n",
" }\n",
" cci_data = cci_data.rename(columns=column_map)\n",
" \n",
" # Convert boolean columns\n",
" cci_data['dac_benefit'] = cci_data['dac_benefit'].astype(bool)\n",
" \n",
" return cci_data\n",
"\n",
"def create_views(engine):\n",
" with engine.connect() as connection:\n",
" # Project efficiency view\n",
" connection.execute(text(\"\"\"\n",
" CREATE VIEW project_efficiency AS\n",
" SELECT \n",
" p.project_id,\n",
" p.program_name,\n",
" p.agency_name,\n",
" p.total_funding,\n",
" p.ghg_reduction,\n",
" p.dac_benefit,\n",
" CASE \n",
" WHEN p.total_funding > 0 THEN p.ghg_reduction / p.total_funding \n",
" ELSE 0 \n",
" END as ghg_efficiency,\n",
" c.ci_score as ces_score,\n",
" CASE \n",
" WHEN p.project_partners = '' THEN 0\n",
" ELSE (length(p.project_partners) - length(replace(p.project_partners, ',', '')) + 1)\n",
" END as partner_count\n",
" FROM cci_projects p\n",
" LEFT JOIN ces_data c ON cast(p.census_tract as text) = cast(c.tract_id as text)\n",
" \"\"\"))\n",
" \n",
" # Regional collaboration view\n",
" connection.execute(text(\"\"\"\n",
" CREATE VIEW regional_collaboration AS\n",
" SELECT \n",
" county,\n",
" COUNT(DISTINCT project_id) as project_count,\n",
" AVG(CASE \n",
" WHEN project_partners = '' THEN 0\n",
" ELSE (length(project_partners) - length(replace(project_partners, ',', '')) + 1)\n",
" END) as avg_partners,\n",
" SUM(total_funding) as total_funding,\n",
" SUM(CASE WHEN dac_benefit THEN 1 ELSE 0 END)::FLOAT / COUNT(*) as dac_rate,\n",
" SUM(ghg_reduction) / NULLIF(SUM(total_funding), 0) as region_efficiency\n",
" FROM cci_projects\n",
" GROUP BY county\n",
" \"\"\"))\n",
" \n",
" connection.commit()\n",
"\n",
"def load_data_to_db():\n",
" try:\n",
" engine = create_db_engine()\n",
" \n",
" # Drop existing objects first\n",
" print(\"Dropping existing database objects...\")\n",
" drop_existing_objects(engine)\n",
" \n",
" # Load CES data\n",
" ces_data = process_ces_data('/home/dadams/Repos/california_equity_git/california_enviroscreen/calif_enviroscreen_shape/CES4_Final_Shapefile.shp')\n",
" print(\"Loading CES data to database...\")\n",
" ces_data.to_postgis('ces_data', engine, if_exists='replace', index=False)\n",
" \n",
" # Load CCI data\n",
" cci_data = process_cci_data('/home/dadams/Repos/california_equity_git/data_raw/cci_programs_data.csv')\n",
" print(\"Loading CCI data to database...\")\n",
" cci_data.to_sql('cci_projects', engine, if_exists='replace', index=False)\n",
" \n",
" print(\"Creating analysis views...\")\n",
" create_views(engine)\n",
" \n",
" print(\"Data loading completed successfully!\")\n",
" \n",
" return {\n",
" 'ces_records': len(ces_data),\n",
" 'cci_records': len(cci_data)\n",
" }\n",
" \n",
" except Exception as e:\n",
" print(f\"Error loading data: {e}\")\n",
" raise\n",
"\n",
"# Test database connection\n",
"try:\n",
" engine = create_db_engine()\n",
" with engine.connect() as conn:\n",
" print(\"Successfully connected to the database!\")\n",
"except Exception as e:\n",
" print(f\"Error connecting to the database: {str(e)}\")\n",
"\n",
"# Execute loading\n",
"record_counts = load_data_to_db()\n",
"print(\"\\nRecord counts:\")\n",
"print(f\"CES data: {record_counts['ces_records']} records\")\n",
"print(f\"CCI projects: {record_counts['cci_records']} records\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}