j40-cejst-2/data/data-pipeline/data_pipeline/etl/sources/epa_rsei/etl.py
Lucas Merrill Brown a0d6e55f0a
Run ETL processes in parallel (#1253)
* WIP on parallelizing

* switching to get_tmp_path for nri

* switching to get_tmp_path everywhere necessary

* fixing linter errors

* moving heavy ETLs to front of line

* add hold

* moving cdc places up

* removing unnecessary print

* moving h&t up

* adding parallel to geo post

* better census labels

* switching to concurrent futures

* fixing output
2022-02-11 14:04:53 -05:00

153 lines
6.2 KiB
Python

from pathlib import Path
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.score import field_names
from data_pipeline.utils import get_module_logger, unzip_file_from_url
logger = get_module_logger(__name__)
class EPARiskScreeningEnvironmentalIndicatorsETL(ExtractTransformLoad):
"""Class for 2019 Census Tract RSEI Aggregated micro-data
Data source overview: Page 20 in this document:
https://www.epa.gov/sites/default/files/2017-01/documents/rsei-documentation-geographic-microdata-v235.pdf
Disaggregated and aggregated datasets for 2019 is documented here:
https://github.com/usds/justice40-tool/issues/1070#issuecomment-1005604014
"""
def __init__(self):
self.AGGREGATED_RSEI_SCORE_FILE_URL = "http://abt-rsei.s3.amazonaws.com/microdata2019/census_agg/CensusMicroTracts2019_2019_aggregated.zip"
self.OUTPUT_PATH: Path = self.DATA_PATH / "dataset" / "epa_rsei"
self.EPA_RSEI_SCORE_THRESHOLD_CUTOFF = 0.75
self.TRACT_INPUT_COLUMN_NAME = "GEOID10"
self.NUMBER_FACILITIES_INPUT_FIELD = "NUMFACS"
self.NUMBER_RELEASES_INPUT_FIELD = "NUMRELEASES"
self.NUMBER_CHEMICALS_INPUT_FIELD = "NUMCHEMS"
self.AVERAGE_TOXICITY_INPUT_FIELD = "TOXCONC"
self.SCORE_INPUT_FIELD = "SCORE"
self.POPULATION_INPUT_FIELD = "POP"
self.CSCORE_INPUT_FIELD = "CSCORE"
self.NCSCORE_INPUT_FIELD = "NSCORE"
# References to the columns that will be output
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
field_names.EPA_RSEI_NUMBER_FACILITIES_FIELD,
field_names.EPA_RSEI_NUMBER_RELEASES_FIELD,
field_names.EPA_RSEI_NUMBER_CHEMICALS_FIELD,
field_names.EPA_RSEI_AVERAGE_TOXICITY_FIELD,
field_names.EPA_RSEI_SCORE_FIELD,
field_names.EPA_RSEI_CSCORE_FIELD,
field_names.EPA_RSEI_NCSCORE_FIELD,
field_names.EPA_RSEI_POPULATION_FIELD,
field_names.EPA_RSEI_SCORE_THRESHOLD_FIELD,
field_names.EPA_RSEI_SCORE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
]
self.df: pd.DataFrame
def extract(self) -> None:
logger.info("Starting 2.5 MB data download.")
# the column headers from the above dataset are actually a census tract's data at this point
# We will use this data structure later to specify the column names
input_columns = [
self.TRACT_INPUT_COLUMN_NAME,
self.NUMBER_FACILITIES_INPUT_FIELD,
self.NUMBER_RELEASES_INPUT_FIELD,
self.NUMBER_CHEMICALS_INPUT_FIELD,
self.AVERAGE_TOXICITY_INPUT_FIELD,
self.SCORE_INPUT_FIELD,
self.POPULATION_INPUT_FIELD,
self.CSCORE_INPUT_FIELD,
self.NCSCORE_INPUT_FIELD,
]
unzip_file_from_url(
file_url=self.AGGREGATED_RSEI_SCORE_FILE_URL,
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "epa_rsei",
)
self.df = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "epa_rsei"
/ "CensusMicroTracts2019_2019_aggregated.csv",
# The following need to remain as strings for all of their digits, not get
# converted to numbers.
low_memory=False,
names=input_columns,
)
def transform(self) -> None:
logger.info("Starting transforms.")
score_columns = [x for x in self.df.columns if "SCORE" in x]
# coerce dataframe type to perform correct next steps
self.df[score_columns] = self.df[score_columns].astype(float)
self.df.rename(
columns={
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
self.NUMBER_FACILITIES_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_FACILITIES_FIELD,
self.NUMBER_RELEASES_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_RELEASES_FIELD,
self.NUMBER_CHEMICALS_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_CHEMICALS_FIELD,
self.AVERAGE_TOXICITY_INPUT_FIELD: field_names.EPA_RSEI_AVERAGE_TOXICITY_FIELD,
self.SCORE_INPUT_FIELD: field_names.EPA_RSEI_SCORE_FIELD,
self.CSCORE_INPUT_FIELD: field_names.EPA_RSEI_CSCORE_FIELD,
self.NCSCORE_INPUT_FIELD: field_names.EPA_RSEI_NCSCORE_FIELD,
self.POPULATION_INPUT_FIELD: field_names.EPA_RSEI_POPULATION_FIELD,
},
inplace=True,
)
# Please note this: https://www.epa.gov/rsei/understanding-rsei-results#what
# Section: "What does a high RSEI Score mean?"
# This was created for the sole purpose to be used in the current
# iteration of Score L
self.df[
field_names.EPA_RSEI_SCORE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX
] = self.df[field_names.EPA_RSEI_SCORE_FIELD].rank(
ascending=True,
pct=True,
)
# This threshold was arbitrarily chosen.
# It would make sense to enrich this with facilities, industries, or chemical
# that would enable some additional form of sub-stratification when examining
# different percentile ranges that are derived above.
self.df[field_names.EPA_RSEI_SCORE_THRESHOLD_FIELD] = (
self.df[
field_names.EPA_RSEI_SCORE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX
]
>= self.EPA_RSEI_SCORE_THRESHOLD_CUTOFF
)
expected_census_tract_field_length = 11
self.df[self.GEOID_TRACT_FIELD_NAME] = (
self.df[self.GEOID_TRACT_FIELD_NAME]
.astype(str)
.apply(lambda x: x.zfill(expected_census_tract_field_length))
)
if len(self.df[self.GEOID_TRACT_FIELD_NAME].str.len().unique()) != 1:
raise ValueError(
f"GEOID Tract must be length of {expected_census_tract_field_length}"
)
def load(self) -> None:
logger.info("Saving CSV")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)