mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-23 10:04:18 -08:00
fixing missing states
This commit is contained in:
parent
d41153d89d
commit
c6569b641e
1 changed files with 105 additions and 11 deletions
|
@ -2,6 +2,11 @@ from pathlib import Path
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
from data_pipeline.etl.base import ExtractTransformLoad
|
from data_pipeline.etl.base import ExtractTransformLoad
|
||||||
|
from data_pipeline.etl.score.constants import (
|
||||||
|
TILES_ISLAND_AREA_FIPS_CODES,
|
||||||
|
TILES_PUERTO_RICO_FIPS_CODE,
|
||||||
|
)
|
||||||
|
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
|
||||||
from data_pipeline.utils import get_module_logger, download_file_from_url
|
from data_pipeline.utils import get_module_logger, download_file_from_url
|
||||||
|
|
||||||
logger = get_module_logger(__name__)
|
logger = get_module_logger(__name__)
|
||||||
|
@ -9,12 +14,19 @@ logger = get_module_logger(__name__)
|
||||||
|
|
||||||
class CDCLifeExpectancy(ExtractTransformLoad):
|
class CDCLifeExpectancy(ExtractTransformLoad):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.FILE_URL: str = "https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Datasets/NVSS/USALEEP/CSV/US_A.CSV"
|
self.USA_FILE_URL: str = "https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Datasets/NVSS/USALEEP/CSV/US_A.CSV"
|
||||||
|
|
||||||
|
# For some reason, LEEP does not include Maine or Wisconsin in its "All of
|
||||||
|
# USA" file. Load these separately.
|
||||||
|
self.WISCONSIN_FILE_URL: str = "https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Datasets/NVSS/USALEEP/CSV/WI_A.CSV"
|
||||||
|
self.MAINE_FILE_URL: str = "https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Datasets/NVSS/USALEEP/CSV/ME_A.CSV"
|
||||||
|
|
||||||
self.OUTPUT_PATH: Path = (
|
self.OUTPUT_PATH: Path = (
|
||||||
self.DATA_PATH / "dataset" / "cdc_life_expectancy"
|
self.DATA_PATH / "dataset" / "cdc_life_expectancy"
|
||||||
)
|
)
|
||||||
|
|
||||||
self.TRACT_INPUT_COLUMN_NAME = "Tract ID"
|
self.TRACT_INPUT_COLUMN_NAME = "Tract ID"
|
||||||
|
self.STATE_INPUT_COLUMN_NAME = "STATE2KX"
|
||||||
self.LIFE_EXPECTANCY_FIELD_NAME = "Life expectancy (years)"
|
self.LIFE_EXPECTANCY_FIELD_NAME = "Life expectancy (years)"
|
||||||
|
|
||||||
# Constants for output
|
# Constants for output
|
||||||
|
@ -29,26 +41,108 @@ class CDCLifeExpectancy(ExtractTransformLoad):
|
||||||
def extract(self) -> None:
|
def extract(self) -> None:
|
||||||
logger.info("Starting data download.")
|
logger.info("Starting data download.")
|
||||||
|
|
||||||
download_file_name = (
|
all_usa_download_file_name = (
|
||||||
self.get_tmp_path() / "cdc_life_expectancy" / "usa.csv"
|
self.get_tmp_path() / "cdc_life_expectancy" / "usa.csv"
|
||||||
)
|
)
|
||||||
download_file_from_url(
|
download_file_from_url(
|
||||||
file_url=self.FILE_URL,
|
file_url=self.USA_FILE_URL,
|
||||||
download_file_name=download_file_name,
|
download_file_name=all_usa_download_file_name,
|
||||||
verify=True,
|
verify=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.raw_df = pd.read_csv(
|
pandas_read_csv_dtype_settings = {
|
||||||
filepath_or_buffer=download_file_name,
|
# The following need to remain as strings for all of their digits, not get converted to numbers.
|
||||||
dtype={
|
self.TRACT_INPUT_COLUMN_NAME: "string",
|
||||||
# The following need to remain as strings for all of their digits, not get converted to numbers.
|
self.STATE_INPUT_COLUMN_NAME: "string",
|
||||||
self.TRACT_INPUT_COLUMN_NAME: "string",
|
}
|
||||||
},
|
|
||||||
|
all_usa_raw_df = pd.read_csv(
|
||||||
|
filepath_or_buffer=all_usa_download_file_name,
|
||||||
|
dtype=pandas_read_csv_dtype_settings,
|
||||||
low_memory=False,
|
low_memory=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Check which states are missing
|
||||||
|
state_fips_codes = get_state_fips_codes(self.DATA_PATH)
|
||||||
|
states_in_life_expectancy_usa_file = all_usa_raw_df[
|
||||||
|
self.STATE_INPUT_COLUMN_NAME
|
||||||
|
].unique()
|
||||||
|
|
||||||
|
expected_states_set = (
|
||||||
|
set(state_fips_codes)
|
||||||
|
# We don't expect LEEP to have data for island areas or Puerto Rico.
|
||||||
|
- set(TILES_ISLAND_AREA_FIPS_CODES)
|
||||||
|
- set(TILES_PUERTO_RICO_FIPS_CODE)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Find which states are missing from the expected set.
|
||||||
|
states_missing = sorted(
|
||||||
|
list(expected_states_set - set(states_in_life_expectancy_usa_file))
|
||||||
|
)
|
||||||
|
|
||||||
|
if states_missing != ["23", "55"]:
|
||||||
|
raise ValueError(
|
||||||
|
"LEEP data has changed. The states missing from the data are "
|
||||||
|
"no longer the same."
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info("Downloading data for Maine")
|
||||||
|
maine_download_file_name = (
|
||||||
|
self.get_tmp_path() / "cdc_life_expectancy" / "maine.csv"
|
||||||
|
)
|
||||||
|
download_file_from_url(
|
||||||
|
file_url=self.MAINE_FILE_URL,
|
||||||
|
download_file_name=maine_download_file_name,
|
||||||
|
verify=True,
|
||||||
|
)
|
||||||
|
maine_raw_df = pd.read_csv(
|
||||||
|
filepath_or_buffer=maine_download_file_name,
|
||||||
|
dtype=pandas_read_csv_dtype_settings,
|
||||||
|
low_memory=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info("Downloading data for Wisconsin")
|
||||||
|
wisconsin_download_file_name = (
|
||||||
|
self.get_tmp_path() / "cdc_life_expectancy" / "wisconsin.csv"
|
||||||
|
)
|
||||||
|
download_file_from_url(
|
||||||
|
file_url=self.WISCONSIN_FILE_URL,
|
||||||
|
download_file_name=wisconsin_download_file_name,
|
||||||
|
verify=True,
|
||||||
|
)
|
||||||
|
wisconsin_raw_df = pd.read_csv(
|
||||||
|
filepath_or_buffer=wisconsin_download_file_name,
|
||||||
|
dtype=pandas_read_csv_dtype_settings,
|
||||||
|
low_memory=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
combined_df = pd.concat(
|
||||||
|
objs=[all_usa_raw_df, maine_raw_df, wisconsin_raw_df],
|
||||||
|
ignore_index=True,
|
||||||
|
verify_integrity=True,
|
||||||
|
axis=0,
|
||||||
|
)
|
||||||
|
|
||||||
|
states_in_combined_df = combined_df[
|
||||||
|
self.STATE_INPUT_COLUMN_NAME
|
||||||
|
].unique()
|
||||||
|
|
||||||
|
# Find which states are missing from the combined df.
|
||||||
|
states_missing = sorted(
|
||||||
|
list(expected_states_set - set(states_in_combined_df))
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(states_missing) != 0:
|
||||||
|
raise ValueError(
|
||||||
|
"The states missing from combined dataframe are "
|
||||||
|
"no longer as expected."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Save the updated version
|
||||||
|
self.raw_df = combined_df
|
||||||
|
|
||||||
def transform(self) -> None:
|
def transform(self) -> None:
|
||||||
logger.info("Starting DOE energy burden transform.")
|
logger.info("Starting CDC life expectancy transform.")
|
||||||
|
|
||||||
self.output_df = self.raw_df.rename(
|
self.output_df = self.raw_df.rename(
|
||||||
columns={
|
columns={
|
||||||
|
|
Loading…
Add table
Reference in a new issue