mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-07-25 07:20:18 -07:00
Issue 1910: Do not impute income for 0 population tracts (#1918)
* should be working, has unnecessary loggers
* removing loggers and cleaning up
* updating ejscreen tests
* adding tests and responding to PR feedback
* fixing broken smoke test
* delete smoketest docs
This commit is contained in:
parent
9e85375d9b
commit
9fb9874a15
13 changed files with 150 additions and 75 deletions
|
@ -365,6 +365,9 @@ class ExtractTransformLoad:
|
|||
f"No file found at `{output_file_path}`."
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Reading in CSV `{output_file_path}` for ETL of class `{cls}`."
|
||||
)
|
||||
output_df = pd.read_csv(
|
||||
output_file_path,
|
||||
dtype={
|
||||
|
|
|
@ -5,6 +5,7 @@ import numpy as np
|
|||
import pandas as pd
|
||||
|
||||
from data_pipeline.etl.base import ExtractTransformLoad
|
||||
from data_pipeline.etl.sources.census_acs.etl import CensusACSETL
|
||||
from data_pipeline.etl.sources.national_risk_index.etl import (
|
||||
NationalRiskIndexETL,
|
||||
)
|
||||
|
@ -35,7 +36,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
# dataframes
|
||||
self.df: pd.DataFrame
|
||||
self.ejscreen_df: pd.DataFrame
|
||||
self.census_df: pd.DataFrame
|
||||
self.census_acs_df: pd.DataFrame
|
||||
self.hud_housing_df: pd.DataFrame
|
||||
self.cdc_places_df: pd.DataFrame
|
||||
self.census_acs_median_incomes_df: pd.DataFrame
|
||||
|
@ -67,14 +68,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
)
|
||||
|
||||
# Load census data
|
||||
census_csv = (
|
||||
constants.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
|
||||
)
|
||||
self.census_df = pd.read_csv(
|
||||
census_csv,
|
||||
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
self.census_acs_df = CensusACSETL.get_data_frame()
|
||||
|
||||
# Load HUD housing data
|
||||
hud_housing_csv = (
|
||||
|
@ -346,7 +340,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
|
||||
# Join all the data sources that use census tracts
|
||||
census_tract_dfs = [
|
||||
self.census_df,
|
||||
self.census_acs_df,
|
||||
self.hud_housing_df,
|
||||
self.cdc_places_df,
|
||||
self.cdc_life_expectancy_df,
|
||||
|
@ -364,7 +358,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
self.nature_deprived_df,
|
||||
self.eamlis_df,
|
||||
self.fuds_df,
|
||||
self.tribal_overlap_df
|
||||
self.tribal_overlap_df,
|
||||
]
|
||||
|
||||
# Sanity check each data frame before merging.
|
||||
|
|
|
@ -73,8 +73,7 @@ class CDCLifeExpectancy(ExtractTransformLoad):
|
|||
|
||||
all_usa_raw_df = self._download_and_prep_data(
|
||||
file_url=self.USA_FILE_URL,
|
||||
download_file_name=self.get_tmp_path()
|
||||
/ "US_A.CSV",
|
||||
download_file_name=self.get_tmp_path() / "US_A.CSV",
|
||||
)
|
||||
|
||||
# Check which states are missing
|
||||
|
@ -94,15 +93,13 @@ class CDCLifeExpectancy(ExtractTransformLoad):
|
|||
logger.info("Downloading data for Maine")
|
||||
maine_raw_df = self._download_and_prep_data(
|
||||
file_url=self.MAINE_FILE_URL,
|
||||
download_file_name=self.get_tmp_path()
|
||||
/ "maine.csv",
|
||||
download_file_name=self.get_tmp_path() / "maine.csv",
|
||||
)
|
||||
|
||||
logger.info("Downloading data for Wisconsin")
|
||||
wisconsin_raw_df = self._download_and_prep_data(
|
||||
file_url=self.WISCONSIN_FILE_URL,
|
||||
download_file_name=self.get_tmp_path()
|
||||
/ "wisconsin.csv",
|
||||
download_file_name=self.get_tmp_path() / "wisconsin.csv",
|
||||
)
|
||||
|
||||
combined_df = pd.concat(
|
||||
|
|
|
@ -23,12 +23,11 @@ CENSUS_DATA_S3_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/census.zip"
|
|||
|
||||
|
||||
class CensusACSETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
self.ACS_YEAR = 2019
|
||||
self.OUTPUT_PATH = (
|
||||
self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
|
||||
)
|
||||
NAME = "census_acs"
|
||||
ACS_YEAR = 2019
|
||||
MINIMUM_POPULATION_REQUIRED_FOR_IMPUTATION = 1
|
||||
|
||||
def __init__(self):
|
||||
self.TOTAL_UNEMPLOYED_FIELD = "B23025_005E"
|
||||
self.TOTAL_IN_LABOR_FORCE = "B23025_003E"
|
||||
self.EMPLOYMENT_FIELDS = [
|
||||
|
@ -216,8 +215,15 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
self.OTHER_RACE_FIELD_NAME,
|
||||
]
|
||||
|
||||
# Note: this field does double-duty here. It's used as the total population
|
||||
# within the age questions.
|
||||
# It's also what EJScreen used as their variable for total population in the
|
||||
# census tract, so we use it similarly.
|
||||
# See p. 83 of https://www.epa.gov/sites/default/files/2021-04/documents/ejscreen_technical_document.pdf
|
||||
self.TOTAL_POPULATION_FROM_AGE_TABLE = "B01001_001E" # Estimate!!Total:
|
||||
|
||||
self.AGE_INPUT_FIELDS = [
|
||||
"B01001_001E", # Estimate!!Total:
|
||||
self.TOTAL_POPULATION_FROM_AGE_TABLE,
|
||||
"B01001_003E", # Estimate!!Total:!!Male:!!Under 5 years
|
||||
"B01001_004E", # Estimate!!Total:!!Male:!!5 to 9 years
|
||||
"B01001_005E", # Estimate!!Total:!!Male:!!10 to 14 years
|
||||
|
@ -277,6 +283,7 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
self.COLUMNS_TO_KEEP = (
|
||||
[
|
||||
self.GEOID_TRACT_FIELD_NAME,
|
||||
field_names.TOTAL_POP_FIELD,
|
||||
self.UNEMPLOYED_FIELD_NAME,
|
||||
self.LINGUISTIC_ISOLATION_FIELD_NAME,
|
||||
self.MEDIAN_INCOME_FIELD_NAME,
|
||||
|
@ -375,18 +382,22 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
)
|
||||
|
||||
geo_df = gpd.read_file(
|
||||
self.DATA_PATH / "census" / "geojson" / "us.json"
|
||||
self.DATA_PATH / "census" / "geojson" / "us.json",
|
||||
)
|
||||
|
||||
df = self._merge_geojson(
|
||||
df=df,
|
||||
usa_geo_df=geo_df,
|
||||
)
|
||||
# Rename two fields.
|
||||
|
||||
# Rename some fields.
|
||||
df = df.rename(
|
||||
columns={
|
||||
self.MEDIAN_HOUSE_VALUE_FIELD: self.MEDIAN_HOUSE_VALUE_FIELD_NAME,
|
||||
self.MEDIAN_INCOME_FIELD: self.MEDIAN_INCOME_FIELD_NAME,
|
||||
}
|
||||
self.TOTAL_POPULATION_FROM_AGE_TABLE: field_names.TOTAL_POP_FIELD,
|
||||
},
|
||||
errors="raise",
|
||||
)
|
||||
|
||||
# Handle null values for various fields, which are `-666666666`.
|
||||
|
@ -472,7 +483,6 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
)
|
||||
|
||||
# Calculate some demographic information.
|
||||
|
||||
df = df.rename(
|
||||
columns={
|
||||
"B02001_003E": self.BLACK_FIELD_NAME,
|
||||
|
@ -560,14 +570,11 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
),
|
||||
]
|
||||
|
||||
# Calculate age groups
|
||||
total_population_age_series = df["B01001_001E"]
|
||||
|
||||
# For each age bucket, sum the relevant columns and calculate the total
|
||||
# percentage.
|
||||
for age_bucket, sum_columns in age_bucket_and_its_sum_columns:
|
||||
df[age_bucket] = (
|
||||
df[sum_columns].sum(axis=1) / total_population_age_series
|
||||
df[sum_columns].sum(axis=1) / df[field_names.TOTAL_POP_FIELD]
|
||||
)
|
||||
|
||||
# Calculate college attendance and adjust low income
|
||||
|
@ -602,6 +609,7 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
],
|
||||
geo_df=df,
|
||||
geoid_field=self.GEOID_TRACT_FIELD_NAME,
|
||||
minimum_population_required_for_imputation=self.MINIMUM_POPULATION_REQUIRED_FOR_IMPUTATION,
|
||||
)
|
||||
|
||||
logger.info("Calculating with imputed values")
|
||||
|
@ -615,13 +623,20 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
- df[self.COLLEGE_ATTENDANCE_FIELD].fillna(
|
||||
df[self.IMPUTED_COLLEGE_ATTENDANCE_FIELD]
|
||||
)
|
||||
# Use clip to ensure that the values are not negative if college attendance
|
||||
# is very high
|
||||
).clip(
|
||||
lower=0
|
||||
)
|
||||
|
||||
# All values should have a value at this point
|
||||
assert (
|
||||
# For tracts with >0 population
|
||||
df[
|
||||
df[field_names.TOTAL_POP_FIELD]
|
||||
>= self.MINIMUM_POPULATION_REQUIRED_FOR_IMPUTATION
|
||||
][
|
||||
# Then the imputed field should have no nulls
|
||||
self.ADJUSTED_AND_IMPUTED_POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME
|
||||
]
|
||||
.isna()
|
||||
|
@ -644,13 +659,5 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
& df[field_names.POVERTY_LESS_THAN_200_FPL_FIELD].isna()
|
||||
)
|
||||
|
||||
# Strip columns and save results to self.
|
||||
self.df = df[self.COLUMNS_TO_KEEP]
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info("Saving Census ACS Data")
|
||||
|
||||
# mkdir census
|
||||
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.df.to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)
|
||||
# Save results to self.
|
||||
self.output_df = df
|
||||
|
|
|
@ -2,6 +2,7 @@ from typing import Any, List, NamedTuple, Tuple
|
|||
import pandas as pd
|
||||
import geopandas as gpd
|
||||
|
||||
from data_pipeline.score import field_names
|
||||
from data_pipeline.utils import get_module_logger
|
||||
|
||||
# pylint: disable=unsubscriptable-object
|
||||
|
@ -23,6 +24,7 @@ def _get_fips_mask(
|
|||
def _get_neighbor_mask(
|
||||
geo_df: gpd.GeoDataFrame, row: gpd.GeoSeries
|
||||
) -> pd.Series:
|
||||
"""Returns neighboring tracts."""
|
||||
return geo_df["geometry"].touches(row["geometry"])
|
||||
|
||||
|
||||
|
@ -40,24 +42,47 @@ def _choose_best_mask(
|
|||
def _prepare_dataframe_for_imputation(
|
||||
impute_var_named_tup_list: List[NamedTuple],
|
||||
geo_df: gpd.GeoDataFrame,
|
||||
population_field: str,
|
||||
minimum_population_required_for_imputation: int = 1,
|
||||
geoid_field: str = "GEOID10_TRACT",
|
||||
) -> Tuple[Any, gpd.GeoDataFrame]:
|
||||
"""Helper for imputation.
|
||||
|
||||
Given the inputs of `ImputeVariables`, returns list of tracts that need to be
|
||||
imputed, along with a GeoDataFrame that has a column with the imputed field
|
||||
"primed", meaning it is a copy of the raw field.
|
||||
|
||||
Will drop any rows with population less than
|
||||
`minimum_population_required_for_imputation`.
|
||||
"""
|
||||
imputing_cols = [
|
||||
impute_var_pair.raw_field_name
|
||||
for impute_var_pair in impute_var_named_tup_list
|
||||
]
|
||||
|
||||
# prime column to exist
|
||||
# Prime column to exist
|
||||
for impute_var_pair in impute_var_named_tup_list:
|
||||
geo_df[impute_var_pair.imputed_field_name] = geo_df[
|
||||
impute_var_pair.raw_field_name
|
||||
].copy()
|
||||
|
||||
# generate a list of tracts for which at least one of the imputation
|
||||
# columns is null
|
||||
tract_list = geo_df[geo_df[imputing_cols].isna().any(axis=1)][
|
||||
geoid_field
|
||||
].unique()
|
||||
# Generate a list of tracts for which at least one of the imputation
|
||||
# columns is null that also meets population criteria.
|
||||
tract_list = geo_df[
|
||||
(
|
||||
# First, check whether any of the columns we want to impute contain null
|
||||
# values
|
||||
geo_df[imputing_cols].isna().any(axis=1)
|
||||
# Second, ensure population is either null or >= the minimum population
|
||||
& (
|
||||
geo_df[population_field].isnull()
|
||||
| (
|
||||
geo_df[population_field]
|
||||
>= minimum_population_required_for_imputation
|
||||
)
|
||||
)
|
||||
)
|
||||
][geoid_field].unique()
|
||||
|
||||
# Check that imputation is a valid choice for this set of fields
|
||||
logger.info(f"Imputing values for {len(tract_list)} unique tracts.")
|
||||
|
@ -70,6 +95,8 @@ def calculate_income_measures(
|
|||
impute_var_named_tup_list: list,
|
||||
geo_df: gpd.GeoDataFrame,
|
||||
geoid_field: str,
|
||||
population_field: str = field_names.TOTAL_POP_FIELD,
|
||||
minimum_population_required_for_imputation: int = 1,
|
||||
) -> pd.DataFrame:
|
||||
"""Impute values based on geographic neighbors
|
||||
|
||||
|
@ -89,6 +116,8 @@ def calculate_income_measures(
|
|||
impute_var_named_tup_list=impute_var_named_tup_list,
|
||||
geo_df=geo_df,
|
||||
geoid_field=geoid_field,
|
||||
population_field=population_field,
|
||||
minimum_population_required_for_imputation=minimum_population_required_for_imputation,
|
||||
)
|
||||
|
||||
# Iterate through the dataframe to impute in place
|
||||
|
@ -119,6 +148,7 @@ def calculate_income_measures(
|
|||
],
|
||||
column_to_impute=impute_var_pair.raw_field_name,
|
||||
)
|
||||
|
||||
geo_df.loc[index, impute_var_pair.imputed_field_name] = geo_df[
|
||||
mask_to_use
|
||||
][impute_var_pair.raw_field_name].mean()
|
||||
|
|
|
@ -24,7 +24,6 @@ class EJSCREENETL(ExtractTransformLoad):
|
|||
|
||||
self.COLUMNS_TO_KEEP = [
|
||||
self.GEOID_TRACT_FIELD_NAME,
|
||||
field_names.TOTAL_POP_FIELD,
|
||||
# pylint: disable=duplicate-code
|
||||
field_names.AIR_TOXICS_CANCER_RISK_FIELD,
|
||||
field_names.RESPIRATORY_HAZARD_FIELD,
|
||||
|
@ -66,7 +65,6 @@ class EJSCREENETL(ExtractTransformLoad):
|
|||
self.output_df = self.df.rename(
|
||||
columns={
|
||||
self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
|
||||
"ACSTOTPOP": field_names.TOTAL_POP_FIELD,
|
||||
"CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD,
|
||||
"RESP": field_names.RESPIRATORY_HAZARD_FIELD,
|
||||
"DSLPM": field_names.DIESEL_FIELD,
|
||||
|
|
|
@ -108,8 +108,12 @@ class TribalOverlapETL(ExtractTransformLoad):
|
|||
|
||||
# Switch from geographic to projected CRSes
|
||||
# because logically that's right
|
||||
self.census_tract_gdf = self.census_tract_gdf.to_crs(crs=self.CRS_INTEGER)
|
||||
tribal_gdf_without_points = tribal_gdf_without_points.to_crs(crs=self.CRS_INTEGER)
|
||||
self.census_tract_gdf = self.census_tract_gdf.to_crs(
|
||||
crs=self.CRS_INTEGER
|
||||
)
|
||||
tribal_gdf_without_points = tribal_gdf_without_points.to_crs(
|
||||
crs=self.CRS_INTEGER
|
||||
)
|
||||
|
||||
# Create a measure for the entire census tract area
|
||||
self.census_tract_gdf["area_tract"] = self.census_tract_gdf.area
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue