Issue 919: Fix too many tracts issue (#922)

* Some cleanup, adding error warning to merge function

* Error handling around tract merge
commit a4108d24c0
Authored by Lucas Merrill Brown on 2021-11-24 16:47:57 -05:00; committed by lucasmbrown-usds
7 changed files with 105 additions and 70 deletions
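In brief, the fix replaces the inline lambda previously passed to functools.reduce with a named merge helper that logs which columns the failing dataframe carried before re-raising, and adds a per-dataframe census tract sanity check. A minimal sketch of that pattern, simplified from the diff below (the logger setup and the literal GEOID column name are stand-ins, not taken from this commit):

    import functools
    import logging

    import pandas as pd

    logger = logging.getLogger(__name__)
    GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"  # stand-in for the class attribute

    def merge_function(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        # Wrap the outer join so a failure reports which columns the right-hand
        # dataframe carried; its variable name is not visible in this scope.
        try:
            return pd.merge(
                left=left, right=right, on=GEOID_TRACT_FIELD_NAME, how="outer"
            )
        except Exception:
            logger.warning(
                f"Exception encountered while merging dataframe with columns: "
                f"{','.join(right.columns)}"
            )
            raise

    def join_tract_dfs(census_tract_dfs: list) -> pd.DataFrame:
        # functools.reduce folds the list of tract dataframes into one wide frame.
        return functools.reduce(merge_function, census_tract_dfs)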


@@ -19,7 +19,6 @@ class ScoreETL(ExtractTransformLoad):
self.df: pd.DataFrame
self.ejscreen_df: pd.DataFrame
self.census_df: pd.DataFrame
self.housing_and_transportation_df: pd.DataFrame
self.hud_housing_df: pd.DataFrame
self.cdc_places_df: pd.DataFrame
self.census_acs_median_incomes_df: pd.DataFrame
@@ -41,29 +40,6 @@ class ScoreETL(ExtractTransformLoad):
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
# TODO move to EJScreen ETL
self.ejscreen_df.rename(
columns={
"ACSTOTPOP": field_names.TOTAL_POP_FIELD,
"CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD,
"RESP": field_names.RESPITORY_HAZARD_FIELD,
"DSLPM": field_names.DIESEL_FIELD,
"PM25": field_names.PM25_FIELD,
"OZONE": field_names.OZONE_FIELD,
"PTRAF": field_names.TRAFFIC_FIELD,
"PRMP": field_names.RMP_FIELD,
"PTSDF": field_names.TSDF_FIELD,
"PNPL": field_names.NPL_FIELD,
"PWDIS": field_names.WASTEWATER_FIELD,
"LINGISOPCT": field_names.HOUSEHOLDS_LINGUISTIC_ISO_FIELD,
"LOWINCPCT": field_names.POVERTY_FIELD,
"LESSHSPCT": field_names.HIGH_SCHOOL_ED_FIELD,
"OVER64PCT": field_names.OVER_64_FIELD,
"UNDER5PCT": field_names.UNDER_5_FIELD,
"PRE1960PCT": field_names.LEAD_PAINT_FIELD,
},
inplace=True,
)
# Load census data
census_csv = (
@@ -75,23 +51,6 @@ class ScoreETL(ExtractTransformLoad):
low_memory=False,
)
# Load housing and transportation data
housing_and_transportation_index_csv = (
constants.DATA_PATH
/ "dataset"
/ "housing_and_transportation_index"
/ "usa.csv"
)
self.housing_and_transportation_df = pd.read_csv(
housing_and_transportation_index_csv,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
# TODO move to HT Index ETL
self.housing_and_transportation_df.rename(
columns={"ht_ami": field_names.HT_INDEX_FIELD}, inplace=True
)
# Load HUD housing data
hud_housing_csv = (
constants.DATA_PATH / "dataset" / "hud_housing" / "usa.csv"
@@ -180,13 +139,32 @@ class ScoreETL(ExtractTransformLoad):
def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
logger.info("Joining Census Tract dataframes")
def merge_function(
left: pd.DataFrame, right: pd.DataFrame
) -> pd.DataFrame:
"""This is a custom function that merges two dataframes.
It provides some logging as additional helpful context for error handling.
"""
try:
df = pd.merge(
left=left,
right=right,
on=self.GEOID_TRACT_FIELD_NAME,
how="outer",
)
except Exception as e:
# Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
logger.warning(
f"Exception encountered while merging dataframe `right` that has the following columns: {','.join(right.columns)}"
)
raise e
return df
census_tract_df = functools.reduce(
lambda left, right: pd.merge(
left=left,
right=right,
on=self.GEOID_TRACT_FIELD_NAME,
how="outer",
),
merge_function,
census_tract_dfs,
)
@@ -200,6 +178,40 @@ class ScoreETL(ExtractTransformLoad):
)
return census_tract_df
def _census_tract_df_sanity_check(
self, df_to_check: pd.DataFrame, df_name: str = None
) -> None:
"""Check an individual data frame for census tract data quality checks."""
# Note: it'd be nice to log the name of the dataframe directly, but that's not accessible in this scope.
dataframe_descriptor = (
f"dataframe `{df_name}`"
if df_name
else f"the dataframe that has columns { ','.join(df_to_check.columns)}"
)
tract_values = (
df_to_check[self.GEOID_TRACT_FIELD_NAME].str.len().unique()
)
if any(tract_values != [11]):
raise ValueError(
f"Some of the census tract data has the wrong length: {tract_values} in {dataframe_descriptor}"
)
non_unique_tract_values = len(
df_to_check[self.GEOID_TRACT_FIELD_NAME]
) - len(df_to_check[self.GEOID_TRACT_FIELD_NAME].unique())
if non_unique_tract_values > 0:
raise ValueError(
f"There are {non_unique_tract_values} duplicate tract IDs in {dataframe_descriptor}"
)
if len(df_to_check) > self.EXPECTED_MAX_CENSUS_TRACTS:
raise ValueError(
f"Too many rows in the join: {len(df_to_check)} in {dataframe_descriptor}"
)
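# Hypothetical example (not part of this diff) of how the new check surfaces bad
# data, e.g. a 10-character tract ID that lost its leading zero:
#   self._census_tract_df_sanity_check(
#       df_to_check=pd.DataFrame({self.GEOID_TRACT_FIELD_NAME: ["1073001100"]}),
#       df_name="demo",
#   )
#   raises ValueError: Some of the census tract data has the wrong length: [10] in dataframe `demo`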
# TODO Move a lot of this to the ETL part of the pipeline
def _prepare_initial_df(self) -> pd.DataFrame:
logger.info("Preparing initial dataframe")
@@ -214,20 +226,23 @@ class ScoreETL(ExtractTransformLoad):
self.ejscreen_df,
self.geocorr_urban_rural_df,
self.persistent_poverty_df,
self.housing_and_transportation_df,
self.national_risk_index_df,
self.census_acs_median_incomes_df,
]
# Sanity check each data frame before merging.
for df in census_tract_dfs:
self._census_tract_df_sanity_check(df_to_check=df)
census_tract_df = self._join_tract_dfs(census_tract_dfs)
# If GEOID10s are read as numbers instead of strings, the initial 0 is dropped,
# and then we get too many CBG rows (one for 012345 and one for 12345).
# TODO: Investigate how many rows we should have here
# if len(census_tract_df) > self.EXPECTED_MAX_CENSUS_TRACTS:
# raise ValueError(
# f"Too many rows in the join: {len(census_tract_df)}"
# )
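# Hypothetical illustration (not part of this commit) of why the GEOID columns are
# read with a string dtype above: pandas otherwise infers integers and silently
# drops the leading zero, so tracts no longer line up across dataframes in the
# outer joins.
#   pd.read_csv(io.StringIO("tract\n01073001100")).iloc[0, 0]                  # 1073001100
#   pd.read_csv(io.StringIO("tract\n01073001100"), dtype="string").iloc[0, 0]  # "01073001100"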
# Now sanity-check the merged df.
self._census_tract_df_sanity_check(
df_to_check=census_tract_df, df_name="census_tract_df"
)
# Calculate median income variables.
# First, calculate the income of the block group as a fraction of the state income.
@@ -280,7 +295,6 @@ class ScoreETL(ExtractTransformLoad):
field_names.POVERTY_FIELD,
field_names.HIGH_SCHOOL_ED_FIELD,
field_names.UNEMPLOYMENT_FIELD,
field_names.HT_INDEX_FIELD,
field_names.MEDIAN_HOUSE_VALUE_FIELD,
field_names.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME,
field_names.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME,