# j40-cejst-2/data/data-pipeline/data_pipeline/etl/sources/census_decennial/etl.py
import json
import os
from pathlib import Path
from typing import List

import geopandas as gpd
import numpy as np
import pandas as pd
from data_pipeline.etl.sources.census_decennial.constants import (
DEC_TERRITORY_PARAMS,
DEC_FIELD_NAMES,
OUTPUT_RACE_FIELDS,
)
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.datasource import DataSource
from data_pipeline.etl.datasource import FileDataSource
from data_pipeline.score import field_names
from data_pipeline.utils import get_module_logger
from data_pipeline.etl.sources.census_acs.etl import CensusACSETL
from data_pipeline.etl.sources.census_acs.etl_imputations import (
calculate_income_measures,
)
pd.options.mode.chained_assignment = "raise"
logger = get_module_logger(__name__)
class CensusDecennialETL(ExtractTransformLoad):
DECENNIAL_YEAR = 2020
OUTPUT_PATH = (
ExtractTransformLoad.DATA_PATH
/ "dataset"
/ f"census_decennial_{DECENNIAL_YEAR}"
)
CENSUS_GEOJSON_PATH = (
ExtractTransformLoad.DATA_PATH / "census" / "geojson" / "us.json"
)
def __get_api_url(
self,
state_abbreviation: str,
name_list: List[str],
fips: str,
county: str,
) -> str:
url = (
f"https://api.census.gov/data/{self.DECENNIAL_YEAR}/dec/dhc{state_abbreviation}?get=NAME,{name_list}"
+ f"&for=tract:*&in=state:{fips}%20county:{county}"
)
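        # Illustrative only: with hypothetical values for the territory
        # abbreviation ("vi"), FIPS ("78"), county ("010"), and a single
        # variable, the request would look roughly like
        # https://api.census.gov/data/2020/dec/dhcvi?get=NAME,P1_001N&for=tract:*&in=state:78%20county:010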
census_api_key = os.environ.get("CENSUS_API_KEY")
if census_api_key:
url += f"&key={census_api_key}"
return url
def __get_destination_path(
self,
state_abbreviation: str,
fips: str,
county: str,
test_path: Path = None,
) -> str:
root_path = test_path or self.get_sources_path()
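        # Illustrative layout (hypothetical territory values): the cached file
        # for one county would land at something like
        #   <sources>/2020/VI/78/010/census.json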
return (
root_path
/ str(self.DECENNIAL_YEAR)
/ state_abbreviation
/ fips
/ county
/ "census.json"
)
def __init__(self):
self.df_all = pd.DataFrame()
self.final_race_fields = []
def get_data_sources(self) -> List[DataSource]:
sources = []
for island in DEC_TERRITORY_PARAMS:
for county in island["county_fips"]:
api_url = self.__get_api_url(
island["state_abbreviation"],
",".join(island["xwalk"].keys()),
island["fips"],
county,
)
sources.append(
FileDataSource(
api_url,
self.__get_destination_path(
island["state_abbreviation"], island["fips"], county
),
)
)
return sources
def extract(
self,
use_cached_data_sources: bool = False,
test_territory_params=None,
test_path: Path = None,
) -> None:
super().extract(use_cached_data_sources)
for territory in test_territory_params or DEC_TERRITORY_PARAMS:
for county in territory["county_fips"]:
abbr = territory["state_abbreviation"]
file_path = self.__get_destination_path(
abbr, territory["fips"], county, test_path=test_path
)
try:
json_data = json.load(file_path.open())
except (FileNotFoundError, ValueError) as e:
logger.error(
f"Could not load content in census decennial ETL because {e}."
)
raise
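                # The Census API returns a list of rows with the header row
                # first; illustrative shape (hypothetical values):
                #   [["NAME", "P1_001N", "state", "county", "tract"],
                #    ["Census Tract 9501, ...", "3510", "78", "010", "950100"]]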
df = pd.DataFrame(json_data[1:], columns=json_data[0])
# Rename the columns to their common names
df.rename(columns=territory["xwalk"], inplace=True)
# Convert columns to numeric where applicable
for column in df.columns:
if column not in ["state", "county", "NAME", "tract"]:
df[column] = pd.to_numeric(df[column], errors="ignore")
# Add the territory median income
df.loc[
df[field_names.CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2019]
> 0,
DEC_FIELD_NAMES.TERRITORY_MEDIAN_INCOME,
] = territory["median_income"]
self.df_all = pd.concat([self.df_all, df])
def _merge_tracts_2010_compatibility(self):
"""Merges tract 69120950200 to match 2010 tracts"""
# MP 69/120 69120950200 = 69120950201, 69120950202
# Tract has been split, but 69120950202 has no data, so we just make 69120950200 = 69120950201
self.df_all = self.df_all.drop(
self.df_all[
self.df_all[field_names.GEOID_TRACT_FIELD] == "69120950202"
].index
)
self.df_all.loc[
self.df_all[field_names.GEOID_TRACT_FIELD] == "69120950201",
field_names.GEOID_TRACT_FIELD,
] = "69120950200"
def _impute_income(self, geojson_path: Path):
"""Impute income for both income measures."""
        # Merge in the Census GeoJSON that imputation draws values from.
logger.debug(f"Reading GeoJSON from {geojson_path}")
geo_df = gpd.read_file(geojson_path)
self.df_all = CensusACSETL.merge_geojson(
df=self.df_all,
usa_geo_df=geo_df,
)
logger.debug("Imputing income information")
impute_var_named_tup_list = [
CensusACSETL.ImputeVariables(
raw_field_name=field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2019,
imputed_field_name=DEC_FIELD_NAMES.IMPUTED_PERCENTAGE_HOUSEHOLDS_BELOW_200_PERC_POVERTY_LEVEL,
),
]
self.df_all = calculate_income_measures(
impute_var_named_tup_list=impute_var_named_tup_list,
geo_df=self.df_all,
geoid_field=self.GEOID_TRACT_FIELD_NAME,
population_field=field_names.CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2019,
)
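        # Broadly, calculate_income_measures (shared with the ACS ETL) fills
        # tracts that are missing the raw measure using values from surrounding
        # geographies, writing the result to the imputed field named above.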
logger.debug("Calculating with imputed values")
self.df_all[
field_names.CENSUS_DECENNIAL_ADJUSTED_POVERTY_LESS_THAN_200_FPL_FIELD_2019
] = (
self.df_all[
field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2019
].fillna(
self.df_all[
DEC_FIELD_NAMES.IMPUTED_PERCENTAGE_HOUSEHOLDS_BELOW_200_PERC_POVERTY_LEVEL
]
)
# Use clip to ensure that the values are not negative
).clip(
lower=0
)
        # Every tract with >0 population should have a value at this point
assert (
self.df_all[
self.df_all[
field_names.CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2019
]
>= 1
][
field_names.CENSUS_DECENNIAL_ADJUSTED_POVERTY_LESS_THAN_200_FPL_FIELD_2019
]
.isna()
.sum()
== 0
), "Error: not all values were filled with imputations..."
# We generate a boolean that is TRUE when there is an imputed income but not a baseline income, and FALSE otherwise.
# This allows us to see which tracts have an imputed income.
self.df_all[field_names.ISLAND_AREAS_IMPUTED_INCOME_FLAG_FIELD] = (
self.df_all[
field_names.CENSUS_DECENNIAL_ADJUSTED_POVERTY_LESS_THAN_200_FPL_FIELD_2019
].notna()
& self.df_all[
field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2019
].isna()
)
def transform(self, geojson_path: Path = CENSUS_GEOJSON_PATH) -> None:
        # Create the Geo ID (Census tract) field by concatenating the state, county, and tract codes
self.df_all[field_names.GEOID_TRACT_FIELD] = (
self.df_all["state"] + self.df_all["county"] + self.df_all["tract"]
)
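        # A tract GEOID is 2-digit state + 3-digit county + 6-digit tract,
        # e.g. (illustrative) "78" + "010" + "950100" -> "78010950100".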
# Combine the two MP 2020 tracts that were split from one 2010 tract
self._merge_tracts_2010_compatibility()
# Replace invalid numeric values with NaN
numeric_columns = self.df_all.select_dtypes(include="number").columns
for num_column in numeric_columns:
self.df_all.loc[self.df_all[num_column] < -999, num_column] = np.nan
# Percentage of households below 100% FPL
self.df_all[
field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_100_FPL_FIELD_2019
] = (
self.df_all[DEC_FIELD_NAMES.HOUSEHOLD_POVERTY_LEVEL_UNDER_0_5]
+ self.df_all[DEC_FIELD_NAMES.HOUSEHOLD_POVERTY_LEVEL_UNDER_0_74]
+ self.df_all[DEC_FIELD_NAMES.HOUSEHOLD_POVERTY_LEVEL_UNDER_0_99]
) / self.df_all[
DEC_FIELD_NAMES.TOTAL_HOUSEHOLD_POVERTY_LEVEL
]
        # Percentage of households below 200% FPL, computed as
        # (total households - households over 200% FPL) / total households
self.df_all[
field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2019
] = (
self.df_all[DEC_FIELD_NAMES.TOTAL_HOUSEHOLD_POVERTY_LEVEL]
- self.df_all[DEC_FIELD_NAMES.HOUSEHOLD_POVERTY_LEVEL_OVER_2_0]
) / self.df_all[
DEC_FIELD_NAMES.TOTAL_HOUSEHOLD_POVERTY_LEVEL
]
        # Percentage high school achievement:
        # Percentage = (male + female with a high school education) / total population
self.df_all[field_names.CENSUS_DECENNIAL_HIGH_SCHOOL_ED_FIELD_2019] = (
self.df_all[DEC_FIELD_NAMES.MALE_HIGH_SCHOOL_ED]
+ self.df_all[DEC_FIELD_NAMES.FEMALE_HIGH_SCHOOL_ED]
) / self.df_all[
field_names.CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2019
]
        # Calculate the unemployment rate.
self.df_all[field_names.CENSUS_DECENNIAL_UNEMPLOYMENT_FIELD_2019] = (
self.df_all[DEC_FIELD_NAMES.EMPLOYMENT_MALE_UNEMPLOYED]
+ self.df_all[DEC_FIELD_NAMES.EMPLOYMENT_FEMALE_UNEMPLOYED]
) / (
self.df_all[DEC_FIELD_NAMES.EMPLOYMENT_MALE_IN_LABOR_FORCE]
+ self.df_all[DEC_FIELD_NAMES.EMPLOYMENT_FEMALE_IN_LABOR_FORCE]
)
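        # Illustrative arithmetic: (20 + 20) unemployed over (400 + 400) in the
        # labor force gives an unemployment rate of 0.05 for the tract.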
        # Calculate income as a percent of the territory's (area) median income
self.df_all[
field_names.CENSUS_DECENNIAL_AREA_MEDIAN_INCOME_PERCENT_FIELD_2019
] = (
self.df_all[field_names.CENSUS_DECENNIAL_MEDIAN_INCOME_2019]
/ self.df_all[DEC_FIELD_NAMES.TERRITORY_MEDIAN_INCOME]
)
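        # Illustrative arithmetic: a tract median income of $30,000 against a
        # territory median income of $60,000 yields 0.5, i.e. 50% of the area
        # median income.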
# Calculate college attendance
self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_POPULATION] = (
self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_MALE_ENROLLED]
+ self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_FEMALE_ENROLLED]
)
self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_PERCENT] = (
self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_MALE_ENROLLED]
+ self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_FEMALE_ENROLLED]
) / self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_TOTAL_ENROLLED]
self.df_all[DEC_FIELD_NAMES.COLLEGE_NON_ATTENDANCE_PERCENT] = (
1 - self.df_all[DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_PERCENT]
)
# Calculate stats by race
for race_field_name in OUTPUT_RACE_FIELDS:
output_field_name = (
field_names.PERCENT_PREFIX
+ race_field_name
# 2010 vs 2020 WARNING
# We must keep the old 2009 date to make it compatible with all the other 2010 data
+ f" in {field_names.DEC_DATA_YEAR}"
)
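            # Yields columns named roughly "Percent <race> in <year>"; the exact
            # strings come from field_names and OUTPUT_RACE_FIELDS.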
self.df_all[output_field_name] = (
self.df_all[race_field_name]
/ self.df_all[DEC_FIELD_NAMES.TOTAL_RACE_POPULATION]
)
self.final_race_fields.append(output_field_name)
# Reporting Missing Values
for col in self.df_all.columns:
missing_value_count = self.df_all[col].isnull().sum()
logger.debug(
f"There are {missing_value_count} missing values in the field {col} out of a total of {self.df_all.shape[0]} rows"
)
self._impute_income(geojson_path)
def load(self) -> None:
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
columns_to_include = [
field_names.GEOID_TRACT_FIELD,
field_names.CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2019,
field_names.CENSUS_DECENNIAL_MEDIAN_INCOME_2019,
DEC_FIELD_NAMES.TERRITORY_MEDIAN_INCOME,
field_names.CENSUS_DECENNIAL_AREA_MEDIAN_INCOME_PERCENT_FIELD_2019,
field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_100_FPL_FIELD_2019,
field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2019,
DEC_FIELD_NAMES.IMPUTED_PERCENTAGE_HOUSEHOLDS_BELOW_200_PERC_POVERTY_LEVEL,
field_names.CENSUS_DECENNIAL_ADJUSTED_POVERTY_LESS_THAN_200_FPL_FIELD_2019,
field_names.CENSUS_DECENNIAL_UNEMPLOYMENT_FIELD_2019,
field_names.CENSUS_DECENNIAL_HIGH_SCHOOL_ED_FIELD_2019,
DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_PERCENT,
DEC_FIELD_NAMES.COLLEGE_NON_ATTENDANCE,
DEC_FIELD_NAMES.COLLEGE_ATTENDANCE_POPULATION,
field_names.ISLAND_AREAS_IMPUTED_INCOME_FLAG_FIELD,
] + self.final_race_fields
self.df_all[columns_to_include].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)