From 1c0d87d84b628a6b38f58d662ceb29554b16fd87 Mon Sep 17 00:00:00 2001
From: Lucas Merrill Brown
Date: Wed, 15 Sep 2021 13:31:32 -0500
Subject: [PATCH] Add FEMA risk index to score file (#687)

* Add to score file
---
 .../data_pipeline/etl/score/etl_score.py    | 24 +++++++++-
 .../etl/sources/cdc_life_expectancy/etl.py  |  2 +-
 .../etl/sources/doe_energy_burden/etl.py    | 10 +++--
 .../etl/sources/national_risk_index/etl.py  | 44 ++++++++++++++++---
 4 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
index baec9f6d..80b20fc3 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
@@ -66,6 +66,11 @@ class ScoreETL(ExtractTransformLoad):
         # DOE energy burden
         self.ENERGY_BURDEN_FIELD_NAME = "Energy burden"
 
+        # FEMA Risk Index
+        self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME = (
+            "FEMA Risk Index Expected Annual Loss Score"
+        )
+
         # There's another aggregation level (a second level of "buckets").
         self.AGGREGATION_POLLUTION: str = "Pollution Burden"
         self.AGGREGATION_POPULATION: str = "Population Characteristics"
@@ -85,6 +90,7 @@ class ScoreETL(ExtractTransformLoad):
         self.census_acs_median_incomes_df: pd.DataFrame
         self.cdc_life_expectancy_df: pd.DataFrame
         self.doe_energy_burden_df: pd.DataFrame
+        self.national_risk_index_df: pd.DataFrame
 
     def data_sets(self) -> list:
         # Define a named tuple that will be used for each data set input.
@@ -186,6 +192,11 @@ class ScoreETL(ExtractTransformLoad):
                 renamed_field=self.ENERGY_BURDEN_FIELD_NAME,
                 bucket=None,
             ),
+            DataSet(
+                input_field=self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
+                renamed_field=self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
+                bucket=None,
+            ),
             # The following data sets have buckets, because they're used in Score C
             DataSet(
                 input_field="CANCER",
@@ -365,6 +376,16 @@ class ScoreETL(ExtractTransformLoad):
             low_memory=False,
         )
 
+        # Load FEMA national risk index data
+        national_risk_index_csv = (
+            self.DATA_PATH / "dataset" / "national_risk_index_2020" / "usa.csv"
+        )
+        self.national_risk_index_df = pd.read_csv(
+            national_risk_index_csv,
+            dtype={self.GEOID_FIELD_NAME: "string"},
+            low_memory=False,
+        )
+
     def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame:
         logger.info("Joining Census Block Group dataframes")
         census_block_group_df = functools.reduce(
@@ -630,6 +651,7 @@ class ScoreETL(ExtractTransformLoad):
             self.census_df,
             self.housing_and_transportation_df,
             self.census_acs_median_incomes_df,
+            self.national_risk_index_df,
         ]
         census_block_group_df = self._join_cbg_dfs(census_block_group_dfs)
 
@@ -638,7 +660,7 @@ class ScoreETL(ExtractTransformLoad):
             self.hud_housing_df,
             self.cdc_places_df,
             self.cdc_life_expectancy_df,
-            self.doe_energy_burden_df
+            self.doe_energy_burden_df,
         ]
         census_tract_df = self._join_tract_dfs(census_tract_dfs)
 
diff --git a/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py b/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py
index 71dad1a8..79f3ae91 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py
@@ -39,7 +39,7 @@ class CDCLifeExpectancy(ExtractTransformLoad):
         self.raw_df = pd.read_csv(
             filepath_or_buffer=download_file_name,
             dtype={
-                # The following need to remain as strings for all of their digits, not get converted to numbers. 
+                # The following need to remain as strings for all of their digits, not get converted to numbers.
                 self.TRACT_INPUT_COLUMN_NAME: "string",
             },
             low_memory=False,
diff --git a/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py b/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py
index 45df6335..e764e997 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py
@@ -62,12 +62,16 @@ class DOEEnergyBurden(ExtractTransformLoad):
         )
 
         # Convert energy burden to a fraction, since we represent all other percentages as fractions.
-        output_df[self.ENERGY_BURDEN_FIELD_NAME] = output_df[self.ENERGY_BURDEN_FIELD_NAME] / 100
+        output_df[self.ENERGY_BURDEN_FIELD_NAME] = (
+            output_df[self.ENERGY_BURDEN_FIELD_NAME] / 100
+        )
 
         # Left-pad the tracts with 0s
         expected_length_of_census_tract_field = 11
-        output_df[self.GEOID_TRACT_FIELD_NAME] = output_df[self.GEOID_TRACT_FIELD_NAME].astype(str).apply(
-            lambda x: x.zfill(expected_length_of_census_tract_field)
+        output_df[self.GEOID_TRACT_FIELD_NAME] = (
+            output_df[self.GEOID_TRACT_FIELD_NAME]
+            .astype(str)
+            .apply(lambda x: x.zfill(expected_length_of_census_tract_field))
         )
 
         self.output_df = output_df
diff --git a/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py b/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
index 68049947..f0357702 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
@@ -18,6 +18,19 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         self.BLOCK_GROUP_CSV = (
             self.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
         )
+        self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
+            "EAL_SCORE"
+        )
+        self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME = (
+            "FEMA Risk Index Expected Annual Loss Score"
+        )
+
+        # Note: fields added here must also be added to the output in the transform step.
+        self.COLUMNS_TO_KEEP = [
+            self.GEOID_FIELD_NAME,
+            self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
+        ]
+
         self.df: pd.DataFrame
 
     def extract(self) -> None:
@@ -42,23 +55,37 @@
         NRI_TRACT_COL = "TRACTFIPS"  # Census Tract Column in NRI data
         TRACT_COL = self.GEOID_TRACT_FIELD_NAME  # Census Tract column name
-        BLOCK_COL = self.GEOID_FIELD_NAME  # Census Block Group column name
 
         # read in the unzipped csv from NRI data source then rename the
         # Census Tract column for merging
-        df_nri = pd.read_csv(
+        df_nri: pd.DataFrame = pd.read_csv(
             self.INPUT_CSV,
             dtype={NRI_TRACT_COL: "string"},
             na_values=["None"],
             low_memory=False,
         )
-        df_nri.rename(columns={NRI_TRACT_COL: TRACT_COL}, inplace=True)
+        df_nri.rename(
+            columns={
+                NRI_TRACT_COL: TRACT_COL,
+                self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME: self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
+            },
+            inplace=True,
+        )
+
+        # Reduce columns.
+        # Note: normally we wait until writing to CSV for this step, but since the file is so huge,
+        # we move this step up here for performance reasons.
+        df_nri = df_nri[  # pylint: disable=unsubscriptable-object
+            [self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME, TRACT_COL]
+        ]
 
         # get the full list of Census Block Groups from the ACS data
         # and extract the Census Tract ID from each Block Group ID
-        df_acs = pd.read_csv(self.BLOCK_GROUP_CSV, dtype={BLOCK_COL: "string"})
-        df_acs[TRACT_COL] = df_acs[BLOCK_COL].str[0:11]
-        df_block_group = df_acs[[BLOCK_COL, TRACT_COL]]
+        df_acs = pd.read_csv(
+            self.BLOCK_GROUP_CSV, dtype={self.GEOID_FIELD_NAME: "string"}
+        )
+        df_acs[TRACT_COL] = df_acs[self.GEOID_FIELD_NAME].str[0:11]
+        df_block_group = df_acs[[self.GEOID_FIELD_NAME, TRACT_COL]]
 
         # merge NRI data on the Census Tract ID so that each
         # Block Group inherits the NRI score of its Census Tract
@@ -67,6 +94,9 @@ class NationalRiskIndexETL(ExtractTransformLoad):
     def load(self) -> None:
         """Writes the NRI data as a csv to the directory at self.OUTPUT_DIR"""
         logger.info("Saving National Risk Index CSV")
+        # write nationwide csv
         self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
-        self.df.to_csv(self.OUTPUT_DIR / "usa.csv", index=False)
+        self.df[self.COLUMNS_TO_KEEP].to_csv(
+            self.OUTPUT_DIR / "usa.csv", index=False
+        )
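
Note on the tract-to-block-group join in the national_risk_index transform above: a block group's 12-digit GEOID embeds its parent tract's 11-digit FIPS code as a prefix, which is why slicing the first 11 characters of the block-group GEOID is enough to key the merge. Below is a minimal standalone sketch of that join on toy data. The GEOID column names and score values here are illustrative only, and the how="left" merge is an assumption based on the "Block Group inherits the NRI score of its Census Tract" comment; the merge call itself sits outside this diff's context lines.

    import pandas as pd

    # NRI data is keyed by 11-digit census tract FIPS codes
    # ("TRACTFIPS" upstream, renamed by the patch).
    df_nri = pd.DataFrame(
        {
            "GEOID10_TRACT": ["36047000100", "36047000200"],
            "FEMA Risk Index Expected Annual Loss Score": [14.2, 9.7],
        }
    )

    # ACS data is keyed by 12-digit block-group GEOIDs; the first
    # 11 digits identify the parent tract.
    df_acs = pd.DataFrame(
        {"GEOID10": ["360470001001", "360470001002", "360470002001"]}
    )
    df_acs["GEOID10_TRACT"] = df_acs["GEOID10"].str[0:11]

    # Left merge: every block group keeps its row and inherits the
    # Expected Annual Loss score of its tract.
    merged = df_acs.merge(df_nri, on="GEOID10_TRACT", how="left")
    print(merged)

Keeping the GEOIDs typed as pandas "string" (as the patch does via dtype= in each read_csv) matters for this join: read as integers, leading zeros would be dropped and the 11-character prefix slice would silently mis-key the merge.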