Refactor DOE Energy Burden and COI to use YAML (#1796)

* added tribalId for Supplemental dataset (#1804)

* Setting zoom levels for tribal map (#1810)

* NRI dataset and initial score YAML configuration (#1534)

* update be staging gha

* NRI dataset and initial score YAML configuration

* checkpoint

* adding data checks for release branch

* passing tests

* adding INPUT_EXTRACTED_FILE_NAME to base class

* lint

* columns to keep and tests

* update be staging gha

* checkpoint

* update be staging gha

* NRI dataset and initial score YAML configuration

* checkpoint

* adding data checks for release branch

* passing tests

* adding INPUT_EXTRACTED_FILE_NAME to base class

* lint

* columns to keep and tests

* checkpoint

* PR Review

* renoving source url

* tests

* stop execution of ETL if there's a YAML schema issue

* update be staging gha

* adding source url as class var again

* clean up

* force cache bust

* gha cache bust

* dynamically set score vars from YAML

* docsctrings

* removing last updated year - optional reverse percentile

* passing tests

* sort order

* column ordening

* PR review

* class level vars

* Updating DatasetsConfig

* fix pylint errors

* moving metadata hint back to code

Co-authored-by: lucasmbrown-usds <lucas.m.brown@omb.eop.gov>

* Correct copy typo (#1809)

* Add basic test suite for COI (#1518)

* Update COI to use new yaml (#1518)

* Add tests for DOE energy budren (1518

* Add dataset config for energy budren (1518)

* Refactor ETL to use datasets.yml (#1518)

* Add fake GEOIDs to COI tests (#1518)

* Refactor _setup_etl_instance_and_run_extract to base (#1518)

For the three classes we've done so far, a generic
_setup_etl_instance_and_run_extract will work fine, for the moment we
can reuse the same setup method until we decide future classes need more
flexibility --- but they can also always subclass so...

* Add output-path tests (#1518)

* Update YAML to match constant (#1518)

* Don't blindly set float format (#1518)

* Add defaults for extract (#1518)

* Run YAML load on all subclasses (#1518)

* Update description fields (#1518)

* Update YAML per final format (#1518)

* Update fixture tract IDs (#1518)

* Update base class refactor (#1518)

Now that NRI is final I needed to make a small number of updates to my
refactored code.

* Remove old comment (#1518)

* Fix type signature and return (#1518)

* Update per code review (#1518)

Co-authored-by: Jorge Escobar <83969469+esfoobar-usds@users.noreply.github.com>
Co-authored-by: lucasmbrown-usds <lucas.m.brown@omb.eop.gov>
Co-authored-by: Vim <86254807+vim-usds@users.noreply.github.com>
This commit is contained in:
Matt Bowen 2022-08-10 16:02:59 -04:00 committed by GitHub
commit 9635ef5ee2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 698 additions and 3640 deletions

View file

@ -1,9 +1,8 @@
from pathlib import Path
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.score import field_names
from data_pipeline.utils import get_module_logger, unzip_file_from_url
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
@ -21,15 +20,27 @@ class ChildOpportunityIndex(ExtractTransformLoad):
Full technical documents: https://www.diversitydatakids.org/sites/default/files/2020-02/ddk_coi2.0_technical_documentation_20200212.pdf.
Github repo: https://github.com/diversitydatakids/COI/
"""
# Metadata for the baseclass
NAME = "child_opportunity_index"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
# Define these for easy code completion
EXTREME_HEAT_FIELD: str
HEALTHY_FOOD_FIELD: str
IMPENETRABLE_SURFACES_FIELD: str
READING_FIELD: str
def __init__(self):
self.COI_FILE_URL = (
self.SOURCE_URL = (
"https://data.diversitydatakids.org/datastore/zip/f16fff12-b1e5-4f60-85d3-"
"3a0ededa30a0?format=csv"
)
# TODO: Decide about nixing this
self.TRACT_INPUT_COLUMN_NAME = self.INPUT_GEOID_TRACT_FIELD_NAME
self.OUTPUT_PATH: Path = (
self.DATA_PATH / "dataset" / "child_opportunity_index"
)
@ -40,31 +51,19 @@ class ChildOpportunityIndex(ExtractTransformLoad):
self.IMPENETRABLE_SURFACES_INPUT_FIELD = "HE_GREEN"
self.READING_INPUT_FIELD = "ED_READING"
# Constants for output
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
field_names.EXTREME_HEAT_FIELD,
field_names.HEALTHY_FOOD_FIELD,
field_names.IMPENETRABLE_SURFACES_FIELD,
field_names.READING_FIELD,
]
self.raw_df: pd.DataFrame
self.output_df: pd.DataFrame
def extract(self) -> None:
logger.info("Starting 51MB data download.")
unzip_file_from_url(
file_url=self.COI_FILE_URL,
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "child_opportunity_index",
super().extract(
source_url=self.SOURCE_URL,
extract_path=self.get_tmp_path(),
)
self.raw_df = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "child_opportunity_index"
/ "raw.csv",
def transform(self) -> None:
logger.info("Starting transforms.")
raw_df = pd.read_csv(
filepath_or_buffer=self.get_tmp_path() / "raw.csv",
# The following need to remain as strings for all of their digits, not get
# converted to numbers.
dtype={
@ -73,16 +72,13 @@ class ChildOpportunityIndex(ExtractTransformLoad):
low_memory=False,
)
def transform(self) -> None:
logger.info("Starting transforms.")
output_df = self.raw_df.rename(
output_df = raw_df.rename(
columns={
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
self.EXTREME_HEAT_INPUT_FIELD: field_names.EXTREME_HEAT_FIELD,
self.HEALTHY_FOOD_INPUT_FIELD: field_names.HEALTHY_FOOD_FIELD,
self.IMPENETRABLE_SURFACES_INPUT_FIELD: field_names.IMPENETRABLE_SURFACES_FIELD,
self.READING_INPUT_FIELD: field_names.READING_FIELD,
self.EXTREME_HEAT_INPUT_FIELD: self.EXTREME_HEAT_FIELD,
self.HEALTHY_FOOD_INPUT_FIELD: self.HEALTHY_FOOD_FIELD,
self.IMPENETRABLE_SURFACES_INPUT_FIELD: self.IMPENETRABLE_SURFACES_FIELD,
self.READING_INPUT_FIELD: self.READING_FIELD,
}
)
@ -95,8 +91,8 @@ class ChildOpportunityIndex(ExtractTransformLoad):
# Convert percents from 0-100 to 0-1 to standardize with our other fields.
percent_fields_to_convert = [
field_names.HEALTHY_FOOD_FIELD,
field_names.IMPENETRABLE_SURFACES_FIELD,
self.HEALTHY_FOOD_FIELD,
self.IMPENETRABLE_SURFACES_FIELD,
]
for percent_field_to_convert in percent_fields_to_convert:
@ -105,11 +101,3 @@ class ChildOpportunityIndex(ExtractTransformLoad):
)
self.output_df = output_df
def load(self) -> None:
logger.info("Saving CSV")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.output_df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)

View file

@ -2,63 +2,48 @@ from pathlib import Path
import pandas as pd
from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger, unzip_file_from_url
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
class DOEEnergyBurden(ExtractTransformLoad):
def __init__(self):
self.DOE_FILE_URL = (
settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/DOE_LEAD_AMI_TRACT_2018_ALL.csv.zip"
)
NAME = "doe_energy_burden"
SOURCE_URL: str = (
settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/DOE_LEAD_AMI_TRACT_2018_ALL.csv.zip"
)
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
REVISED_ENERGY_BURDEN_FIELD_NAME: str
def __init__(self):
self.OUTPUT_PATH: Path = (
self.DATA_PATH / "dataset" / "doe_energy_burden"
)
self.TRACT_INPUT_COLUMN_NAME = "FIP"
self.INPUT_ENERGY_BURDEN_FIELD_NAME = "BURDEN"
self.REVISED_ENERGY_BURDEN_FIELD_NAME = "Energy burden"
# Constants for output
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.REVISED_ENERGY_BURDEN_FIELD_NAME,
]
self.raw_df: pd.DataFrame
self.output_df: pd.DataFrame
def extract(self) -> None:
logger.info("Starting data download.")
unzip_file_from_url(
file_url=self.DOE_FILE_URL,
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "doe_energy_burden",
)
self.raw_df = pd.read_csv(
def transform(self) -> None:
logger.info("Starting DOE Energy Burden transforms.")
raw_df: pd.DataFrame = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "doe_energy_burden"
/ "DOE_LEAD_AMI_TRACT_2018_ALL.csv",
# The following need to remain as strings for all of their digits, not get converted to numbers.
dtype={
self.TRACT_INPUT_COLUMN_NAME: "string",
self.INPUT_GEOID_TRACT_FIELD_NAME: "string",
},
low_memory=False,
)
def transform(self) -> None:
logger.info("Starting transforms.")
output_df = self.raw_df.rename(
logger.info("Renaming columns and ensuring output format is correct")
output_df = raw_df.rename(
columns={
self.INPUT_ENERGY_BURDEN_FIELD_NAME: self.REVISED_ENERGY_BURDEN_FIELD_NAME,
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
}
)
@ -75,7 +60,4 @@ class DOEEnergyBurden(ExtractTransformLoad):
def load(self) -> None:
logger.info("Saving DOE Energy Burden CSV")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.output_df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
super().load()

View file

@ -15,10 +15,16 @@ class NationalRiskIndexETL(ExtractTransformLoad):
"""ETL class for the FEMA National Risk Index dataset"""
NAME = "national_risk_index"
LAST_UPDATED_YEAR = 2020
SOURCE_URL = "https://hazards.fema.gov/nri/Content/StaticDocuments/DataDownload//NRI_Table_CensusTracts/NRI_Table_CensusTracts.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
# Output score variables (values set on datasets.yml) for linting purposes
RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME: str
EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME: str
EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME: str
EXPECTED_POPULATION_LOSS_RATE_FIELD_NAME: str
CONTAINS_AGRIVALUE: str
## TEMPORARILY HERE
## To get this value up in time for launch, we've hard coded it. We would like
## to, in the future, have this pull the 10th percentile (or nth percentile)
@ -27,54 +33,31 @@ class NationalRiskIndexETL(ExtractTransformLoad):
AGRIVALUE_LOWER_BOUND = 408000
def __init__(self):
# define the full path for the input CSV file
self.INPUT_CSV = self.get_tmp_path() / "NRI_Table_CensusTracts.csv"
# this is the main dataframe
self.df: pd.DataFrame
# Start dataset-specific vars here
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
"EAL_SCORE"
)
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME = (
"FEMA Risk Index Expected Annual Loss Score"
)
self.EXPECTED_ANNUAL_LOSS_BUILDING_VALUE_INPUT_FIELD_NAME = "EAL_VALB"
self.EXPECTED_ANNUAL_LOSS_AGRICULTURAL_VALUE_INPUT_FIELD_NAME = (
"EAL_VALA"
)
self.EXPECTED_ANNUAL_LOSS_POPULATION_VALUE_INPUT_FIELD_NAME = "EAL_VALP"
self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME = "AGRIVALUE"
self.POPULATION_INPUT_FIELD_NAME = "POPULATION"
self.BUILDING_VALUE_INPUT_FIELD_NAME = "BUILDVALUE"
self.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME = (
"Expected building loss rate (Natural Hazards Risk Index)"
)
self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME = (
"Expected agricultural loss rate (Natural Hazards Risk Index)"
)
self.EXPECTED_POPULATION_LOSS_RATE_FIELD_NAME = (
"Expected population loss rate (Natural Hazards Risk Index)"
)
self.CONTAINS_AGRIVALUE = "Contains agricultural value"
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
self.EXPECTED_POPULATION_LOSS_RATE_FIELD_NAME,
self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME,
self.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME,
self.CONTAINS_AGRIVALUE,
]
self.df: pd.DataFrame
def extract(self) -> None:
"""Unzips NRI dataset from the FEMA data source and writes the files
to the temporary data folder for use in the transform() method
"""
logger.info("Downloading 405MB National Risk Index Data")
super().extract(
source_url=self.SOURCE_URL,
extract_path=self.get_tmp_path(),
@ -90,19 +73,18 @@ class NationalRiskIndexETL(ExtractTransformLoad):
"""
logger.info("Transforming National Risk Index Data")
NRI_TRACT_COL = "TRACTFIPS" # Census Tract Column in NRI data
# read in the unzipped csv from NRI data source then rename the
# Census Tract column for merging
df_nri: pd.DataFrame = pd.read_csv(
self.INPUT_CSV,
dtype={NRI_TRACT_COL: "string"},
dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: "string"},
na_values=["None"],
low_memory=False,
)
df_nri.rename(
columns={
NRI_TRACT_COL: self.GEOID_TRACT_FIELD_NAME,
self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME: self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
},
inplace=True,
@ -170,6 +152,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
].clip(
lower=self.AGRIVALUE_LOWER_BOUND
)
# This produces a boolean that is True in the case of non-zero agricultural value
df_nri[self.CONTAINS_AGRIVALUE] = (
df_nri[self.AGRICULTURAL_VALUE_INPUT_FIELD_NAME] > 0
@ -185,6 +168,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
# Note: `round` is smart enough to only apply to float columns.
df_nri = df_nri.round(10)
# Assign the final df to the class' output_df for the load method
self.output_df = df_nri
def load(self) -> None:

View file

@ -81,13 +81,13 @@ class TribalETL(ExtractTransformLoad):
bia_aian_supplemental_df = gpd.read_file(tribal_geojson_path)
bia_aian_supplemental_df.drop(
["OBJECTID", "GISAcres", "Source", "Shape_Length", "Shape_Area"],
["GISAcres", "Source", "Shape_Length", "Shape_Area"],
axis=1,
inplace=True,
)
bia_aian_supplemental_df.rename(
columns={"Land_Area_": "landAreaName"},
columns={"OBJECTID": "tribalId", "Land_Area_": "landAreaName"},
inplace=True,
)