diff --git a/data/data-pipeline/data_pipeline/etl/base.py b/data/data-pipeline/data_pipeline/etl/base.py
index ca636fa9..044fe0c8 100644
--- a/data/data-pipeline/data_pipeline/etl/base.py
+++ b/data/data-pipeline/data_pipeline/etl/base.py
@@ -34,7 +34,8 @@ class ExtractTransformLoad:
     GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
     # TODO: investigate. Census says there are only 217,740 CBGs in the US. This might be from CBGs at different time periods.
     EXPECTED_MAX_CENSUS_BLOCK_GROUPS: int = 250000
-    EXPECTED_MAX_CENSUS_TRACTS: int = 73076
+    # TODO: investigate. Census says there are only 73,057 tracts in the US. This might be from tracts at different time periods.
+    EXPECTED_MAX_CENSUS_TRACTS: int = 74027
 
     def __init__(self, config_path: Path) -> None:
         """Inits the class with instance specific variables"""
diff --git a/data/data-pipeline/data_pipeline/etl/constants.py b/data/data-pipeline/data_pipeline/etl/constants.py
index bc2848ba..831d1e18 100644
--- a/data/data-pipeline/data_pipeline/etl/constants.py
+++ b/data/data-pipeline/data_pipeline/etl/constants.py
@@ -1,9 +1,4 @@
 DATASET_LIST = [
-    {
-        "name": "tree_equity_score",
-        "module_dir": "tree_equity_score",
-        "class_name": "TreeEquityScoreETL",
-    },
     {
         "name": "census_acs",
         "module_dir": "census_acs",
@@ -14,11 +9,6 @@ DATASET_LIST = [
         "module_dir": "ejscreen",
         "class_name": "EJSCREENETL",
     },
-    {
-        "name": "housing_and_transportation",
-        "module_dir": "housing_and_transportation",
-        "class_name": "HousingTransportationETL",
-    },
     {
         "name": "hud_housing",
         "module_dir": "hud_housing",
@@ -79,6 +69,16 @@ DATASET_LIST = [
         "module_dir": "census_decennial",
         "class_name": "CensusDecennialETL",
     },
+    {
+        "name": "housing_and_transportation",
+        "module_dir": "housing_and_transportation",
+        "class_name": "HousingTransportationETL",
+    },
+    {
+        "name": "tree_equity_score",
+        "module_dir": "tree_equity_score",
+        "class_name": "TreeEquityScoreETL",
+    },
 ]
 CENSUS_INFO = {
     "name": "census",
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
index 97d066dc..a98b7b61 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
@@ -19,7 +19,6 @@ class ScoreETL(ExtractTransformLoad):
         self.df: pd.DataFrame
         self.ejscreen_df: pd.DataFrame
         self.census_df: pd.DataFrame
-        self.housing_and_transportation_df: pd.DataFrame
         self.hud_housing_df: pd.DataFrame
         self.cdc_places_df: pd.DataFrame
         self.census_acs_median_incomes_df: pd.DataFrame
@@ -41,29 +40,6 @@ class ScoreETL(ExtractTransformLoad):
             dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
             low_memory=False,
         )
-        # TODO move to EJScreen ETL
-        self.ejscreen_df.rename(
-            columns={
-                "ACSTOTPOP": field_names.TOTAL_POP_FIELD,
-                "CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD,
-                "RESP": field_names.RESPITORY_HAZARD_FIELD,
-                "DSLPM": field_names.DIESEL_FIELD,
-                "PM25": field_names.PM25_FIELD,
-                "OZONE": field_names.OZONE_FIELD,
-                "PTRAF": field_names.TRAFFIC_FIELD,
-                "PRMP": field_names.RMP_FIELD,
-                "PTSDF": field_names.TSDF_FIELD,
-                "PNPL": field_names.NPL_FIELD,
-                "PWDIS": field_names.WASTEWATER_FIELD,
-                "LINGISOPCT": field_names.HOUSEHOLDS_LINGUISTIC_ISO_FIELD,
-                "LOWINCPCT": field_names.POVERTY_FIELD,
-                "LESSHSPCT": field_names.HIGH_SCHOOL_ED_FIELD,
-                "OVER64PCT": field_names.OVER_64_FIELD,
-                "UNDER5PCT": field_names.UNDER_5_FIELD,
-                "PRE1960PCT": field_names.LEAD_PAINT_FIELD,
-            },
-            inplace=True,
-        )
 
         # Load census data
         census_csv = (
@@ -75,23 +51,6 @@ class ScoreETL(ExtractTransformLoad):
             low_memory=False,
         )
-        # Load housing and transportation data
-        housing_and_transportation_index_csv = (
-            constants.DATA_PATH
-            / "dataset"
-            / "housing_and_transportation_index"
-            / "usa.csv"
-        )
-        self.housing_and_transportation_df = pd.read_csv(
-            housing_and_transportation_index_csv,
-            dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
-            low_memory=False,
-        )
-        # TODO move to HT Index ETL
-        self.housing_and_transportation_df.rename(
-            columns={"ht_ami": field_names.HT_INDEX_FIELD}, inplace=True
-        )
-
         # Load HUD housing data
         hud_housing_csv = (
             constants.DATA_PATH / "dataset" / "hud_housing" / "usa.csv"
         )
@@ -180,13 +139,32 @@ class ScoreETL(ExtractTransformLoad):
 
     def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
         logger.info("Joining Census Tract dataframes")
+
+        def merge_function(
+            left: pd.DataFrame, right: pd.DataFrame
+        ) -> pd.DataFrame:
+            """This is a custom function that merges two dataframes.
+
+            It provides some logging as additional helpful context for error handling.
+            """
+            try:
+                df = pd.merge(
+                    left=left,
+                    right=right,
+                    on=self.GEOID_TRACT_FIELD_NAME,
+                    how="outer",
+                )
+            except Exception as e:
+                # Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
+                logger.warning(
+                    f"Exception encountered while merging dataframe `right` that has the following columns: {','.join(right.columns)}"
+                )
+                raise e
+
+            return df
+
         census_tract_df = functools.reduce(
-            lambda left, right: pd.merge(
-                left=left,
-                right=right,
-                on=self.GEOID_TRACT_FIELD_NAME,
-                how="outer",
-            ),
+            merge_function,
             census_tract_dfs,
         )
 
@@ -200,6 +178,40 @@ class ScoreETL(ExtractTransformLoad):
         )
         return census_tract_df
 
+    def _census_tract_df_sanity_check(
+        self, df_to_check: pd.DataFrame, df_name: str = None
+    ) -> None:
+        """Check an individual data frame for census tract data quality checks."""
+
+        # Note: it'd be nice to log the name of the dataframe directly, but that's not accessible in this scope.
+        dataframe_descriptor = (
+            f"dataframe `{df_name}`"
+            if df_name
+            else f"the dataframe that has columns { ','.join(df_to_check.columns)}"
+        )
+
+        tract_values = (
+            df_to_check[self.GEOID_TRACT_FIELD_NAME].str.len().unique()
+        )
+        if any(tract_values != [11]):
+            raise ValueError(
+                f"Some of the census tract data has the wrong length: {tract_values} in {dataframe_descriptor}"
+            )
+
+        non_unique_tract_values = len(
+            df_to_check[self.GEOID_TRACT_FIELD_NAME]
+        ) - len(df_to_check[self.GEOID_TRACT_FIELD_NAME].unique())
+
+        if non_unique_tract_values > 0:
+            raise ValueError(
+                f"There are {non_unique_tract_values} duplicate tract IDs in {dataframe_descriptor}"
+            )
+
+        if len(df_to_check) > self.EXPECTED_MAX_CENSUS_TRACTS:
+            raise ValueError(
+                f"Too many rows in the join: {len(df_to_check)} in {dataframe_descriptor}"
+            )
+
     # TODO Move a lot of this to the ETL part of the pipeline
     def _prepare_initial_df(self) -> pd.DataFrame:
         logger.info("Preparing initial dataframe")
@@ -214,20 +226,23 @@ class ScoreETL(ExtractTransformLoad):
             self.ejscreen_df,
             self.geocorr_urban_rural_df,
             self.persistent_poverty_df,
-            self.housing_and_transportation_df,
             self.national_risk_index_df,
             self.census_acs_median_incomes_df,
         ]
+
+        # Sanity check each data frame before merging.
+        for df in census_tract_dfs:
+            self._census_tract_df_sanity_check(df_to_check=df)
+
         census_tract_df = self._join_tract_dfs(census_tract_dfs)
 
         # If GEOID10s are read as numbers instead of strings, the initial 0 is dropped,
         # and then we get too many CBG rows (one for 012345 and one for 12345).
-        # TODO: Investigate how many rows we should have here
-        # if len(census_tract_df) > self.EXPECTED_MAX_CENSUS_TRACTS:
-        #     raise ValueError(
-        #         f"Too many rows in the join: {len(census_tract_df)}"
-        #     )
+        # Now sanity-check the merged df.
+        self._census_tract_df_sanity_check(
+            df_to_check=census_tract_df, df_name="census_tract_df"
+        )
 
         # Calculate median income variables.
         # First, calculate the income of the block group as a fraction of the state income.
@@ -280,7 +295,6 @@ class ScoreETL(ExtractTransformLoad):
             field_names.POVERTY_FIELD,
             field_names.HIGH_SCHOOL_ED_FIELD,
             field_names.UNEMPLOYMENT_FIELD,
-            field_names.HT_INDEX_FIELD,
             field_names.MEDIAN_HOUSE_VALUE_FIELD,
             field_names.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME,
             field_names.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME,
diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_decennial/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census_decennial/etl.py
index ee563391..6190beea 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/census_decennial/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/census_decennial/etl.py
@@ -220,9 +220,7 @@ class CensusDecennialETL(ExtractTransformLoad):
 
         # Creating Geo ID (Census Block Group) Field Name
         self.df_all[self.GEOID_TRACT_FIELD_NAME] = (
-            self.df_all["state"]
-            + self.df_all["county"]
-            + self.df_all["tract"]
+            self.df_all["state"] + self.df_all["county"] + self.df_all["tract"]
         )
 
         # Reporting Missing Values
diff --git a/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py b/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py
index c8f50172..2626cbd5 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py
@@ -1,6 +1,7 @@
 import pandas as pd
 
 from data_pipeline.etl.base import ExtractTransformLoad
+from data_pipeline.score import field_names
 from data_pipeline.utils import get_module_logger
 
 logger = get_module_logger(__name__)
@@ -35,6 +36,25 @@ class EJSCREENETL(ExtractTransformLoad):
         self.df.rename(
             columns={
                 "ID": self.GEOID_TRACT_FIELD_NAME,
+                # Note: it is currently unorthodox to use `field_names` in an ETL class,
+                # but I think that's the direction we'd like to move all ETL classes. - LMB
+                "ACSTOTPOP": field_names.TOTAL_POP_FIELD,
+                "CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD,
+                "RESP": field_names.RESPITORY_HAZARD_FIELD,
+                "DSLPM": field_names.DIESEL_FIELD,
+                "PM25": field_names.PM25_FIELD,
+                "OZONE": field_names.OZONE_FIELD,
+                "PTRAF": field_names.TRAFFIC_FIELD,
+                "PRMP": field_names.RMP_FIELD,
+                "PTSDF": field_names.TSDF_FIELD,
+                "PNPL": field_names.NPL_FIELD,
+                "PWDIS": field_names.WASTEWATER_FIELD,
+                "LINGISOPCT": field_names.HOUSEHOLDS_LINGUISTIC_ISO_FIELD,
+                "LOWINCPCT": field_names.POVERTY_FIELD,
+                "LESSHSPCT": field_names.HIGH_SCHOOL_ED_FIELD,
+                "OVER64PCT": field_names.OVER_64_FIELD,
+                "UNDER5PCT": field_names.UNDER_5_FIELD,
+                "PRE1960PCT": field_names.LEAD_PAINT_FIELD,
             },
             inplace=True,
         )
diff --git a/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py b/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py
index dfba2a30..6bd2368e 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py
@@ -51,7 +51,9 @@ class HousingTransportationETL(ExtractTransformLoad):
         logger.info("Transforming Housing and Transportation Data")
 
         # Rename and reformat tract ID
-        self.df.rename(columns={"tract": self.GEOID_TRACT_FIELD_NAME}, inplace=True)
+        self.df.rename(
+            columns={"tract": self.GEOID_TRACT_FIELD_NAME}, inplace=True
+        )
         self.df[self.GEOID_TRACT_FIELD_NAME] = self.df[
             self.GEOID_TRACT_FIELD_NAME
         ].str.replace('"', "")
diff --git a/data/data-pipeline/data_pipeline/score/score_c.py b/data/data-pipeline/data_pipeline/score/score_c.py
index 90f091aa..9fe68fff 100644
--- a/data/data-pipeline/data_pipeline/score/score_c.py
+++ b/data/data-pipeline/data_pipeline/score/score_c.py
@@ -24,7 +24,7 @@ class ScoreC(Score):
                 + field_names.PERCENTILE_FIELD_SUFFIX,
                 field_names.UNEMPLOYMENT_FIELD
                 + field_names.PERCENTILE_FIELD_SUFFIX,
-                field_names.HT_INDEX_FIELD
+                field_names.HOUSING_BURDEN_FIELD
                 + field_names.PERCENTILE_FIELD_SUFFIX,
             ],
         )