mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-07-25 08:20:16 -07:00
parent
f915e20e91
commit
16eb29e429
2 changed files with 26 additions and 48 deletions
|
@ -37,12 +37,13 @@ class ScoreETL(ExtractTransformLoad):
|
|||
constants.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
|
||||
)
|
||||
self.ejscreen_df = pd.read_csv(
|
||||
ejscreen_csv, dtype={"ID": "string"}, low_memory=False
|
||||
ejscreen_csv,
|
||||
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
# TODO move to EJScreen ETL
|
||||
self.ejscreen_df.rename(
|
||||
columns={
|
||||
"ID": self.GEOID_FIELD_NAME,
|
||||
"ACSTOTPOP": field_names.TOTAL_POP_FIELD,
|
||||
"CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD,
|
||||
"RESP": field_names.RESPITORY_HAZARD_FIELD,
|
||||
|
@ -70,7 +71,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
)
|
||||
self.census_df = pd.read_csv(
|
||||
census_csv,
|
||||
dtype={self.GEOID_FIELD_NAME: "string"},
|
||||
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
|
||||
|
@ -83,7 +84,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
)
|
||||
self.housing_and_transportation_df = pd.read_csv(
|
||||
housing_and_transportation_index_csv,
|
||||
dtype={self.GEOID_FIELD_NAME: "string"},
|
||||
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
# TODO move to HT Index ETL
|
||||
|
@ -120,7 +121,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
)
|
||||
self.census_acs_median_incomes_df = pd.read_csv(
|
||||
census_acs_median_incomes_csv,
|
||||
dtype={self.GEOID_FIELD_NAME: "string"},
|
||||
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
|
||||
|
@ -153,7 +154,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
)
|
||||
self.national_risk_index_df = pd.read_csv(
|
||||
national_risk_index_csv,
|
||||
dtype={self.GEOID_FIELD_NAME: "string"},
|
||||
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
|
||||
|
@ -177,25 +178,6 @@ class ScoreETL(ExtractTransformLoad):
|
|||
low_memory=False,
|
||||
)
|
||||
|
||||
def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame:
|
||||
logger.info("Joining Census Block Group dataframes")
|
||||
census_block_group_df = functools.reduce(
|
||||
lambda left, right: pd.merge(
|
||||
left=left, right=right, on=self.GEOID_FIELD_NAME, how="outer"
|
||||
),
|
||||
census_block_group_dfs,
|
||||
)
|
||||
|
||||
# Sanity check the join.
|
||||
if (
|
||||
len(census_block_group_df[self.GEOID_FIELD_NAME].str.len().unique())
|
||||
!= 1
|
||||
):
|
||||
raise ValueError(
|
||||
f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length."
|
||||
)
|
||||
return census_block_group_df
|
||||
|
||||
def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
|
||||
logger.info("Joining Census Tract dataframes")
|
||||
census_tract_df = functools.reduce(
|
||||
|
@ -222,13 +204,6 @@ class ScoreETL(ExtractTransformLoad):
|
|||
def _prepare_initial_df(self) -> pd.DataFrame:
|
||||
logger.info("Preparing initial dataframe")
|
||||
|
||||
# Join all the data sources that use census block groups
|
||||
census_block_group_dfs = [
|
||||
self.ejscreen_df,
|
||||
]
|
||||
|
||||
census_block_group_df = self._join_cbg_dfs(census_block_group_dfs)
|
||||
|
||||
# Join all the data sources that use census tracts
|
||||
census_tract_dfs = [
|
||||
self.census_df,
|
||||
|
@ -236,6 +211,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
self.cdc_places_df,
|
||||
self.cdc_life_expectancy_df,
|
||||
self.doe_energy_burden_df,
|
||||
self.ejscreen_df,
|
||||
self.geocorr_urban_rural_df,
|
||||
self.persistent_poverty_df,
|
||||
self.housing_and_transportation_df,
|
||||
|
@ -244,24 +220,18 @@ class ScoreETL(ExtractTransformLoad):
|
|||
]
|
||||
census_tract_df = self._join_tract_dfs(census_tract_dfs)
|
||||
|
||||
# Calculate the tract for the CBG data.
|
||||
census_block_group_df[
|
||||
self.GEOID_TRACT_FIELD_NAME
|
||||
] = census_block_group_df[self.GEOID_FIELD_NAME].str[0:11]
|
||||
|
||||
df = census_block_group_df.merge(
|
||||
census_tract_df, on=self.GEOID_TRACT_FIELD_NAME
|
||||
)
|
||||
|
||||
# If GEOID10s are read as numbers instead of strings, the initial 0 is dropped,
|
||||
# and then we get too many CBG rows (one for 012345 and one for 12345).
|
||||
if len(census_block_group_df) > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS:
|
||||
raise ValueError(
|
||||
f"Too many rows in the join: {len(census_block_group_df)}"
|
||||
)
|
||||
|
||||
# TODO: Investigate how many rows we should have here
|
||||
# if len(census_tract_df) > self.EXPECTED_MAX_CENSUS_TRACTS:
|
||||
# raise ValueError(
|
||||
# f"Too many rows in the join: {len(census_tract_df)}"
|
||||
# )
|
||||
|
||||
# Calculate median income variables.
|
||||
# First, calculate the income of the block group as a fraction of the state income.
|
||||
df = census_tract_df
|
||||
df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD] = (
|
||||
df[field_names.MEDIAN_INCOME_FIELD]
|
||||
/ df[field_names.STATE_MEDIAN_INCOME_FIELD]
|
||||
|
@ -318,7 +288,7 @@ class ScoreETL(ExtractTransformLoad):
|
|||
]
|
||||
|
||||
non_numeric_columns = [
|
||||
self.GEOID_FIELD_NAME,
|
||||
self.GEOID_TRACT_FIELD_NAME,
|
||||
field_names.PERSISTENT_POVERTY_FIELD,
|
||||
]
|
||||
|
||||
|
|
|
@ -8,8 +8,8 @@ logger = get_module_logger(__name__)
|
|||
|
||||
class EJSCREENETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
self.EJSCREEN_FTP_URL = "https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip"
|
||||
self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2019_StatePctiles.csv"
|
||||
self.EJSCREEN_FTP_URL = "https://edap-arcgiscloud-data-commons.s3.amazonaws.com/EJSCREEN2020/EJSCREEN_Tract_2020_USPR.csv.zip"
|
||||
self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_Tract_2020_USPR.csv"
|
||||
self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019"
|
||||
self.df: pd.DataFrame
|
||||
|
||||
|
@ -31,6 +31,14 @@ class EJSCREENETL(ExtractTransformLoad):
|
|||
low_memory=False,
|
||||
)
|
||||
|
||||
# rename ID to Tract ID
|
||||
self.df.rename(
|
||||
columns={
|
||||
"ID": self.GEOID_TRACT_FIELD_NAME,
|
||||
},
|
||||
inplace=True,
|
||||
)
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info("Saving EJScreen CSV")
|
||||
# write nationwide csv
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue