Add ability to cache ETL data sources (#2169)

* Add a rough prototype allowing a developer to pre-download data sources for all ETLs

* Update code to be more production-ready

* Move fetch to Extract part of ETL
* Create a downloader to house all downloading operations
* Remove unnecessary "name" in data source

* Format source files with black

* Fix issues from pylint and get the tests working with the new folder structure

* Clean up files with black

* Fix unzip test

* Add caching notes to README

* Fix tests (linting and case sensitivity bug)

* Address PR comments and add API keys for census where missing

* Merge comparator changes from main into this branch for the sake of the PR

* Add a note on using the cache flag (-u) during the pipeline (see the sketch below)
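
The core of the change is a small data-source abstraction: each ETL declares the remote files it needs, and a shared downloader fetches them once into a sources directory that later runs can reuse via the cache flag. The sketch below is a rough reconstruction assuming only the names visible in the diff (DataSource, ZIPDataSource, and their source/destination arguments); the fetch method bodies are illustrative, not the project's actual downloader code.

    from dataclasses import dataclass
    from pathlib import Path
    import urllib.request
    import zipfile


    @dataclass
    class DataSource:
        """A remote file an ETL depends on, plus where it lands locally."""

        source: str  # URL of the remote data source
        destination: Path  # local directory to download into

        def fetch(self) -> None:
            # Hypothetical download step: save the file under its URL basename.
            self.destination.mkdir(parents=True, exist_ok=True)
            urllib.request.urlretrieve(
                self.source, self.destination / Path(self.source).name
            )


    @dataclass
    class ZIPDataSource(DataSource):
        """A zipped source unpacked into the destination after download."""

        def fetch(self) -> None:
            self.destination.mkdir(parents=True, exist_ok=True)
            zip_path = self.destination / Path(self.source).name
            urllib.request.urlretrieve(self.source, zip_path)
            with zipfile.ZipFile(zip_path) as zf:
                zf.extractall(self.destination)

Because every ETL exposes its sources through get_data_sources(), a pre-download step can walk all ETLs, call fetch() on each source, and leave the results in place for cached runs.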
Travis Newby authored 2023-03-03 12:26:24 -06:00 · commit 6f39033dde
52 changed files with 1787 additions and 686 deletions

File: data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py

@@ -4,6 +4,8 @@
 # pylint: disable=unsupported-assignment-operation
 import pandas as pd
 from data_pipeline.etl.base import ExtractTransformLoad
+from data_pipeline.etl.datasource import DataSource
+from data_pipeline.etl.datasource import ZIPDataSource
 from data_pipeline.etl.base import ValidGeoLevel
 from data_pipeline.utils import get_module_logger
 from data_pipeline.config import settings
@@ -16,17 +18,6 @@ class NationalRiskIndexETL(ExtractTransformLoad):
     NAME = "national_risk_index"
-    if settings.DATASOURCE_RETRIEVAL_FROM_AWS:
-        SOURCE_URL = (
-            f"{settings.AWS_JUSTICE40_DATASOURCES_URL}/raw-data-sources/"
-            "national_risk_index/NRI_Table_CensusTracts.zip"
-        )
-    else:
-        SOURCE_URL = (
-            "https://hazards.fema.gov/nri/Content/StaticDocuments/DataDownload/"
-            "NRI_Table_CensusTracts/NRI_Table_CensusTracts.zip"
-        )
     GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
     PUERTO_RICO_EXPECTED_IN_DATA = False
     LOAD_YAML_CONFIG: bool = True
@@ -46,11 +37,28 @@ class NationalRiskIndexETL(ExtractTransformLoad):
     AGRIVALUE_LOWER_BOUND = 408000

     def __init__(self):
-        # define the full path for the input CSV file
-        self.INPUT_CSV = self.get_tmp_path() / "NRI_Table_CensusTracts.csv"
+
+        # fetch
+        if settings.DATASOURCE_RETRIEVAL_FROM_AWS:
+            self.risk_index_url = (
+                f"{settings.AWS_JUSTICE40_DATASOURCES_URL}/raw-data-sources/"
+                "national_risk_index/NRI_Table_CensusTracts.zip"
+            )
+        else:
+            self.risk_index_url = (
+                "https://hazards.fema.gov/nri/Content/StaticDocuments/DataDownload/"
+                "NRI_Table_CensusTracts/NRI_Table_CensusTracts.zip"
+            )
+
+        # source
+        self.risk_index_source = (
+            self.get_sources_path() / "NRI_Table_CensusTracts.csv"
+        )
+
+        # output
         # this is the main dataframe
         self.df: pd.DataFrame
+        self.df_nri: pd.DataFrame

         # Start dataset-specific vars here
         self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
@@ -65,14 +73,26 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         self.POPULATION_INPUT_FIELD_NAME = "POPULATION"
         self.BUILDING_VALUE_INPUT_FIELD_NAME = "BUILDVALUE"

-    def extract(self) -> None:
-        """Unzips NRI dataset from the FEMA data source and writes the files
-        to the temporary data folder for use in the transform() method
-        """
+    def get_data_sources(self) -> [DataSource]:
+        return [
+            ZIPDataSource(
+                source=self.risk_index_url, destination=self.get_sources_path()
+            )
+        ]
+
+    def extract(self, use_cached_data_sources: bool = False) -> None:
+
         super().extract(
-            source_url=self.SOURCE_URL,
-            extract_path=self.get_tmp_path(),
+            use_cached_data_sources
         )  # download and extract data sources

+        # read in the unzipped csv from NRI data source then rename the
+        # Census Tract column for merging
+        self.df_nri = pd.read_csv(
+            self.risk_index_source,
+            dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: "string"},
+            na_values=["None"],
+            low_memory=False,
+        )
+
     def transform(self) -> None:
@@ -84,16 +104,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
             Groups inside of that Tract
         """

-        # read in the unzipped csv from NRI data source then rename the
-        # Census Tract column for merging
-        df_nri: pd.DataFrame = pd.read_csv(
-            self.INPUT_CSV,
-            dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: "string"},
-            na_values=["None"],
-            low_memory=False,
-        )
-
-        df_nri.rename(
+        self.df_nri.rename(
             columns={
                 self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
                 self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME: self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
@@ -123,42 +134,46 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         agriculture_columns = [
             f"{x}_EALA"
             for x in disaster_categories
-            if f"{x}_EALA" in list(df_nri.columns)
+            if f"{x}_EALA" in list(self.df_nri.columns)
         ]

         population_columns = [
             f"{x}_EALP"
             for x in disaster_categories
-            if f"{x}_EALP" in list(df_nri.columns)
+            if f"{x}_EALP" in list(self.df_nri.columns)
         ]

         buildings_columns = [
             f"{x}_EALB"
             for x in disaster_categories
-            if f"{x}_EALB" in list(df_nri.columns)
+            if f"{x}_EALB" in list(self.df_nri.columns)
         ]

-        disaster_population_sum_series = df_nri[population_columns].sum(axis=1)
-        disaster_agriculture_sum_series = df_nri[agriculture_columns].sum(
+        disaster_population_sum_series = self.df_nri[population_columns].sum(
             axis=1
         )
-        disaster_buildings_sum_series = df_nri[buildings_columns].sum(axis=1)
+        disaster_agriculture_sum_series = self.df_nri[agriculture_columns].sum(
+            axis=1
+        )
+        disaster_buildings_sum_series = self.df_nri[buildings_columns].sum(
+            axis=1
+        )

         # Population EAL Rate = Eal Valp / Population
-        df_nri[self.EXPECTED_POPULATION_LOSS_RATE_FIELD_NAME] = (
+        self.df_nri[self.EXPECTED_POPULATION_LOSS_RATE_FIELD_NAME] = (
             disaster_population_sum_series
-            / df_nri[self.POPULATION_INPUT_FIELD_NAME]
+            / self.df_nri[self.POPULATION_INPUT_FIELD_NAME]
         )

         # Agriculture EAL Rate = Eal Vala / max(Agrivalue, 408000)
         ## FORMULA ADJUSTMENT 2/17
         ## Because AGRIVALUE contains a lot of 0s, we are going to consider
         ## 90th percentile only for places that have some agrivalue at all
-        df_nri[
+        self.df_nri[
             self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME
-        ] = disaster_agriculture_sum_series / df_nri[
+        ] = disaster_agriculture_sum_series / self.df_nri[
             self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME
         ].clip(
             lower=self.AGRIVALUE_LOWER_BOUND
@@ -167,11 +182,11 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         ## Check that this clip worked -- that the only place the value has changed is when the clip took effect
         base_expectation = (
             disaster_agriculture_sum_series
-            / df_nri[self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME]
+            / self.df_nri[self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME]
         )
         assert (
-            df_nri[
-                df_nri[self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME]
+            self.df_nri[
+                self.df_nri[self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME]
                 != base_expectation
             ][self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME].max()
             <= self.AGRIVALUE_LOWER_BOUND
@@ -181,27 +196,27 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         )

         assert (
-            df_nri[self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME]
+            self.df_nri[self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME]
             != base_expectation
         ).sum() > 0, "Clipping the agrivalue did nothing!"

         # This produces a boolean that is True in the case of non-zero agricultural value
-        df_nri[self.CONTAINS_AGRIVALUE] = (
-            df_nri[self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME] > 0
+        self.df_nri[self.CONTAINS_AGRIVALUE] = (
+            self.df_nri[self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME] > 0
         )

         # divide EAL_VALB (Expected Annual Loss - Building Value) by BUILDVALUE (Building Value ($)).
-        df_nri[self.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME] = (
+        self.df_nri[self.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME] = (
             disaster_buildings_sum_series
-            / df_nri[self.BUILDING_VALUE_INPUT_FIELD_NAME]
+            / self.df_nri[self.BUILDING_VALUE_INPUT_FIELD_NAME]
         )

         # Round all float columns to just 10 digits.
         # Note: `round` is smart enough to only apply to float columns.
-        df_nri = df_nri.round(10)
+        self.df_nri = self.df_nri.round(10)

         # Assign the final df to the class' output_df for the load method
-        self.output_df = df_nri
+        self.output_df = self.df_nri

     def load(self) -> None:
         # Suppress scientific notation.
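
For illustration, a hypothetical driver for the reworked class; the method names come from the diff above, and use_cached_data_sources=True assumes the sources were previously downloaded (for example by the pre-download step or an earlier run):

    etl = NationalRiskIndexETL()

    # Skips the download and reuses files already in get_sources_path(),
    # then reads the unzipped CSV into self.df_nri.
    etl.extract(use_cached_data_sources=True)

    etl.transform()  # renames columns and computes the EAL rate fields
    etl.load()       # writes self.output_df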