diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py index 57b7bc6d..97d066dc 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py @@ -37,12 +37,13 @@ class ScoreETL(ExtractTransformLoad): constants.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv" ) self.ejscreen_df = pd.read_csv( - ejscreen_csv, dtype={"ID": "string"}, low_memory=False + ejscreen_csv, + dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, + low_memory=False, ) # TODO move to EJScreen ETL self.ejscreen_df.rename( columns={ - "ID": self.GEOID_FIELD_NAME, "ACSTOTPOP": field_names.TOTAL_POP_FIELD, "CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD, "RESP": field_names.RESPITORY_HAZARD_FIELD, @@ -70,7 +71,7 @@ class ScoreETL(ExtractTransformLoad): ) self.census_df = pd.read_csv( census_csv, - dtype={self.GEOID_FIELD_NAME: "string"}, + dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, low_memory=False, ) @@ -83,7 +84,7 @@ class ScoreETL(ExtractTransformLoad): ) self.housing_and_transportation_df = pd.read_csv( housing_and_transportation_index_csv, - dtype={self.GEOID_FIELD_NAME: "string"}, + dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, low_memory=False, ) # TODO move to HT Index ETL @@ -120,7 +121,7 @@ class ScoreETL(ExtractTransformLoad): ) self.census_acs_median_incomes_df = pd.read_csv( census_acs_median_incomes_csv, - dtype={self.GEOID_FIELD_NAME: "string"}, + dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, low_memory=False, ) @@ -153,7 +154,7 @@ class ScoreETL(ExtractTransformLoad): ) self.national_risk_index_df = pd.read_csv( national_risk_index_csv, - dtype={self.GEOID_FIELD_NAME: "string"}, + dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, low_memory=False, ) @@ -177,25 +178,6 @@ class ScoreETL(ExtractTransformLoad): low_memory=False, ) - def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame: - logger.info("Joining Census Block Group dataframes") - census_block_group_df = functools.reduce( - lambda left, right: pd.merge( - left=left, right=right, on=self.GEOID_FIELD_NAME, how="outer" - ), - census_block_group_dfs, - ) - - # Sanity check the join. - if ( - len(census_block_group_df[self.GEOID_FIELD_NAME].str.len().unique()) - != 1 - ): - raise ValueError( - f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length." - ) - return census_block_group_df - def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame: logger.info("Joining Census Tract dataframes") census_tract_df = functools.reduce( @@ -222,13 +204,6 @@ class ScoreETL(ExtractTransformLoad): def _prepare_initial_df(self) -> pd.DataFrame: logger.info("Preparing initial dataframe") - # Join all the data sources that use census block groups - census_block_group_dfs = [ - self.ejscreen_df, - ] - - census_block_group_df = self._join_cbg_dfs(census_block_group_dfs) - # Join all the data sources that use census tracts census_tract_dfs = [ self.census_df, @@ -236,6 +211,7 @@ class ScoreETL(ExtractTransformLoad): self.cdc_places_df, self.cdc_life_expectancy_df, self.doe_energy_burden_df, + self.ejscreen_df, self.geocorr_urban_rural_df, self.persistent_poverty_df, self.housing_and_transportation_df, @@ -244,24 +220,18 @@ class ScoreETL(ExtractTransformLoad): ] census_tract_df = self._join_tract_dfs(census_tract_dfs) - # Calculate the tract for the CBG data. - census_block_group_df[ - self.GEOID_TRACT_FIELD_NAME - ] = census_block_group_df[self.GEOID_FIELD_NAME].str[0:11] - - df = census_block_group_df.merge( - census_tract_df, on=self.GEOID_TRACT_FIELD_NAME - ) - # If GEOID10s are read as numbers instead of strings, the initial 0 is dropped, # and then we get too many CBG rows (one for 012345 and one for 12345). - if len(census_block_group_df) > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS: - raise ValueError( - f"Too many rows in the join: {len(census_block_group_df)}" - ) + + # TODO: Investigate how many rows we should have here + # if len(census_tract_df) > self.EXPECTED_MAX_CENSUS_TRACTS: + # raise ValueError( + # f"Too many rows in the join: {len(census_tract_df)}" + # ) # Calculate median income variables. # First, calculate the income of the block group as a fraction of the state income. + df = census_tract_df df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD] = ( df[field_names.MEDIAN_INCOME_FIELD] / df[field_names.STATE_MEDIAN_INCOME_FIELD] @@ -318,7 +288,7 @@ class ScoreETL(ExtractTransformLoad): ] non_numeric_columns = [ - self.GEOID_FIELD_NAME, + self.GEOID_TRACT_FIELD_NAME, field_names.PERSISTENT_POVERTY_FIELD, ] diff --git a/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py b/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py index 8bb78f60..c8f50172 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py +++ b/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py @@ -8,8 +8,8 @@ logger = get_module_logger(__name__) class EJSCREENETL(ExtractTransformLoad): def __init__(self): - self.EJSCREEN_FTP_URL = "https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip" - self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2019_StatePctiles.csv" + self.EJSCREEN_FTP_URL = "https://edap-arcgiscloud-data-commons.s3.amazonaws.com/EJSCREEN2020/EJSCREEN_Tract_2020_USPR.csv.zip" + self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_Tract_2020_USPR.csv" self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019" self.df: pd.DataFrame @@ -31,6 +31,14 @@ class EJSCREENETL(ExtractTransformLoad): low_memory=False, ) + # rename ID to Tract ID + self.df.rename( + columns={ + "ID": self.GEOID_TRACT_FIELD_NAME, + }, + inplace=True, + ) + def load(self) -> None: logger.info("Saving EJScreen CSV") # write nationwide csv