Refactor ETL to use datasets.yml (#1518)

This commit is contained in:
Matthew Bowen 2022-07-27 17:35:56 -04:00 committed by matt bowen
parent c68371f051
commit 76d4ebe0c3
2 changed files with 20 additions and 34 deletions

View file

@ -3,7 +3,7 @@ import pandas as pd
from data_pipeline.config import settings from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger, unzip_file_from_url from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__) logger = get_module_logger(__name__)
@ -16,52 +16,41 @@ class DOEEnergyBurden(ExtractTransformLoad):
) )
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
REVISED_ENERGY_BURDEN_FIELD_NAME: str
def __init__(self): def __init__(self):
self.DOE_FILE_URL = self.SOURCE_URL self.DATASET_CONFIG = super().yaml_config_load()
self.OUTPUT_PATH: Path = ( self.OUTPUT_PATH: Path = (
self.DATA_PATH / "dataset" / "doe_energy_burden" self.DATA_PATH / "dataset" / "doe_energy_burden"
) )
self.TRACT_INPUT_COLUMN_NAME = "FIP"
self.INPUT_ENERGY_BURDEN_FIELD_NAME = "BURDEN" self.INPUT_ENERGY_BURDEN_FIELD_NAME = "BURDEN"
self.REVISED_ENERGY_BURDEN_FIELD_NAME = "Energy burden"
# Constants for output
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.REVISED_ENERGY_BURDEN_FIELD_NAME,
]
self.raw_df: pd.DataFrame self.raw_df: pd.DataFrame
self.output_df: pd.DataFrame self.output_df: pd.DataFrame
def extract(self) -> None: def extract(self) -> None:
logger.info("Starting data download.") # TODO: Make these defaults so extract can be blank most of the time
super().extract(
unzip_file_from_url( source_url=self.SOURCE_URL, extract_path=self.get_tmp_path()
file_url=self.DOE_FILE_URL,
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path()
)
self.raw_df = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "DOE_LEAD_AMI_TRACT_2018_ALL.csv",
# The following need to remain as strings for all of their digits, not get converted to numbers.
dtype={
self.TRACT_INPUT_COLUMN_NAME: "string",
},
low_memory=False,
) )
def transform(self) -> None: def transform(self) -> None:
logger.info("Starting transforms.") logger.info("Starting transforms.")
raw_df: pd.DataFrame = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "DOE_LEAD_AMI_TRACT_2018_ALL.csv",
# The following need to remain as strings for all of their digits, not get converted to numbers.
dtype={
self.INPUT_GEOID_TRACT_FIELD_NAME: "string",
},
low_memory=False,
)
output_df = self.raw_df.rename( output_df = raw_df.rename(
columns={ columns={
self.INPUT_ENERGY_BURDEN_FIELD_NAME: self.REVISED_ENERGY_BURDEN_FIELD_NAME, self.INPUT_ENERGY_BURDEN_FIELD_NAME: self.REVISED_ENERGY_BURDEN_FIELD_NAME,
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME, self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
} }
) )
@ -78,7 +67,4 @@ class DOEEnergyBurden(ExtractTransformLoad):
def load(self) -> None: def load(self) -> None:
logger.info("Saving DOE Energy Burden CSV") logger.info("Saving DOE Energy Burden CSV")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True) super().load(float_format="%.10f")
self.output_df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)

View file

@ -77,6 +77,6 @@ class TestDOEEnergyBurdenETL(TestETL):
] ]
assert etl.GEOID_FIELD_NAME == "GEOID10" assert etl.GEOID_FIELD_NAME == "GEOID10"
assert etl.GEOID_TRACT_FIELD_NAME == "GEOID10_TRACT" assert etl.GEOID_TRACT_FIELD_NAME == "GEOID10_TRACT"
assert etl.TRACT_INPUT_COLUMN_NAME == "FIP" assert etl.INPUT_GEOID_TRACT_FIELD_NAME == "FIP"
assert etl.INPUT_ENERGY_BURDEN_FIELD_NAME == "BURDEN" assert etl.INPUT_ENERGY_BURDEN_FIELD_NAME == "BURDEN"
assert etl.REVISED_ENERGY_BURDEN_FIELD_NAME == "Energy burden" assert etl.REVISED_ENERGY_BURDEN_FIELD_NAME == "Energy burden"