columns to keep and tests

Jorge Escobar 2022-04-11 14:10:21 -04:00
parent c18e69ef90
commit 5f2228913b
5 changed files with 43 additions and 28 deletions


@@ -71,6 +71,13 @@ class ExtractTransformLoad:
# COLUMNS_TO_KEEP is used to identify which columns to keep in the output df.
COLUMNS_TO_KEEP: typing.List[str] = None
# INPUT_GEOID_TRACT_FIELD_NAME is the field name that identifies the Census Tract ID
# in the input file
INPUT_GEOID_TRACT_FIELD_NAME: str = None
# NULL_REPRESENTATION is how null values are represented in the input file
NULL_REPRESENTATION: str = None
# Thirteen digits in a census block group ID.
EXPECTED_CENSUS_BLOCK_GROUPS_CHARACTER_LENGTH: int = 13
# TODO: investigate. Census says there are only 217,740 CBGs in the US. This might
@@ -84,10 +91,12 @@ class ExtractTransformLoad:
# periods. https://github.com/usds/justice40-tool/issues/964
EXPECTED_MAX_CENSUS_TRACTS: int = 74160
# We use output_df as the final dataframe that we write to the CSV
# It is used in the `load` base class method
output_df: pd.DataFrame = None
@classmethod
def yaml_config_load(cls):
def yaml_config_load(cls) -> dict:
# check if the class instance has score YAML definitions
datasets_config = load_yaml_dict_from_file(
cls.DATASET_CONFIG / "datasets.yml",
@@ -108,10 +117,24 @@ class ExtractTransformLoad:
)
sys.exit()
# set the fields
# set some of the basic fields
cls.LAST_UPDATED_YEAR = dataset_config["last_updated_year"]
cls.SOURCE_URL = dataset_config["source_url"]
cls.INPUT_EXTRACTED_FILE_NAME = dataset_config["extracted_file_name"]
cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
"input_geoid_tract_field_name"
]
cls.NULL_REPRESENTATION = dataset_config["null_representation"]
# get the columns to write to the CSV
cls.COLUMNS_TO_KEEP = [
cls.GEOID_TRACT_FIELD_NAME, # always index with geoid tract id
]
for field in dataset_config["load_fields"]:
cls.COLUMNS_TO_KEEP.append(field["long_name"])
# return the config dict
return dataset_config
# This is a classmethod so it can be used by `get_data_frame` without
# needing to create an instance of the class. This is a use case in `etl_score`.
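
For reference, a minimal runnable sketch of what the new config-driven column selection does; the `load_fields` entries and the tract field name below are illustrative placeholders, not the project's actual configuration:

import typing

# Illustrative stand-in for one dataset's parsed entry from datasets.yml.
dataset_config: dict = {
    "load_fields": [
        {"short_name": "ex_pop_loss", "long_name": "Expected population loss rate"},
        {"short_name": "has_ag_val", "long_name": "Contains agricultural value"},
    ]
}

GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"  # assumed canonical tract ID column name

# Mirrors the loop added in this commit: the tract ID column always comes first,
# followed by each configured field's long_name.
columns_to_keep: typing.List[str] = [GEOID_TRACT_FIELD_NAME]
for field in dataset_config["load_fields"]:
    columns_to_keep.append(field["long_name"])

print(columns_to_keep)
# ['GEOID10_TRACT', 'Expected population loss rate', 'Contains agricultural value']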
@@ -263,8 +286,7 @@ class ExtractTransformLoad:
Data is written in the specified local data folder or remote AWS S3 bucket.
Uses the directory from `self.OUTPUT_DIR` and the file name from
`self._get_output_file_path`.
Uses the directory and the file name from `self._get_output_file_path`.
"""
logger.info(f"Saving `{self.NAME}` CSV")


@@ -40,7 +40,7 @@ datasets:
excel_download: true
- short_name: "has_ag_val"
df_field_name: "CONTAINS_AGRIVALUE"
long_name: "Expected building loss rate (Natural Hazards Risk Index)"
long_name: "Contains agricultural value"
field_type: bool
tile_include: true
csv_download: true


@@ -26,8 +26,15 @@ class NationalRiskIndexETL(ExtractTransformLoad):
def __init__(self):
# load YAML config
super().yaml_config_load()
self.DATASET_CONFIG = super().yaml_config_load()
# define the full path for the input CSV file
self.INPUT_CSV = self.get_tmp_path() / self.INPUT_EXTRACTED_FILE_NAME
# this is the main dataframe
self.df: pd.DataFrame
# Start dataset-specific vars here
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
"EAL_SCORE"
)
@@ -58,18 +65,6 @@ class NationalRiskIndexETL(ExtractTransformLoad):
)
self.CONTAINS_AGRIVALUE = "Contains agricultural value"
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
self.EXPECTED_POPULATION_LOSS_RATE_FIELD_NAME,
self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME,
self.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME,
self.CONTAINS_AGRIVALUE,
]
self.INPUT_CSV = self.get_tmp_path() / self.INPUT_EXTRACTED_FILE_NAME
self.df: pd.DataFrame
def extract(self) -> None:
"""Unzips NRI dataset from the FEMA data source and writes the files
to the temporary data folder for use in the transform() method
@@ -90,19 +85,18 @@ class NationalRiskIndexETL(ExtractTransformLoad):
"""
logger.info("Transforming National Risk Index Data")
NRI_TRACT_COL = "TRACTFIPS" # Census Tract Column in NRI data
# read in the unzipped CSV from the NRI data source, then rename the
# Census Tract column for merging
df_nri: pd.DataFrame = pd.read_csv(
self.INPUT_CSV,
dtype={NRI_TRACT_COL: "string"},
na_values=["None"],
dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: "string"},
na_values=[self.NULL_REPRESENTATION],
low_memory=False,
)
df_nri.rename(
columns={
NRI_TRACT_COL: self.GEOID_TRACT_FIELD_NAME,
self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME: self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
},
inplace=True,
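
For context, a self-contained sketch of the config-driven read-and-rename pattern this hunk introduces; the inline CSV rows, the null token, and the canonical tract field name are placeholders modeled on the diff, not the real NRI extract:

import io

import pandas as pd

# Illustrative config values; in the pipeline they come from datasets.yml.
INPUT_GEOID_TRACT_FIELD_NAME = "TRACTFIPS"
NULL_REPRESENTATION = "None"
GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"  # assumed canonical tract field name

# Tiny inline stand-in for the extracted NRI CSV.
raw_csv = io.StringIO(
    "TRACTFIPS,EAL_SCORE\n"
    "01001020100,12.3\n"
    "01001020200,None\n"
)

# Keep the tract ID as a string (preserving leading zeros) and map the
# configured null token to NaN, as the transform now does.
df_nri = pd.read_csv(
    raw_csv,
    dtype={INPUT_GEOID_TRACT_FIELD_NAME: "string"},
    na_values=[NULL_REPRESENTATION],
    low_memory=False,
)

# Rename the input tract column to the pipeline's canonical tract field.
df_nri = df_nri.rename(
    columns={INPUT_GEOID_TRACT_FIELD_NAME: GEOID_TRACT_FIELD_NAME}
)

print(df_nri.dtypes)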
@@ -185,6 +179,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
# Note: `round` is smart enough to only apply to float columns.
df_nri = df_nri.round(10)
# Assign the final df to the class' output_df for the load method
self.output_df = df_nri
def load(self) -> None:


@@ -52,8 +52,3 @@ def mock_etl(monkeypatch, mock_paths) -> None:
data_path, tmp_path = mock_paths
monkeypatch.setattr(ExtractTransformLoad, "DATA_PATH", data_path)
monkeypatch.setattr(ExtractTransformLoad, "TMP_PATH", tmp_path)
monkeypatch.setattr(
ExtractTransformLoad,
"CONTENT_CONFIG",
Path.cwd() / "data_pipeline" / "score" / "config",
)


@@ -119,6 +119,7 @@ class TestETL:
"""
# Setup
etl = self._get_instance_of_etl_class()
etl.__init__()
data_path, tmp_path = mock_paths
assert etl.DATA_PATH == data_path
@@ -256,6 +257,7 @@ class TestETL:
etl = self._setup_etl_instance_and_run_extract(
mock_etl=mock_etl, mock_paths=mock_paths
)
etl.__init__()
etl.transform()
assert etl.output_df is not None
@@ -273,6 +275,7 @@ class TestETL:
"""
# setup - input variables
etl = self._get_instance_of_etl_class()
etl.__init__()
# setup - mock transform step
df_transform = pd.read_csv(