This commit is contained in:
lucasmbrown-usds 2022-09-07 16:17:03 -04:00
parent 70606440fb
commit 858afa8a40
2 changed files with 38 additions and 43 deletions

View file

@ -223,6 +223,8 @@ class ExtractTransformLoad:
""" """
# TODO: remove this once all ETL classes are converted to using the new # TODO: remove this once all ETL classes are converted to using the new
# base class parameters and patterns. # base class parameters and patterns.
# TODO: determine how to use this currently in the partially refactored world.
# https://github.com/usds/justice40-tool/issues/1891
if self.GEO_LEVEL is None: if self.GEO_LEVEL is None:
logger.info( logger.info(
"Skipping validation step for this class because it does not " "Skipping validation step for this class because it does not "

View file

@ -1,3 +1,4 @@
import pathlib
from pathlib import Path from pathlib import Path
import pandas as pd import pandas as pd
@ -5,7 +6,7 @@ from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.etl.score.etl_utils import ( from data_pipeline.etl.score.etl_utils import (
compare_to_list_of_expected_state_fips_codes, compare_to_list_of_expected_state_fips_codes,
) )
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes from data_pipeline.score import field_names
from data_pipeline.utils import get_module_logger, download_file_from_url from data_pipeline.utils import get_module_logger, download_file_from_url
logger = get_module_logger(__name__) logger = get_module_logger(__name__)
@ -31,41 +32,47 @@ class CDCLifeExpectancy(ExtractTransformLoad):
self.TRACT_INPUT_COLUMN_NAME = "Tract ID" self.TRACT_INPUT_COLUMN_NAME = "Tract ID"
self.STATE_INPUT_COLUMN_NAME = "STATE2KX" self.STATE_INPUT_COLUMN_NAME = "STATE2KX"
self.LIFE_EXPECTANCY_FIELD_NAME = "Life expectancy (years)"
# Constants for output # Constants for output
self.COLUMNS_TO_KEEP = [ self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME,
self.LIFE_EXPECTANCY_FIELD_NAME, field_names.LIFE_EXPECTANCY_FIELD,
] ]
self.raw_df: pd.DataFrame self.raw_df: pd.DataFrame
self.output_df: pd.DataFrame self.output_df: pd.DataFrame
def extract(self) -> None: def _download_and_prep_data(
logger.info("Starting data download.") self, file_url: str, download_file_name: pathlib.Path
) -> pd.DataFrame:
all_usa_download_file_name = (
self.get_tmp_path() / "cdc_life_expectancy" / "usa.csv"
)
download_file_from_url( download_file_from_url(
file_url=self.USA_FILE_URL, file_url=file_url,
download_file_name=all_usa_download_file_name, download_file_name=download_file_name,
verify=True, verify=True,
) )
pandas_read_csv_dtype_settings = { df = pd.read_csv(
filepath_or_buffer=download_file_name,
dtype={
# The following need to remain as strings for all of their digits, not get converted to numbers. # The following need to remain as strings for all of their digits, not get converted to numbers.
self.TRACT_INPUT_COLUMN_NAME: "string", self.TRACT_INPUT_COLUMN_NAME: "string",
self.STATE_INPUT_COLUMN_NAME: "string", self.STATE_INPUT_COLUMN_NAME: "string",
} },
all_usa_raw_df = pd.read_csv(
filepath_or_buffer=all_usa_download_file_name,
dtype=pandas_read_csv_dtype_settings,
low_memory=False, low_memory=False,
) )
return df
def extract(self) -> None:
logger.info("Starting data download.")
all_usa_raw_df = self._download_and_prep_data(
file_url=self.USA_FILE_URL,
download_file_name=self.get_tmp_path()
/ "cdc_life_expectancy"
/ "usa.csv",
)
# Check which states are missing # Check which states are missing
states_in_life_expectancy_usa_file = list( states_in_life_expectancy_usa_file = list(
all_usa_raw_df[self.STATE_INPUT_COLUMN_NAME].unique() all_usa_raw_df[self.STATE_INPUT_COLUMN_NAME].unique()
@ -81,33 +88,19 @@ class CDCLifeExpectancy(ExtractTransformLoad):
) )
logger.info("Downloading data for Maine") logger.info("Downloading data for Maine")
maine_download_file_name = ( maine_raw_df = self._download_and_prep_data(
self.get_tmp_path() / "cdc_life_expectancy" / "maine.csv"
)
download_file_from_url(
file_url=self.MAINE_FILE_URL, file_url=self.MAINE_FILE_URL,
download_file_name=maine_download_file_name, download_file_name=self.get_tmp_path()
verify=True, / "cdc_life_expectancy"
) / "maine.csv",
maine_raw_df = pd.read_csv(
filepath_or_buffer=maine_download_file_name,
dtype=pandas_read_csv_dtype_settings,
low_memory=False,
) )
logger.info("Downloading data for Wisconsin") logger.info("Downloading data for Wisconsin")
wisconsin_download_file_name = ( wisconsin_raw_df = self._download_and_prep_data(
self.get_tmp_path() / "cdc_life_expectancy" / "wisconsin.csv"
)
download_file_from_url(
file_url=self.WISCONSIN_FILE_URL, file_url=self.WISCONSIN_FILE_URL,
download_file_name=wisconsin_download_file_name, download_file_name=self.get_tmp_path()
verify=True, / "cdc_life_expectancy"
) / "wisconsin.csv",
wisconsin_raw_df = pd.read_csv(
filepath_or_buffer=wisconsin_download_file_name,
dtype=pandas_read_csv_dtype_settings,
low_memory=False,
) )
combined_df = pd.concat( combined_df = pd.concat(
@ -138,7 +131,7 @@ class CDCLifeExpectancy(ExtractTransformLoad):
self.output_df = self.raw_df.rename( self.output_df = self.raw_df.rename(
columns={ columns={
"e(0)": self.LIFE_EXPECTANCY_FIELD_NAME, "e(0)": field_names.LIFE_EXPECTANCY_FIELD,
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME, self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
} }
) )