mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-24 02:24:20 -08:00
* Add a rough prototype allowing a developer to pre-download data sources for all ETLs * Update code to be more production-ish * Move fetch to Extract part of ETL * Create a downloader to house all downloading operations * Remove unnecessary "name" in data source * Format source files with black * Fix issues from pylint and get the tests working with the new folder structure * Clean up files with black * Fix unzip test * Add caching notes to README * Fix tests (linting and case sensitivity bug) * Address PR comments and add API keys for census where missing * Merging comparator changes from main into this branch for the sake of the PR * Add note on using cache (-u) during pipeline
173 lines
6.8 KiB
Python
173 lines
6.8 KiB
Python
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
from data_pipeline.etl.base import ExtractTransformLoad
|
|
from data_pipeline.score import field_names
|
|
from data_pipeline.utils import get_module_logger
|
|
from data_pipeline.config import settings
|
|
from data_pipeline.etl.datasource import DataSource
|
|
from data_pipeline.etl.datasource import ZIPDataSource
|
|
|
|
logger = get_module_logger(__name__)
|
|
|
|
|
|
class EPARiskScreeningEnvironmentalIndicatorsETL(ExtractTransformLoad):
|
|
"""Class for 2019 Census Tract RSEI Aggregated micro-data
|
|
|
|
Data source overview: Page 20 in this document:
|
|
https://www.epa.gov/sites/default/files/2017-01/documents/rsei-documentation-geographic-microdata-v235.pdf
|
|
|
|
Disaggregated and aggregated datasets for 2019 is documented here:
|
|
https://github.com/usds/justice40-tool/issues/1070#issuecomment-1005604014
|
|
|
|
"""
|
|
|
|
def __init__(self):
|
|
|
|
# fetch
|
|
if settings.DATASOURCE_RETRIEVAL_FROM_AWS:
|
|
self.aggregated_rsei_score_file_url = (
|
|
f"{settings.AWS_JUSTICE40_DATASOURCES_URL}/raw-data-sources/"
|
|
"epa_rsei/CensusMicroTracts2019_2019_aggregated.zip"
|
|
)
|
|
else:
|
|
self.aggregated_rsei_score_file_url = (
|
|
"http://abt-rsei.s3.amazonaws.com/microdata2019/"
|
|
"census_agg/CensusMicroTracts2019_2019_aggregated.zip"
|
|
)
|
|
|
|
# input
|
|
self.aggregated_rsei_score_source = (
|
|
self.get_sources_path()
|
|
/ "CensusMicroTracts2019_2019_aggregated.csv"
|
|
)
|
|
|
|
# output
|
|
self.OUTPUT_PATH: Path = self.DATA_PATH / "dataset" / "epa_rsei"
|
|
self.EPA_RSEI_SCORE_THRESHOLD_CUTOFF = 0.75
|
|
self.TRACT_INPUT_COLUMN_NAME = "GEOID10"
|
|
self.NUMBER_FACILITIES_INPUT_FIELD = "NUMFACS"
|
|
self.NUMBER_RELEASES_INPUT_FIELD = "NUMRELEASES"
|
|
self.NUMBER_CHEMICALS_INPUT_FIELD = "NUMCHEMS"
|
|
self.AVERAGE_TOXICITY_INPUT_FIELD = "TOXCONC"
|
|
self.SCORE_INPUT_FIELD = "SCORE"
|
|
self.POPULATION_INPUT_FIELD = "POP"
|
|
self.CSCORE_INPUT_FIELD = "CSCORE"
|
|
self.NCSCORE_INPUT_FIELD = "NSCORE"
|
|
|
|
# References to the columns that will be output
|
|
self.COLUMNS_TO_KEEP = [
|
|
self.GEOID_TRACT_FIELD_NAME,
|
|
field_names.EPA_RSEI_NUMBER_FACILITIES_FIELD,
|
|
field_names.EPA_RSEI_NUMBER_RELEASES_FIELD,
|
|
field_names.EPA_RSEI_NUMBER_CHEMICALS_FIELD,
|
|
field_names.EPA_RSEI_AVERAGE_TOXICITY_FIELD,
|
|
field_names.EPA_RSEI_SCORE_FIELD,
|
|
field_names.EPA_RSEI_CSCORE_FIELD,
|
|
field_names.EPA_RSEI_NCSCORE_FIELD,
|
|
field_names.EPA_RSEI_POPULATION_FIELD,
|
|
field_names.EPA_RSEI_SCORE_THRESHOLD_FIELD,
|
|
field_names.EPA_RSEI_SCORE_FIELD
|
|
+ field_names.PERCENTILE_FIELD_SUFFIX,
|
|
]
|
|
|
|
self.df: pd.DataFrame
|
|
|
|
def get_data_sources(self) -> [DataSource]:
|
|
return [
|
|
ZIPDataSource(
|
|
source=self.aggregated_rsei_score_file_url,
|
|
destination=self.get_sources_path(),
|
|
)
|
|
]
|
|
|
|
def extract(self, use_cached_data_sources: bool = False) -> None:
|
|
|
|
super().extract(
|
|
use_cached_data_sources
|
|
) # download and extract data sources
|
|
|
|
# the column headers from the above dataset are actually a census tract's data at this point
|
|
# We will use this data structure later to specify the column names
|
|
input_columns = [
|
|
self.TRACT_INPUT_COLUMN_NAME,
|
|
self.NUMBER_FACILITIES_INPUT_FIELD,
|
|
self.NUMBER_RELEASES_INPUT_FIELD,
|
|
self.NUMBER_CHEMICALS_INPUT_FIELD,
|
|
self.AVERAGE_TOXICITY_INPUT_FIELD,
|
|
self.SCORE_INPUT_FIELD,
|
|
self.POPULATION_INPUT_FIELD,
|
|
self.CSCORE_INPUT_FIELD,
|
|
self.NCSCORE_INPUT_FIELD,
|
|
]
|
|
|
|
self.df = pd.read_csv(
|
|
filepath_or_buffer=self.aggregated_rsei_score_source,
|
|
# The following need to remain as strings for all of their digits, not get
|
|
# converted to numbers.
|
|
low_memory=False,
|
|
names=input_columns,
|
|
)
|
|
|
|
def transform(self) -> None:
|
|
score_columns = [x for x in self.df.columns if "SCORE" in x]
|
|
|
|
# coerce dataframe type to perform correct next steps
|
|
self.df[score_columns] = self.df[score_columns].astype(float)
|
|
|
|
self.df.rename(
|
|
columns={
|
|
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
|
|
self.NUMBER_FACILITIES_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_FACILITIES_FIELD,
|
|
self.NUMBER_RELEASES_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_RELEASES_FIELD,
|
|
self.NUMBER_CHEMICALS_INPUT_FIELD: field_names.EPA_RSEI_NUMBER_CHEMICALS_FIELD,
|
|
self.AVERAGE_TOXICITY_INPUT_FIELD: field_names.EPA_RSEI_AVERAGE_TOXICITY_FIELD,
|
|
self.SCORE_INPUT_FIELD: field_names.EPA_RSEI_SCORE_FIELD,
|
|
self.CSCORE_INPUT_FIELD: field_names.EPA_RSEI_CSCORE_FIELD,
|
|
self.NCSCORE_INPUT_FIELD: field_names.EPA_RSEI_NCSCORE_FIELD,
|
|
self.POPULATION_INPUT_FIELD: field_names.EPA_RSEI_POPULATION_FIELD,
|
|
},
|
|
inplace=True,
|
|
)
|
|
|
|
# Please note this: https://www.epa.gov/rsei/understanding-rsei-results#what
|
|
# Section: "What does a high RSEI Score mean?"
|
|
# This was created for the sole purpose to be used in the current
|
|
# iteration of Score L
|
|
self.df[
|
|
field_names.EPA_RSEI_SCORE_FIELD
|
|
+ field_names.PERCENTILE_FIELD_SUFFIX
|
|
] = self.df[field_names.EPA_RSEI_SCORE_FIELD].rank(
|
|
ascending=True,
|
|
pct=True,
|
|
)
|
|
|
|
# This threshold was arbitrarily chosen.
|
|
# It would make sense to enrich this with facilities, industries, or chemical
|
|
# that would enable some additional form of sub-stratification when examining
|
|
# different percentile ranges that are derived above.
|
|
self.df[field_names.EPA_RSEI_SCORE_THRESHOLD_FIELD] = (
|
|
self.df[
|
|
field_names.EPA_RSEI_SCORE_FIELD
|
|
+ field_names.PERCENTILE_FIELD_SUFFIX
|
|
]
|
|
>= self.EPA_RSEI_SCORE_THRESHOLD_CUTOFF
|
|
)
|
|
|
|
expected_census_tract_field_length = 11
|
|
self.df[self.GEOID_TRACT_FIELD_NAME] = (
|
|
self.df[self.GEOID_TRACT_FIELD_NAME]
|
|
.astype(str)
|
|
.apply(lambda x: x.zfill(expected_census_tract_field_length))
|
|
)
|
|
|
|
if len(self.df[self.GEOID_TRACT_FIELD_NAME].str.len().unique()) != 1:
|
|
raise ValueError(
|
|
f"GEOID Tract must be length of {expected_census_tract_field_length}"
|
|
)
|
|
|
|
def load(self) -> None:
|
|
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
|
|
self.df[self.COLUMNS_TO_KEEP].to_csv(
|
|
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
|
|
)
|