j40-cejst-2/data/data-pipeline/data_pipeline/etl/sources/epa_rsei/etl.py

from pathlib import Path
from typing import List
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.score import field_names
from data_pipeline.utils import get_module_logger
from data_pipeline.config import settings
from data_pipeline.etl.datasource import DataSource, ZIPDataSource

logger = get_module_logger(__name__)


class EPARiskScreeningEnvironmentalIndicatorsETL(ExtractTransformLoad):
"""Class for 2019 Census Tract RSEI Aggregated micro-data
Data source overview: Page 20 in this document:
https://www.epa.gov/sites/default/files/2017-01/documents/rsei-documentation-geographic-microdata-v235.pdf
Disaggregated and aggregated datasets for 2019 is documented here:
https://github.com/usds/justice40-tool/issues/1070#issuecomment-1005604014
"""
def __init__(self):
# fetch
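        # When settings.DATASOURCE_RETRIEVAL_FROM_AWS is set, the raw ZIP is
        # pulled from the Justice40 S3 mirror; otherwise it is fetched
        # directly from the public abt-rsei S3 bucket.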
if settings.DATASOURCE_RETRIEVAL_FROM_AWS:
self.aggregated_rsei_score_file_url = (
f"{settings.AWS_JUSTICE40_DATASOURCES_URL}/raw-data-sources/"
"epa_rsei/CensusMicroTracts2019_2019_aggregated.zip"
)
else:
self.aggregated_rsei_score_file_url = (
"http://abt-rsei.s3.amazonaws.com/microdata2019/"
"census_agg/CensusMicroTracts2019_2019_aggregated.zip"
)
# input
self.aggregated_rsei_score_source = (
self.get_sources_path()
/ "CensusMicroTracts2019_2019_aggregated.csv"
)
# output
self.OUTPUT_PATH: Path = self.DATA_PATH / "dataset" / "epa_rsei"
self.EPA_RSEI_SCORE_THRESHOLD_CUTOFF = 0.75
self.TRACT_INPUT_COLUMN_NAME = "GEOID10"
self.NUMBER_FACILITIES_INPUT_FIELD = "NUMFACS"
self.NUMBER_RELEASES_INPUT_FIELD = "NUMRELEASES"
self.NUMBER_CHEMICALS_INPUT_FIELD = "NUMCHEMS"
self.AVERAGE_TOXICITY_INPUT_FIELD = "TOXCONC"
self.SCORE_INPUT_FIELD = "SCORE"
self.POPULATION_INPUT_FIELD = "POP"
self.CSCORE_INPUT_FIELD = "CSCORE"
self.NCSCORE_INPUT_FIELD = "NSCORE"
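        # The raw aggregated CSV ships without a header row, so these names
        # are assigned positionally in extract(); they only need to stay
        # consistent with the rename mapping in transform().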
        # Columns that will be written to the output dataset
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
field_names.EPA_RSEI_NUMBER_FACILITIES_FIELD,
field_names.EPA_RSEI_NUMBER_RELEASES_FIELD,
field_names.EPA_RSEI_NUMBER_CHEMICALS_FIELD,
field_names.EPA_RSEI_AVERAGE_TOXICITY_FIELD,
field_names.EPA_RSEI_SCORE_FIELD,
field_names.EPA_RSEI_CSCORE_FIELD,
field_names.EPA_RSEI_NCSCORE_FIELD,
field_names.EPA_RSEI_POPULATION_FIELD,
field_names.EPA_RSEI_SCORE_THRESHOLD_FIELD,
field_names.EPA_RSEI_SCORE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
]
self.df: pd.DataFrame

    def get_data_sources(self) -> List[DataSource]:
return [
ZIPDataSource(
source=self.aggregated_rsei_score_file_url,
destination=self.get_sources_path(),
)
]

    def extract(self, use_cached_data_sources: bool = False) -> None:
        # Download and unzip the data sources via the base class.
        super().extract(use_cached_data_sources)
        # The raw file has no header row: what would be read as the header is
        # actually the first census tract's data. Supply explicit column
        # names so that row is not consumed as a header.
input_columns = [
self.TRACT_INPUT_COLUMN_NAME,
self.NUMBER_FACILITIES_INPUT_FIELD,
self.NUMBER_RELEASES_INPUT_FIELD,
self.NUMBER_CHEMICALS_INPUT_FIELD,
self.AVERAGE_TOXICITY_INPUT_FIELD,
self.SCORE_INPUT_FIELD,
self.POPULATION_INPUT_FIELD,
self.CSCORE_INPUT_FIELD,
self.NCSCORE_INPUT_FIELD,
]
        self.df = pd.read_csv(
            filepath_or_buffer=self.aggregated_rsei_score_source,
            # GEOID10 values need to keep all of their digits; any leading
            # zeros dropped by the numeric parse here are restored with
            # zfill in transform().
            low_memory=False,
            names=input_columns,
        )

    def transform(self) -> None:
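        # At this point the columns still carry their raw input names, so
        # "SCORE" matches SCORE, CSCORE, and NSCORE.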
score_columns = [x for x in self.df.columns if "SCORE" in x]
        # Coerce the score columns to float so the percentile rank and
        # threshold comparisons below behave correctly.
        self.df[score_columns] = self.df[score_columns].astype(float)
self.df.rename(
columns={
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
self.NUMBER_FACILITIES_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_FACILITIES_FIELD,
self.NUMBER_RELEASES_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_RELEASES_FIELD,
self.NUMBER_CHEMICALS_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_CHEMICALS_FIELD,
self.AVERAGE_TOXICITY_INPUT_FIELD: field_names.EPA_RSEI_AVERAGE_TOXICITY_FIELD,
self.SCORE_INPUT_FIELD: field_names.EPA_RSEI_SCORE_FIELD,
self.CSCORE_INPUT_FIELD: field_names.EPA_RSEI_CSCORE_FIELD,
self.NCSCORE_INPUT_FIELD: field_names.EPA_RSEI_NCSCORE_FIELD,
self.POPULATION_INPUT_FIELD: field_names.EPA_RSEI_POPULATION_FIELD,
},
inplace=True,
)
        # Please note: https://www.epa.gov/rsei/understanding-rsei-results#what
        # (section "What does a high RSEI Score mean?").
        # This percentile column was created solely for use in the current
        # iteration of Score L.
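        # rank(ascending=True, pct=True) maps each tract's RSEI Score to its
        # percentile in (0, 1]: a higher toxicity-weighted score yields a
        # higher percentile.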
self.df[
field_names.EPA_RSEI_SCORE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX
] = self.df[field_names.EPA_RSEI_SCORE_FIELD].rank(
ascending=True,
pct=True,
)
        # This threshold was arbitrarily chosen.
        # It would make sense to enrich this with facility, industry, or
        # chemical information that would enable some additional form of
        # sub-stratification when examining the different percentile ranges
        # derived above.
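        # For example, a tract whose RSEI Score percentile is 0.80 satisfies
        # 0.80 >= 0.75 and is flagged True.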
self.df[field_names.EPA_RSEI_SCORE_THRESHOLD_FIELD] = (
self.df[
field_names.EPA_RSEI_SCORE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX
]
>= self.EPA_RSEI_SCORE_THRESHOLD_CUTOFF
)
expected_census_tract_field_length = 11
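        # zfill restores leading zeros lost when GEOIDs were parsed as
        # numbers, e.g. 1073001100 -> "01073001100" (an 11-digit 2010 tract
        # GEOID).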
self.df[self.GEOID_TRACT_FIELD_NAME] = (
self.df[self.GEOID_TRACT_FIELD_NAME]
.astype(str)
.apply(lambda x: x.zfill(expected_census_tract_field_length))
)
        if (
            self.df[self.GEOID_TRACT_FIELD_NAME].str.len()
            != expected_census_tract_field_length
        ).any():
            raise ValueError(
                f"GEOID Tract must be length of {expected_census_tract_field_length}"
            )

    def load(self) -> None:
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
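

# A minimal usage sketch, assuming the class is driven directly rather than
# through the pipeline's ETL runner:
#
#     etl = EPARiskScreeningEnvironmentalIndicatorsETL()
#     etl.extract(use_cached_data_sources=True)
#     etl.transform()
#     etl.load()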