mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-07-28 14:31:16 -07:00
Imputing income using geographic neighbors (#1559)
Imputes income field with a light refactor. Needs more refactor and more tests (I spotchecked). Next ticket will check and address but a lot of "narwhal" architecture is here.
This commit is contained in:
parent
485a9a8316
commit
f047ca9d83
16 changed files with 1245 additions and 81 deletions
|
@ -1,14 +1,26 @@
|
|||
from collections import namedtuple
|
||||
import os
|
||||
import pandas as pd
|
||||
import geopandas as gpd
|
||||
|
||||
from data_pipeline.config import settings
|
||||
from data_pipeline.etl.base import ExtractTransformLoad
|
||||
from data_pipeline.etl.sources.census_acs.etl_utils import (
|
||||
retrieve_census_acs_data,
|
||||
)
|
||||
from data_pipeline.utils import get_module_logger
|
||||
from data_pipeline.etl.sources.census_acs.etl_imputations import (
|
||||
calculate_income_measures,
|
||||
)
|
||||
|
||||
from data_pipeline.utils import get_module_logger, unzip_file_from_url
|
||||
from data_pipeline.score import field_names
|
||||
|
||||
logger = get_module_logger(__name__)
|
||||
|
||||
# because now there is a requirement for the us.json, this will port from
|
||||
# AWS when a local copy does not exist.
|
||||
CENSUS_DATA_S3_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/census.zip"
|
||||
|
||||
|
||||
class CensusACSETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
|
@ -59,6 +71,23 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME = (
|
||||
"Percent of individuals < 200% Federal Poverty Line"
|
||||
)
|
||||
self.IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME = (
|
||||
"Percent of individuals < 200% Federal Poverty Line, imputed"
|
||||
)
|
||||
|
||||
self.ADJUSTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME = (
|
||||
"Adjusted percent of individuals < 200% Federal Poverty Line"
|
||||
)
|
||||
|
||||
self.ADJUSTED_AND_IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME_PRELIMINARY = (
|
||||
"Preliminary adjusted percent of individuals < 200% Federal Poverty Line,"
|
||||
+ " imputed"
|
||||
)
|
||||
|
||||
self.ADJUSTED_AND_IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME = (
|
||||
"Adjusted percent of individuals < 200% Federal Poverty Line,"
|
||||
+ " imputed"
|
||||
)
|
||||
|
||||
self.MEDIAN_HOUSE_VALUE_FIELD = "B25077_001E"
|
||||
self.MEDIAN_HOUSE_VALUE_FIELD_NAME = (
|
||||
|
@ -136,6 +165,10 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
"Percent enrollment in college or graduate school"
|
||||
)
|
||||
|
||||
self.IMPUTED_COLLEGE_ATTENDANCE_FIELD = (
|
||||
"Percent enrollment in college or graduate school, imputed"
|
||||
)
|
||||
|
||||
self.COLLEGE_NON_ATTENDANCE_FIELD = "Percent of population not currently enrolled in college or graduate school"
|
||||
|
||||
self.RE_FIELDS = [
|
||||
|
@ -188,18 +221,50 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
self.MEDIAN_INCOME_FIELD_NAME,
|
||||
self.POVERTY_LESS_THAN_100_PERCENT_FPL_FIELD_NAME,
|
||||
self.POVERTY_LESS_THAN_150_PERCENT_FPL_FIELD_NAME,
|
||||
self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME,
|
||||
self.IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME,
|
||||
self.MEDIAN_HOUSE_VALUE_FIELD_NAME,
|
||||
self.HIGH_SCHOOL_ED_FIELD,
|
||||
self.COLLEGE_ATTENDANCE_FIELD,
|
||||
self.COLLEGE_NON_ATTENDANCE_FIELD,
|
||||
self.IMPUTED_COLLEGE_ATTENDANCE_FIELD,
|
||||
]
|
||||
+ self.RE_OUTPUT_FIELDS
|
||||
+ [self.PERCENT_PREFIX + field for field in self.RE_OUTPUT_FIELDS]
|
||||
+ [
|
||||
field_names.POVERTY_LESS_THAN_200_FPL_FIELD,
|
||||
field_names.POVERTY_LESS_THAN_200_FPL_IMPUTED_FIELD,
|
||||
]
|
||||
)
|
||||
|
||||
self.df: pd.DataFrame
|
||||
|
||||
def _merge_geojson(
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
usa_geo_df: gpd.GeoDataFrame,
|
||||
geoid_field: str = "GEOID10",
|
||||
geometry_field: str = "geometry",
|
||||
state_code_field: str = "STATEFP10",
|
||||
county_code_field: str = "COUNTYFP10",
|
||||
) -> gpd.GeoDataFrame:
|
||||
usa_geo_df[geoid_field] = (
|
||||
usa_geo_df[geoid_field].astype(str).str.zfill(11)
|
||||
)
|
||||
return gpd.GeoDataFrame(
|
||||
df.merge(
|
||||
usa_geo_df[
|
||||
[
|
||||
geoid_field,
|
||||
geometry_field,
|
||||
state_code_field,
|
||||
county_code_field,
|
||||
]
|
||||
],
|
||||
left_on=[self.GEOID_TRACT_FIELD_NAME],
|
||||
right_on=[geoid_field],
|
||||
)
|
||||
)
|
||||
|
||||
def extract(self) -> None:
|
||||
# Define the variables to retrieve
|
||||
variables = (
|
||||
|
@ -227,6 +292,27 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
|
||||
df = self.df
|
||||
|
||||
# Here we join the geometry of the US to the dataframe so that we can impute
|
||||
# The income of neighbors. first this looks locally; if there's no local
|
||||
# geojson file for all of the US, this will read it off of S3
|
||||
logger.info("Reading in geojson for the country")
|
||||
if not os.path.exists(
|
||||
self.DATA_PATH / "census" / "geojson" / "us.json"
|
||||
):
|
||||
logger.info("Fetching Census data from AWS S3")
|
||||
unzip_file_from_url(
|
||||
CENSUS_DATA_S3_URL,
|
||||
self.DATA_PATH / "tmp",
|
||||
self.DATA_PATH,
|
||||
)
|
||||
|
||||
geo_df = gpd.read_file(
|
||||
self.DATA_PATH / "census" / "geojson" / "us.json"
|
||||
)
|
||||
df = self._merge_geojson(
|
||||
df=df,
|
||||
usa_geo_df=geo_df,
|
||||
)
|
||||
# Rename two fields.
|
||||
df = df.rename(
|
||||
columns={
|
||||
|
@ -349,7 +435,7 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
df["B03003_003E"] / df["B03003_001E"]
|
||||
)
|
||||
|
||||
# Calculate college attendance:
|
||||
# Calculate college attendance and adjust low income
|
||||
df[self.COLLEGE_ATTENDANCE_FIELD] = (
|
||||
df[self.COLLEGE_ATTENDANCE_MALE_ENROLLED_PUBLIC]
|
||||
+ df[self.COLLEGE_ATTENDANCE_MALE_ENROLLED_PRIVATE]
|
||||
|
@ -361,22 +447,64 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
1 - df[self.COLLEGE_ATTENDANCE_FIELD]
|
||||
)
|
||||
|
||||
# strip columns
|
||||
df = df[self.COLUMNS_TO_KEEP]
|
||||
|
||||
# Save results to self.
|
||||
self.df = df
|
||||
|
||||
# rename columns to be used in score
|
||||
rename_fields = {
|
||||
"Percent of individuals < 200% Federal Poverty Line": field_names.POVERTY_LESS_THAN_200_FPL_FIELD,
|
||||
}
|
||||
self.df.rename(
|
||||
columns=rename_fields,
|
||||
inplace=True,
|
||||
errors="raise",
|
||||
# we impute income for both income measures
|
||||
## TODO: Convert to pydantic for clarity
|
||||
logger.info("Imputing income information")
|
||||
ImputeVariables = namedtuple(
|
||||
"ImputeVariables", ["raw_field_name", "imputed_field_name"]
|
||||
)
|
||||
|
||||
df = calculate_income_measures(
|
||||
impute_var_named_tup_list=[
|
||||
ImputeVariables(
|
||||
raw_field_name=self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME,
|
||||
imputed_field_name=self.IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME,
|
||||
),
|
||||
ImputeVariables(
|
||||
raw_field_name=self.COLLEGE_ATTENDANCE_FIELD,
|
||||
imputed_field_name=self.IMPUTED_COLLEGE_ATTENDANCE_FIELD,
|
||||
),
|
||||
],
|
||||
geo_df=df,
|
||||
geoid_field=self.GEOID_TRACT_FIELD_NAME,
|
||||
)
|
||||
|
||||
logger.info("Calculating with imputed values")
|
||||
|
||||
df[
|
||||
self.ADJUSTED_AND_IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME
|
||||
] = (
|
||||
df[self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME].fillna(
|
||||
df[self.IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME]
|
||||
)
|
||||
- df[self.COLLEGE_ATTENDANCE_FIELD].fillna(
|
||||
df[self.IMPUTED_COLLEGE_ATTENDANCE_FIELD]
|
||||
)
|
||||
).clip(
|
||||
lower=0
|
||||
)
|
||||
|
||||
# All values should have a value at this point
|
||||
assert (
|
||||
df[
|
||||
self.ADJUSTED_AND_IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME
|
||||
]
|
||||
.isna()
|
||||
.sum()
|
||||
== 0
|
||||
), "Error: not all values were filled..."
|
||||
|
||||
logger.info("Renaming columns...")
|
||||
df = df.rename(
|
||||
columns={
|
||||
self.ADJUSTED_AND_IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME: field_names.POVERTY_LESS_THAN_200_FPL_IMPUTED_FIELD,
|
||||
self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME: field_names.POVERTY_LESS_THAN_200_FPL_FIELD,
|
||||
}
|
||||
)
|
||||
|
||||
# Strip columns and save results to self.
|
||||
self.df = df[self.COLUMNS_TO_KEEP]
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info("Saving Census ACS Data")
|
||||
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
from typing import List, NamedTuple
|
||||
import pandas as pd
|
||||
import geopandas as gpd
|
||||
import numpy as np
|
||||
|
||||
from data_pipeline.utils import get_module_logger
|
||||
|
||||
logger = get_module_logger(__name__)
|
||||
|
||||
|
||||
def _get_fips_mask(
|
||||
geo_df: gpd.GeoDataFrame,
|
||||
row: gpd.GeoSeries,
|
||||
fips_digits: int,
|
||||
geoid_field: str = "GEOID10_TRACT",
|
||||
) -> pd.Series:
|
||||
return (
|
||||
geo_df[geoid_field].str[:fips_digits] == row[geoid_field][:fips_digits]
|
||||
)
|
||||
|
||||
|
||||
def _get_neighbor_mask(
|
||||
geo_df: gpd.GeoDataFrame, row: gpd.GeoSeries
|
||||
) -> pd.Series:
|
||||
return geo_df["geometry"].touches(row["geometry"])
|
||||
|
||||
|
||||
def _choose_best_mask(
|
||||
geo_df: gpd.GeoDataFrame,
|
||||
masks_in_priority_order: List[pd.Series],
|
||||
column_to_impute: str,
|
||||
) -> pd.Series:
|
||||
for mask in masks_in_priority_order:
|
||||
if any(geo_df[mask][column_to_impute].notna()):
|
||||
return mask
|
||||
raise Exception("No mask found")
|
||||
|
||||
|
||||
def _prepare_dataframe_for_imputation(
|
||||
impute_var_named_tup_list: List[NamedTuple],
|
||||
geo_df: gpd.GeoDataFrame,
|
||||
geoid_field: str = "GEOID10_TRACT",
|
||||
) -> tuple[list, gpd.GeoDataFrame]:
|
||||
imputing_cols = [
|
||||
impute_var_pair.raw_field_name
|
||||
for impute_var_pair in impute_var_named_tup_list
|
||||
]
|
||||
|
||||
# prime column to exist
|
||||
for impute_var_pair in impute_var_named_tup_list:
|
||||
geo_df[impute_var_pair.imputed_field_name] = geo_df[
|
||||
impute_var_pair.raw_field_name
|
||||
].copy()
|
||||
|
||||
# generate a list of tracts for which at least one of the imputation
|
||||
# columns is null
|
||||
tract_list = geo_df[geo_df[imputing_cols].isna().any(axis=1)][
|
||||
geoid_field
|
||||
].unique()
|
||||
|
||||
# Check that imputation is a valid choice for this set of fields
|
||||
logger.info(f"Imputing values for {len(tract_list)} unique tracts.")
|
||||
assert len(tract_list) > 0, "Error: No missing values to impute"
|
||||
|
||||
return tract_list, geo_df
|
||||
|
||||
|
||||
def calculate_income_measures(
|
||||
impute_var_named_tup_list: list,
|
||||
geo_df: gpd.GeoDataFrame,
|
||||
geoid_field: str,
|
||||
) -> pd.DataFrame:
|
||||
"""Impute values based on geographic neighbors
|
||||
|
||||
We only want to check neighbors a single time, so all variables
|
||||
that we impute get imputed here.
|
||||
|
||||
Takes in:
|
||||
required:
|
||||
impute_var_named_tup_list: list of named tuples (imputed field, raw field)
|
||||
geo_df: geo dataframe that already has the census shapefiles merged
|
||||
geoid field: tract level ID
|
||||
|
||||
Returns: non-geometry pd.DataFrame
|
||||
"""
|
||||
# Determine where to impute variables and fill a column with nulls
|
||||
tract_list, geo_df = _prepare_dataframe_for_imputation(
|
||||
impute_var_named_tup_list=impute_var_named_tup_list,
|
||||
geo_df=geo_df,
|
||||
geoid_field=geoid_field,
|
||||
)
|
||||
|
||||
# Iterate through the dataframe to impute in place
|
||||
for index, row in geo_df.iterrows():
|
||||
if row[geoid_field] in tract_list:
|
||||
neighbor_mask = _get_neighbor_mask(geo_df, row)
|
||||
county_mask = _get_fips_mask(
|
||||
geo_df=geo_df, row=row, fips_digits=5, geoid_field=geoid_field
|
||||
)
|
||||
state_mask = _get_fips_mask(
|
||||
geo_df=geo_df, row=row, fips_digits=2, geoid_field=geoid_field
|
||||
)
|
||||
|
||||
# Impute fields for every row missing at least one value using the best possible set of neighbors
|
||||
# Note that later, we will pull raw.fillna(imputed), so the mechanics of this step aren't critical
|
||||
for impute_var_pair in impute_var_named_tup_list:
|
||||
mask_to_use = _choose_best_mask(
|
||||
geo_df=geo_df,
|
||||
masks_in_priority_order=[
|
||||
neighbor_mask,
|
||||
county_mask,
|
||||
state_mask,
|
||||
],
|
||||
column_to_impute=impute_var_pair.raw_field_name,
|
||||
)
|
||||
geo_df.loc[index, impute_var_pair.imputed_field_name] = geo_df[
|
||||
mask_to_use
|
||||
][impute_var_pair.raw_field_name].mean()
|
||||
|
||||
logger.info("Casting geodataframe as a typical dataframe")
|
||||
# get rid of the geometry column and cast as a typical df
|
||||
df = pd.DataFrame(
|
||||
geo_df[[col for col in geo_df.columns if col != "geometry"]]
|
||||
)
|
||||
|
||||
# finally, return the df
|
||||
return df
|
|
@ -4,6 +4,7 @@ from typing import List
|
|||
import censusdata
|
||||
import pandas as pd
|
||||
|
||||
|
||||
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
|
||||
from data_pipeline.utils import get_module_logger
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue