mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-22 09:41:26 -08:00
parent
e94d05882c
commit
1c0d87d84b
4 changed files with 68 additions and 12 deletions
|
@ -66,6 +66,11 @@ class ScoreETL(ExtractTransformLoad):
|
|||
# DOE energy burden
|
||||
self.ENERGY_BURDEN_FIELD_NAME = "Energy burden"
|
||||
|
||||
# FEMA Risk Index
|
||||
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME = (
|
||||
"FEMA Risk Index Expected Annual Loss Score"
|
||||
)
|
||||
|
||||
# There's another aggregation level (a second level of "buckets").
|
||||
self.AGGREGATION_POLLUTION: str = "Pollution Burden"
|
||||
self.AGGREGATION_POPULATION: str = "Population Characteristics"
|
||||
|
@ -85,6 +90,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
self.census_acs_median_incomes_df: pd.DataFrame
|
||||
self.cdc_life_expectancy_df: pd.DataFrame
|
||||
self.doe_energy_burden_df: pd.DataFrame
|
||||
self.national_risk_index_df: pd.DataFrame
|
||||
|
||||
def data_sets(self) -> list:
|
||||
# Define a named tuple that will be used for each data set input.
|
||||
|
@ -186,6 +192,11 @@ class ScoreETL(ExtractTransformLoad):
|
|||
renamed_field=self.ENERGY_BURDEN_FIELD_NAME,
|
||||
bucket=None,
|
||||
),
|
||||
DataSet(
|
||||
input_field=self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
|
||||
renamed_field=self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
|
||||
bucket=None,
|
||||
),
|
||||
# The following data sets have buckets, because they're used in Score C
|
||||
DataSet(
|
||||
input_field="CANCER",
|
||||
|
@ -365,6 +376,16 @@ class ScoreETL(ExtractTransformLoad):
|
|||
low_memory=False,
|
||||
)
|
||||
|
||||
# Load FEMA national risk index data
|
||||
national_risk_index_csv = (
|
||||
self.DATA_PATH / "dataset" / "national_risk_index_2020" / "usa.csv"
|
||||
)
|
||||
self.national_risk_index_df = pd.read_csv(
|
||||
national_risk_index_csv,
|
||||
dtype={self.GEOID_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
|
||||
def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame:
|
||||
logger.info("Joining Census Block Group dataframes")
|
||||
census_block_group_df = functools.reduce(
|
||||
|
@ -630,6 +651,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
self.census_df,
|
||||
self.housing_and_transportation_df,
|
||||
self.census_acs_median_incomes_df,
|
||||
self.national_risk_index_df,
|
||||
]
|
||||
census_block_group_df = self._join_cbg_dfs(census_block_group_dfs)
|
||||
|
||||
|
@ -638,7 +660,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
self.hud_housing_df,
|
||||
self.cdc_places_df,
|
||||
self.cdc_life_expectancy_df,
|
||||
self.doe_energy_burden_df
|
||||
self.doe_energy_burden_df,
|
||||
]
|
||||
census_tract_df = self._join_tract_dfs(census_tract_dfs)
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ class CDCLifeExpectancy(ExtractTransformLoad):
|
|||
self.raw_df = pd.read_csv(
|
||||
filepath_or_buffer=download_file_name,
|
||||
dtype={
|
||||
# The following need to remain as strings for all of their digits, not get converted to numbers.
|
||||
# The following need to remain as strings for all of their digits, not get converted to numbers.
|
||||
self.TRACT_INPUT_COLUMN_NAME: "string",
|
||||
},
|
||||
low_memory=False,
|
||||
|
|
|
@ -62,12 +62,16 @@ class DOEEnergyBurden(ExtractTransformLoad):
|
|||
)
|
||||
|
||||
# Convert energy burden to a fraction, since we represent all other percentages as fractions.
|
||||
output_df[self.ENERGY_BURDEN_FIELD_NAME] = output_df[self.ENERGY_BURDEN_FIELD_NAME] / 100
|
||||
output_df[self.ENERGY_BURDEN_FIELD_NAME] = (
|
||||
output_df[self.ENERGY_BURDEN_FIELD_NAME] / 100
|
||||
)
|
||||
|
||||
# Left-pad the tracts with 0s
|
||||
expected_length_of_census_tract_field = 11
|
||||
output_df[self.GEOID_TRACT_FIELD_NAME] = output_df[self.GEOID_TRACT_FIELD_NAME].astype(str).apply(
|
||||
lambda x: x.zfill(expected_length_of_census_tract_field)
|
||||
output_df[self.GEOID_TRACT_FIELD_NAME] = (
|
||||
output_df[self.GEOID_TRACT_FIELD_NAME]
|
||||
.astype(str)
|
||||
.apply(lambda x: x.zfill(expected_length_of_census_tract_field))
|
||||
)
|
||||
|
||||
self.output_df = output_df
|
||||
|
|
|
@ -18,6 +18,19 @@ class NationalRiskIndexETL(ExtractTransformLoad):
|
|||
self.BLOCK_GROUP_CSV = (
|
||||
self.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
|
||||
)
|
||||
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
|
||||
"EAL_SCORE"
|
||||
)
|
||||
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME = (
|
||||
"FEMA Risk Index Expected Annual Loss Score"
|
||||
)
|
||||
|
||||
# Note: also need to edit transform step to add fields to output.
|
||||
self.COLUMNS_TO_KEEP = [
|
||||
self.GEOID_FIELD_NAME,
|
||||
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
|
||||
]
|
||||
|
||||
self.df: pd.DataFrame
|
||||
|
||||
def extract(self) -> None:
|
||||
|
@ -42,23 +55,37 @@ class NationalRiskIndexETL(ExtractTransformLoad):
|
|||
|
||||
NRI_TRACT_COL = "TRACTFIPS" # Census Tract Column in NRI data
|
||||
TRACT_COL = self.GEOID_TRACT_FIELD_NAME # Census Tract column name
|
||||
BLOCK_COL = self.GEOID_FIELD_NAME # Census Block Group column name
|
||||
|
||||
# read in the unzipped csv from NRI data source then rename the
|
||||
# Census Tract column for merging
|
||||
df_nri = pd.read_csv(
|
||||
df_nri: pd.DataFrame = pd.read_csv(
|
||||
self.INPUT_CSV,
|
||||
dtype={NRI_TRACT_COL: "string"},
|
||||
na_values=["None"],
|
||||
low_memory=False,
|
||||
)
|
||||
df_nri.rename(columns={NRI_TRACT_COL: TRACT_COL}, inplace=True)
|
||||
df_nri.rename(
|
||||
columns={
|
||||
NRI_TRACT_COL: TRACT_COL,
|
||||
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME: self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
|
||||
},
|
||||
inplace=True,
|
||||
)
|
||||
|
||||
# Reduce columns.
|
||||
# Note: normally we wait until writing to CSV for this step, but since the file is so huge,
|
||||
# move this up here for performance reasons.
|
||||
df_nri = df_nri[ # pylint: disable=unsubscriptable-object
|
||||
[self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME, TRACT_COL]
|
||||
]
|
||||
|
||||
# get the full list of Census Block Groups from the ACS data
|
||||
# and extract the Census Tract ID from each Block Group ID
|
||||
df_acs = pd.read_csv(self.BLOCK_GROUP_CSV, dtype={BLOCK_COL: "string"})
|
||||
df_acs[TRACT_COL] = df_acs[BLOCK_COL].str[0:11]
|
||||
df_block_group = df_acs[[BLOCK_COL, TRACT_COL]]
|
||||
df_acs = pd.read_csv(
|
||||
self.BLOCK_GROUP_CSV, dtype={self.GEOID_FIELD_NAME: "string"}
|
||||
)
|
||||
df_acs[TRACT_COL] = df_acs[self.GEOID_FIELD_NAME].str[0:11]
|
||||
df_block_group = df_acs[[self.GEOID_FIELD_NAME, TRACT_COL]]
|
||||
|
||||
# merge NRI data on the Census Tract ID so that each
|
||||
# Block Group inherits the NRI score of its Census Tract
|
||||
|
@ -67,6 +94,9 @@ class NationalRiskIndexETL(ExtractTransformLoad):
|
|||
def load(self) -> None:
|
||||
"""Writes the NRI data as a csv to the directory at self.OUTPUT_DIR"""
|
||||
logger.info("Saving National Risk Index CSV")
|
||||
|
||||
# write nationwide csv
|
||||
self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
self.df.to_csv(self.OUTPUT_DIR / "usa.csv", index=False)
|
||||
self.df[self.COLUMNS_TO_KEEP].to_csv(
|
||||
self.OUTPUT_DIR / "usa.csv", index=False
|
||||
)
|
||||
|
|
Loading…
Add table
Reference in a new issue