mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-07-28 14:11:17 -07:00
Data Unit Tests (#509)
* Fixes #341 - As a J40 developer, I want to write unit tests for the ETL files, so that tests are run on each commit
* Location bug
* Adding load tests
* Fixing XLSX filename
* Adding downloadable zip test
* Updating pickle
* Fixing pylint warnings
* Update readme to correct some typos and reorganize test content structure
* Removing unused schemas file, adding details to readme around pickles, per PR feedback
* Update test to pass with Score D added to score file; update path in readme
* Fix requirements.txt after merge
* Fix poetry.lock after merge

Co-authored-by: Shelby Switzer <shelby.switzer@cms.hhs.gov>
This commit is contained in:
parent
88c8209bb0
commit
536a35d6a0
17 changed files with 676 additions and 242 deletions
data/data-pipeline/data_pipeline/etl/score/constants.py (new file, +108)

@@ -0,0 +1,108 @@
from pathlib import Path

import pandas as pd

from data_pipeline.config import settings

# Base Paths
DATA_PATH = Path(settings.APP_ROOT) / "data"
TMP_PATH = DATA_PATH / "tmp"

# Remote Paths
CENSUS_COUNTIES_ZIP_URL = "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/Gaz_counties_national.zip"

# Local Paths
CENSUS_COUNTIES_FILE_NAME = TMP_PATH / "Gaz_counties_national.txt"

# Census paths
DATA_CENSUS_DIR = DATA_PATH / "census"
DATA_CENSUS_CSV_DIR = DATA_CENSUS_DIR / "csv"
DATA_CENSUS_CSV_FILE_PATH = DATA_CENSUS_CSV_DIR / "us.csv"
DATA_CENSUS_CSV_STATE_FILE_PATH = DATA_CENSUS_CSV_DIR / "fips_states_2010.csv"


# Score paths
DATA_SCORE_DIR = DATA_PATH / "score"

## Score CSV Paths
DATA_SCORE_CSV_DIR = DATA_SCORE_DIR / "csv"
DATA_SCORE_CSV_FULL_DIR = DATA_SCORE_CSV_DIR / "full"
DATA_SCORE_CSV_FULL_FILE_PATH = DATA_SCORE_CSV_FULL_DIR / "usa.csv"
FULL_SCORE_CSV_FULL_PLUS_COUNTIES_FILE_PATH = (
    DATA_SCORE_CSV_FULL_DIR / "usa_counties.csv"
)

## Score Tile paths
DATA_SCORE_TILES_DIR = DATA_SCORE_DIR / "tiles"
DATA_SCORE_TILES_FILE_PATH = DATA_SCORE_TILES_DIR / "usa.csv"

# Downloadable paths
SCORE_DOWNLOADABLE_DIR = DATA_SCORE_DIR / "downloadable"
SCORE_DOWNLOADABLE_CSV_FILE_PATH = SCORE_DOWNLOADABLE_DIR / "usa.csv"
SCORE_DOWNLOADABLE_EXCEL_FILE_PATH = SCORE_DOWNLOADABLE_DIR / "usa.xlsx"
SCORE_DOWNLOADABLE_ZIP_FILE_PATH = SCORE_DOWNLOADABLE_DIR / "Screening Tool Data.zip"

# Column subsets
CENSUS_COUNTIES_COLUMNS = ["USPS", "GEOID", "NAME"]
TILES_SCORE_COLUMNS = [
    "GEOID10",
    "State Name",
    "County Name",
    "Total population",
    "Score D (percentile)",
    "Score D (top 25th percentile)",
    "Score E (percentile)",
    "Score E (top 25th percentile)",
    "Poverty (Less than 200% of federal poverty line) (percentile)",
    "Percent individuals age 25 or over with less than high school degree (percentile)",
    "Linguistic isolation (percent) (percentile)",
    "Unemployed civilians (percent) (percentile)",
    "Housing burden (percent) (percentile)",
]

# columns to round floats to 2 decimals
TILES_SCORE_FLOAT_COLUMNS = [
    "Score D (percentile)",
    "Score D (top 25th percentile)",
    "Score E (percentile)",
    "Score E (top 25th percentile)",
    "Poverty (Less than 200% of federal poverty line)",
    "Percent individuals age 25 or over with less than high school degree",
    "Linguistic isolation (percent)",
    "Unemployed civilians (percent)",
    "Housing burden (percent)",
]
TILES_ROUND_NUM_DECIMALS = 2

DOWNLOADABLE_SCORE_INDICATOR_COLUMNS_BASIC = [
    "Percent individuals age 25 or over with less than high school degree",
    "Linguistic isolation (percent)",
    "Poverty (Less than 200% of federal poverty line)",
    "Unemployed civilians (percent)",
    "Housing burden (percent)",
    "Respiratory hazard index",
    "Diesel particulate matter",
    "Particulate matter (PM2.5)",
    "Traffic proximity and volume",
    "Proximity to RMP sites",
    "Wastewater discharge",
    "Percent pre-1960s housing (lead paint indicator)",
    "Total population",
]

# For every indicator above, we want to include percentile and min-max normalized variants also
DOWNLOADABLE_SCORE_INDICATOR_COLUMNS_FULL = list(
    pd.core.common.flatten(
        [
            [p, f"{p} (percentile)", f"{p} (min-max normalized)"]
            for p in DOWNLOADABLE_SCORE_INDICATOR_COLUMNS_BASIC
        ]
    )
)

# Finally we augment with the GEOID10, county, and state
DOWNLOADABLE_SCORE_COLUMNS = [
    "GEOID10",
    "County Name",
    "State Name",
    *DOWNLOADABLE_SCORE_INDICATOR_COLUMNS_FULL,
]
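
The pd.core.common.flatten comprehension above expands each basic indicator into three downloadable columns (raw, percentile, and min-max normalized). A quick standalone illustration of that expansion; the two-item input list here is a made-up stand-in, not the real indicator list:

import pandas as pd

basic = ["Housing burden (percent)", "Total population"]  # hypothetical short stand-in
full = list(
    pd.core.common.flatten(
        [[p, f"{p} (percentile)", f"{p} (min-max normalized)"] for p in basic]
    )
)
# full now holds six names: "Housing burden (percent)",
# "Housing burden (percent) (percentile)",
# "Housing burden (percent) (min-max normalized)",
# and then the same trio for "Total population"
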
data/data-pipeline/data_pipeline/etl/score/etl_score_post.py (modified; the class below now reads its paths and column subsets from constants.py)

@@ -6,6 +6,8 @@ import pandas as pd
 from data_pipeline.etl.base import ExtractTransformLoad
 from data_pipeline.utils import get_module_logger, get_zip_info

+from . import constants
+
 ## zlib is not available on all systems
 try:
     import zlib  # noqa # pylint: disable=unused-import
@@ -25,108 +27,19 @@ class PostScoreETL(ExtractTransformLoad):
     """

     def __init__(self):
-        self.CENSUS_COUNTIES_ZIP_URL = "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/Gaz_counties_national.zip"
-        self.CENSUS_COUNTIES_TXT = self.TMP_PATH / "Gaz_counties_national.txt"
-        self.CENSUS_COUNTIES_COLS = ["USPS", "GEOID", "NAME"]
-        self.CENSUS_USA_CSV = self.DATA_PATH / "census" / "csv" / "us.csv"
-        self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
-        self.DOWNLOADABLE_INFO_PATH = self.DATA_PATH / "score" / "downloadable"
+        self.input_counties_df: pd.DataFrame
+        self.input_states_df: pd.DataFrame
+        self.input_score_df: pd.DataFrame
+        self.input_national_cbg_df: pd.DataFrame

-        self.STATE_CSV = (
-            self.DATA_PATH / "census" / "csv" / "fips_states_2010.csv"
-        )
-
-        self.FULL_SCORE_CSV = self.SCORE_CSV_PATH / "full" / "usa.csv"
-        self.FULL_SCORE_CSV_PLUS_COUNTIES = (
-            self.SCORE_CSV_PATH / "full" / "usa_counties.csv"
-        )
-
-        self.TILES_SCORE_COLUMNS = [
-            "GEOID10",
-            "State Name",
-            "County Name",
-            "Total population",
-            "Score D (percentile)",
-            "Score D (top 25th percentile)",
-            "Score E (percentile)",
-            "Score E (top 25th percentile)",
-            "Poverty (Less than 200% of federal poverty line) (percentile)",
-            "Percent individuals age 25 or over with less than high school degree (percentile)",
-            "Linguistic isolation (percent) (percentile)",
-            "Unemployed civilians (percent) (percentile)",
-            "Housing burden (percent) (percentile)",
-        ]
-        self.TILES_SCORE_CSV_PATH = self.SCORE_CSV_PATH / "tiles"
-        self.TILES_SCORE_CSV = self.TILES_SCORE_CSV_PATH / "usa.csv"
-
-        # columns to round floats to 2 decimals
-        self.TILES_SCORE_FLOAT_COLUMNS = [
-            "Score D (percentile)",
-            "Score D (top 25th percentile)",
-            "Score E (percentile)",
-            "Score E (top 25th percentile)",
-            "Poverty (Less than 200% of federal poverty line) (percentile)",
-            "Percent individuals age 25 or over with less than high school degree (percentile)",
-            "Linguistic isolation (percent) (percentile)",
-            "Unemployed civilians (percent) (percentile)",
-            "Housing burden (percent) (percentile)",
-        ]
-        self.TILES_ROUND_NUM_DECIMALS = 2
-
-        self.DOWNLOADABLE_SCORE_INDICATORS_BASIC = [
-            "Percent individuals age 25 or over with less than high school degree",
-            "Linguistic isolation (percent)",
-            "Poverty (Less than 200% of federal poverty line)",
-            "Unemployed civilians (percent)",
-            "Housing burden (percent)",
-            "Respiratory hazard index",
-            "Diesel particulate matter",
-            "Particulate matter (PM2.5)",
-            "Traffic proximity and volume",
-            "Proximity to RMP sites",
-            "Wastewater discharge",
-            "Percent pre-1960s housing (lead paint indicator)",
-            "Total population",
-        ]
-
-        # For every indicator above, we want to include percentile and min-max normalized variants also
-        self.DOWNLOADABLE_SCORE_INDICATORS_FULL = list(
-            pd.core.common.flatten(
-                [
-                    [p, f"{p} (percentile)", f"{p} (min-max normalized)"]
-                    for p in self.DOWNLOADABLE_SCORE_INDICATORS_BASIC
-                ]
-            )
-        )
-
-        # Finally we augment with the GEOID10, county, and state
-        self.DOWNLOADABLE_SCORE_COLUMNS = [
-            "GEOID10",
-            "County Name",
-            "State Name",
-            *self.DOWNLOADABLE_SCORE_INDICATORS_FULL,
-        ]
-        self.DOWNLOADABLE_SCORE_CSV = self.DOWNLOADABLE_INFO_PATH / "usa.csv"
-        self.DOWNLOADABLE_SCORE_EXCEL = self.DOWNLOADABLE_INFO_PATH / "usa.xlsx"
-        self.DOWNLOADABLE_SCORE_ZIP = (
-            self.DOWNLOADABLE_INFO_PATH / "Screening Tool Data.zip"
-        )
-
-        self.counties_df: pd.DataFrame
-        self.states_df: pd.DataFrame
-        self.score_df: pd.DataFrame
-        self.score_county_state_merged: pd.DataFrame
-        self.score_for_tiles: pd.DataFrame
-
-    def extract(self) -> None:
-        super().extract(
-            self.CENSUS_COUNTIES_ZIP_URL,
-            self.TMP_PATH,
-        )
+        self.output_score_county_state_merged_df: pd.DataFrame
+        self.output_score_tiles_df: pd.DataFrame
+        self.output_downloadable_df: pd.DataFrame
+
+    def _extract_counties(self, county_path: Path) -> pd.DataFrame:
         logger.info("Reading Counties CSV")
-        self.counties_df = pd.read_csv(
-            self.CENSUS_COUNTIES_TXT,
+        return pd.read_csv(
+            county_path,
             sep="\t",
             dtype={
                 "GEOID": "string",
@@ -136,134 +49,213 @@ class PostScoreETL(ExtractTransformLoad):
             encoding="latin-1",
         )

+    def _extract_states(self, state_path: Path) -> pd.DataFrame:
+        logger.info("Reading States CSV")
-        self.states_df = pd.read_csv(
-            self.STATE_CSV, dtype={"fips": "string", "state_code": "string"}
+        return pd.read_csv(
+            state_path, dtype={"fips": "string", "state_abbreviation": "string"}
         )
-        self.score_df = pd.read_csv(
-            self.FULL_SCORE_CSV,
+
+    def _extract_score(self, score_path: Path) -> pd.DataFrame:
+        logger.info("Reading Score CSV")
+        return pd.read_csv(
+            score_path,
             dtype={"GEOID10": "string", "Total population": "int64"},
         )

-    def transform(self) -> None:
-        logger.info("Transforming data sources for Score + County CSV")
-
-        # rename some of the columns to prepare for merge
-        self.counties_df = self.counties_df[["USPS", "GEOID", "NAME"]]
-        self.counties_df.rename(
-            columns={"USPS": "State Abbreviation", "NAME": "County Name"},
-            inplace=True,
-        )
-
-        # remove unnecessary columns
-        self.states_df.rename(
-            columns={
-                "fips": "State Code",
-                "state_name": "State Name",
-                "state_abbreviation": "State Abbreviation",
-            },
-            inplace=True,
-        )
-        self.states_df.drop(["region", "division"], axis=1, inplace=True)
-
-        # add the tract level column
-        self.score_df["GEOID"] = self.score_df.GEOID10.str[:5]
-
-        # merge state with counties
-        county_state_merged = self.counties_df.merge(
-            self.states_df, on="State Abbreviation", how="left"
-        )
-
-        # merge state + county with score
-        self.score_county_state_merged = self.score_df.merge(
-            county_state_merged, on="GEOID", how="left"
-        )
-
-        # check if there are census cbgs without score
-        logger.info("Removing CBG rows without score")
-
-        ## load cbgs
-        cbg_usa_df = pd.read_csv(
-            self.CENSUS_USA_CSV,
+    def _extract_national_cbg(self, national_cbg_path: Path) -> pd.DataFrame:
+        logger.info("Reading national CBG")
+        return pd.read_csv(
+            national_cbg_path,
             names=["GEOID10"],
             dtype={"GEOID10": "string"},
             low_memory=False,
             header=None,
         )

+    def extract(self) -> None:
+        logger.info("Starting Extraction")
+        super().extract(
+            constants.CENSUS_COUNTIES_ZIP_URL,
+            constants.TMP_PATH,
+        )
+        self.input_counties_df = self._extract_counties(
+            constants.CENSUS_COUNTIES_FILE_NAME
+        )
+        self.input_states_df = self._extract_states(
+            constants.DATA_CENSUS_CSV_STATE_FILE_PATH
+        )
+        self.input_score_df = self._extract_score(
+            constants.DATA_SCORE_CSV_FULL_FILE_PATH
+        )
+        self.input_national_cbg_df = self._extract_national_cbg(
+            constants.DATA_CENSUS_CSV_FILE_PATH
+        )
+
+    def _transform_counties(self, initial_counties_df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Necessary modifications to the counties dataframe
+        """
+        # Rename some of the columns to prepare for merge
+        new_df = initial_counties_df[constants.CENSUS_COUNTIES_COLUMNS]
+        new_df.rename(
+            columns={"USPS": "State Abbreviation", "NAME": "County Name"}, inplace=True
+        )
+        return new_df
+
+    def _transform_states(self, initial_states_df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Necessary modifications to the states dataframe
+        """
+        # remove unnecessary columns
+        new_df = initial_states_df.rename(
+            columns={
+                "fips": "State Code",
+                "state_name": "State Name",
+                "state_abbreviation": "State Abbreviation",
+            }
+        )
+        new_df.drop(["region", "division"], axis=1, inplace=True)
+        return new_df
+
+    def _transform_score(self, initial_score_df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Necessary modifications to the score dataframe
+        """
+        # Add the tract level column
+        new_df = initial_score_df.copy()
+        new_df["GEOID"] = initial_score_df.GEOID10.str[:5]
+        return new_df
+
+    def _create_score_data(
+        self,
+        national_cbg_df: pd.DataFrame,
+        counties_df: pd.DataFrame,
+        states_df: pd.DataFrame,
+        score_df: pd.DataFrame,
+    ) -> pd.DataFrame:
+
+        # merge state with counties
+        logger.info("Merging state with county info")
+        county_state_merged = counties_df.merge(
+            states_df, on="State Abbreviation", how="left"
+        )
+
+        # merge state + county with score
+        score_county_state_merged = score_df.merge(
+            county_state_merged, on="GEOID", how="left"
+        )
+
+        # check if there are census cbgs without score
+        logger.info("Removing CBG rows without score")
+
         # merge census cbgs with score
-        merged_df = cbg_usa_df.merge(
-            self.score_county_state_merged,
-            on="GEOID10",
-            how="left",
+        merged_df = national_cbg_df.merge(
+            score_county_state_merged, on="GEOID10", how="left"
         )

         # recast population to integer
-        merged_df["Total population"] = (
+        score_county_state_merged["Total population"] = (
             merged_df["Total population"].fillna(0.0).astype(int)
         )

         # list the null score cbgs
         null_cbg_df = merged_df[merged_df["Score E (percentile)"].isnull()]

-        # subsctract data sets
+        # subtract data sets
         # this follows the XOR pattern outlined here:
         # https://stackoverflow.com/a/37313953
-        removed_df = pd.concat(
+        de_duplicated_df = pd.concat(
             [merged_df, null_cbg_df, null_cbg_df]
         ).drop_duplicates(keep=False)

-        # set the score to the new df
-        self.score_county_state_merged = removed_df
-
-    def _save_full_csv(self):
-        logger.info("Saving Full Score CSV with County Information")
-        self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
-        self.score_county_state_merged.to_csv(
-            self.FULL_SCORE_CSV_PLUS_COUNTIES, index=False
-        )
-
-    def _save_tile_csv(self):
-        logger.info("Saving Tile Score CSV")
-        score_tiles = self.score_county_state_merged[self.TILES_SCORE_COLUMNS]
+        return de_duplicated_df
+
+    def _create_tile_data(
+        self, score_county_state_merged_df: pd.DataFrame
+    ) -> pd.DataFrame:
+        score_tiles = score_county_state_merged_df[constants.TILES_SCORE_COLUMNS]
         decimals = pd.Series(
-            [self.TILES_ROUND_NUM_DECIMALS]
-            * len(self.TILES_SCORE_FLOAT_COLUMNS),
-            index=self.TILES_SCORE_FLOAT_COLUMNS,
+            [constants.TILES_ROUND_NUM_DECIMALS]
+            * len(constants.TILES_SCORE_FLOAT_COLUMNS),
+            index=constants.TILES_SCORE_FLOAT_COLUMNS,
         )
-        score_tiles = score_tiles.round(decimals)
+        return score_tiles.round(decimals)

-        self.TILES_SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
-        score_tiles.to_csv(self.TILES_SCORE_CSV, index=False)
+    def _create_downloadable_data(
+        self, score_county_state_merged_df: pd.DataFrame
+    ) -> pd.DataFrame:
+        return score_county_state_merged_df[constants.DOWNLOADABLE_SCORE_COLUMNS]

-    def _save_downloadable_zip(self):
+    def transform(self) -> None:
+        logger.info("Transforming data sources for Score + County CSV")
+
+        transformed_counties = self._transform_counties(self.input_counties_df)
+        transformed_states = self._transform_states(self.input_states_df)
+        transformed_score = self._transform_score(self.input_score_df)
+
+        output_score_county_state_merged_df = self._create_score_data(
+            self.input_national_cbg_df,
+            transformed_counties,
+            transformed_states,
+            transformed_score,
+        )
+        self.output_score_tiles_df = self._create_tile_data(
+            output_score_county_state_merged_df
+        )
+        self.output_downloadable_df = self._create_downloadable_data(
+            output_score_county_state_merged_df
+        )
+        self.output_score_county_state_merged_df = output_score_county_state_merged_df
+
+    def _load_score_csv(
+        self, score_county_state_merged: pd.DataFrame, score_csv_path: Path
+    ) -> None:
+        logger.info("Saving Full Score CSV with County Information")
+        score_csv_path.parent.mkdir(parents=True, exist_ok=True)
+        score_county_state_merged.to_csv(score_csv_path, index=False)
+
+    def _load_tile_csv(
+        self, score_tiles_df: pd.DataFrame, tile_score_path: Path
+    ) -> None:
+        logger.info("Saving Tile Score CSV")
+        # TODO: check which are the columns we'll use
+        # Related to: https://github.com/usds/justice40-tool/issues/302
+        tile_score_path.mkdir(parents=True, exist_ok=True)
+        score_tiles_df.to_csv(tile_score_path, index=False)
+
+    def _load_downloadable_zip(
+        self, downloadable_df: pd.DataFrame, downloadable_info_path: Path
+    ) -> None:
         logger.info("Saving Downloadable CSV")
-        logger.info(list(self.score_county_state_merged.columns))
-        logger.info(self.DOWNLOADABLE_SCORE_COLUMNS)
-        downloadable_tiles = self.score_county_state_merged[
-            self.DOWNLOADABLE_SCORE_COLUMNS
-        ]
-        self.DOWNLOADABLE_INFO_PATH.mkdir(parents=True, exist_ok=True)

+        downloadable_info_path.mkdir(parents=True, exist_ok=True)
+        csv_path = downloadable_info_path / "usa.csv"
+        excel_path = downloadable_info_path / "usa.xlsx"
+        zip_path = downloadable_info_path / "Screening Tool Data.zip"
+
         logger.info("Writing downloadable csv")
-        downloadable_tiles.to_csv(self.DOWNLOADABLE_SCORE_CSV, index=False)
+        downloadable_df.to_csv(csv_path, index=False)

         logger.info("Writing downloadable excel")
-        downloadable_tiles.to_excel(self.DOWNLOADABLE_SCORE_EXCEL, index=False)
+        downloadable_df.to_excel(excel_path, index=False)

         logger.info("Compressing files")
-        files_to_compress = [
-            self.DOWNLOADABLE_SCORE_CSV,
-            self.DOWNLOADABLE_SCORE_EXCEL,
-        ]
-        with zipfile.ZipFile(self.DOWNLOADABLE_SCORE_ZIP, "w") as zf:
+        files_to_compress = [csv_path, excel_path]
+        with zipfile.ZipFile(zip_path, "w") as zf:
             for f in files_to_compress:
                 zf.write(f, arcname=Path(f).name, compress_type=compression)
-        zip_info = get_zip_info(self.DOWNLOADABLE_SCORE_ZIP)
+        zip_info = get_zip_info(zip_path)
         logger.info(json.dumps(zip_info, indent=4, sort_keys=True, default=str))

     def load(self) -> None:
-        self._save_full_csv()
-        self._save_tile_csv()
-        self._save_downloadable_zip()
+        self._load_score_csv(
+            self.output_score_county_state_merged_df,
+            constants.FULL_SCORE_CSV_FULL_PLUS_COUNTIES_FILE_PATH,
+        )
+        self._load_tile_csv(
+            self.output_score_tiles_df, constants.DATA_SCORE_TILES_FILE_PATH
+        )
+        self._load_downloadable_zip(
+            self.output_downloadable_df, constants.SCORE_DOWNLOADABLE_DIR
+        )
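
The concat/drop_duplicates "XOR" trick in _create_score_data above (per the linked Stack Overflow answer) removes CBG rows with no score without an explicit anti-join: appending the null-score rows again guarantees every unwanted row occurs more than once, and drop_duplicates(keep=False) discards all copies of any duplicated row. A minimal standalone sketch of the same pattern; the column values here are invented for the example:

import pandas as pd

merged_df = pd.DataFrame(
    {
        "GEOID10": ["010010201001", "010010201002", "010010201003"],
        "Score E (percentile)": [0.36, None, 0.11],
    }
)
null_cbg_df = merged_df[merged_df["Score E (percentile)"].isnull()]

# keep=False drops every row that occurs more than once, so the null-score
# rows (duplicated by the concat) disappear entirely
de_duplicated_df = pd.concat(
    [merged_df, null_cbg_df, null_cbg_df]
).drop_duplicates(keep=False)
# de_duplicated_df retains only the two rows that have a Score E value
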
data/data-pipeline/data_pipeline/etl/score/tests/conftest.py (new file, +118)

@@ -0,0 +1,118 @@
import os
from importlib import reload
from pathlib import Path

import pandas as pd
import pytest
from data_pipeline import config
from data_pipeline.etl.score import etl_score_post, tests
from data_pipeline.etl.score.etl_score_post import PostScoreETL


def pytest_configure():
    pytest.SNAPSHOT_DIR = Path(__file__).parent / "snapshots"


@pytest.fixture(scope="session")
def root(tmp_path_factory):
    basetemp = Path.cwd() / "temp_dir"
    os.environ["PYTEST_DEBUG_TEMPROOT"] = str(
        basetemp
    )  # this sets the location of the temp directory inside the project folder
    basetemp.mkdir(parents=True, exist_ok=True)
    root = tmp_path_factory.mktemp("root", numbered=False)
    return root


@pytest.fixture(autouse=True)
def settings_override(monkeypatch, root):
    reload(config)
    monkeypatch.setattr(config.settings, "APP_ROOT", root)
    return config.settings


@pytest.fixture()
def etl(monkeypatch, root):
    reload(etl_score_post)
    tmp_path = root / "tmp"
    tmp_path.mkdir(parents=True, exist_ok=True)
    etl = PostScoreETL()
    monkeypatch.setattr(etl, "DATA_PATH", root)
    monkeypatch.setattr(etl, "TMP_PATH", tmp_path)
    return etl


@pytest.fixture(scope="session")
def sample_data_dir():
    base_dir = Path(tests.__file__).resolve().parent
    return base_dir / "sample_data"


@pytest.fixture()
def county_data_initial(sample_data_dir):
    return sample_data_dir / "county_data_initial.csv"


@pytest.fixture()
def state_data_initial(sample_data_dir):
    return sample_data_dir / "state_data_initial.csv"


@pytest.fixture()
def score_data_initial(sample_data_dir):
    return sample_data_dir / "score_data_initial.csv"


@pytest.fixture()
def counties_transformed_expected():
    return pd.DataFrame.from_dict(
        data={
            "State Abbreviation": pd.Series(["AL", "AL"], dtype="string"),
            "GEOID": pd.Series(["01001", "01003"], dtype="string"),
            "County Name": pd.Series(
                ["AutaugaCounty", "BaldwinCounty"], dtype="object"
            ),
        },
    )


@pytest.fixture()
def states_transformed_expected():
    return pd.DataFrame.from_dict(
        data={
            "State Code": pd.Series(["01", "02", "04"], dtype="string"),
            "State Name": pd.Series(["Alabama", "Alaska", "Arizona"], dtype="object"),
            "State Abbreviation": pd.Series(["AL", "AK", "AZ"], dtype="string"),
        },
    )


@pytest.fixture()
def score_transformed_expected():
    return pd.read_pickle(pytest.SNAPSHOT_DIR / "score_transformed_expected.pkl")


@pytest.fixture()
def national_cbg_df():
    return pd.DataFrame.from_dict(
        data={
            "GEOID10": pd.Series(["010010201001", "010010201002"], dtype="string"),
        },
    )


@pytest.fixture()
def score_data_expected():
    return pd.read_pickle(pytest.SNAPSHOT_DIR / "score_data_expected.pkl")


@pytest.fixture()
def tile_data_expected():
    return pd.read_pickle(pytest.SNAPSHOT_DIR / "tile_data_expected.pkl")


@pytest.fixture()
def downloadable_data_expected():
    return pd.read_pickle(pytest.SNAPSHOT_DIR / "downloadable_data_expected.pkl")
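
The expected-output fixtures above read pandas pickles out of the snapshots directory, and the commit message mentions updating these pickles. A minimal sketch of how such a snapshot could be regenerated, assuming you already hold a known-good DataFrame; the save_snapshot helper name is hypothetical, not part of this commit:

from pathlib import Path

import pandas as pd

SNAPSHOT_DIR = Path(__file__).parent / "snapshots"  # same layout pytest_configure() assumes


def save_snapshot(df: pd.DataFrame, name: str) -> None:
    """Persist a known-good DataFrame for later comparison in tests."""
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    df.to_pickle(SNAPSHOT_DIR / f"{name}.pkl")  # pd.read_pickle() restores it with dtypes intact
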
data/data-pipeline/data_pipeline/etl/score/tests/sample_data/county_data_initial.csv (new file, +3; tab-separated)

@@ -0,0 +1,3 @@
USPS GEOID ANSICODE NAME POP10 HU10 ALAND AWATER ALAND_SQMI AWATER_SQMI INTPTLAT INTPTLONG
AL 01001 00161526 AutaugaCounty 54571 22135 1539582278 25775735 594.436 9.952 32.536382 -86.644490
AL 01003 00161527 BaldwinCounty 182265 104061 4117521611 1133190229 1589.784 437.527 30.659218 -87.746067
data/data-pipeline/data_pipeline/etl/score/tests/sample_data/score_data_initial.csv (new file, +3)

@@ -0,0 +1,3 @@
GEOID10,Housing burden (percent),Total population,Air toxics cancer risk,Respiratory hazard index,Diesel particulate matter,Particulate matter (PM2.5),Ozone,Traffic proximity and volume,Proximity to RMP sites,Proximity to TSDF sites,Proximity to NPL sites,Wastewater discharge,Percent pre-1960s housing (lead paint indicator),Individuals under 5 years old,Individuals over 64 years old,Linguistic isolation (percent),Percent of households in linguistic isolation,Poverty (Less than 200% of federal poverty line),Percent individuals age 25 or over with less than high school degree,Unemployed civilians (percent),Housing + Transportation Costs % Income for the Regional Typical Household,GEOID10 (percentile),Housing burden (percent) (percentile),Total population (percentile),Air toxics cancer risk (percentile),Respiratory hazard index (percentile),Diesel particulate matter (percentile),Particulate matter (PM2.5) (percentile),Ozone (percentile),Traffic proximity and volume (percentile),Proximity to RMP sites (percentile),Proximity to TSDF sites (percentile),Proximity to NPL sites (percentile),Wastewater discharge (percentile),Percent pre-1960s housing (lead paint indicator) (percentile),Individuals under 5 years old (percentile),Individuals over 64 years old (percentile),Linguistic isolation (percent) (percentile),Percent of households in linguistic isolation (percentile),Poverty (Less than 200% of federal poverty line) (percentile),Percent individuals age 25 or over with less than high school degree (percentile),Unemployed civilians (percent) (percentile),Housing + Transportation Costs % Income for the Regional Typical Household (percentile),Housing burden (percent) (min-max normalized),Total population (min-max normalized),Air toxics cancer risk (min-max normalized),Respiratory hazard index (min-max normalized),Diesel particulate matter (min-max normalized),Particulate matter (PM2.5) (min-max normalized),Ozone (min-max normalized),Traffic proximity and volume (min-max normalized),Proximity to RMP sites (min-max normalized),Proximity to TSDF sites (min-max normalized),Proximity to NPL sites (min-max normalized),Wastewater discharge (min-max normalized),Percent pre-1960s housing (lead paint indicator) (min-max normalized),Individuals under 5 years old (min-max normalized),Individuals over 64 years old (min-max normalized),Linguistic isolation (percent) (min-max normalized),Percent of households in linguistic isolation (min-max normalized),Poverty (Less than 200% of federal poverty line) (min-max normalized),Percent individuals age 25 or over with less than high school degree (min-max normalized),Unemployed civilians (percent) (min-max normalized),Housing + Transportation Costs % Income for the Regional Typical Household (min-max normalized),Score A,Score B,Socioeconomic Factors,Sensitive populations,Environmental effects,Exposures,Pollution Burden,Population Characteristics,Score C,Score D,Score E,Score A (percentile),Score A (top 25th percentile),Score A (top 30th percentile),Score A (top 35th percentile),Score A (top 40th percentile),Score B (percentile),Score B (top 25th percentile),Score B (top 30th percentile),Score B (top 35th percentile),Score B (top 40th percentile),Score C (percentile),Score C (top 25th percentile),Score C (top 30th percentile),Score C (top 35th percentile),Score C (top 40th percentile),Score D (percentile),Score D (top 25th percentile),Score D (top 30th percentile),Score D (top 35th percentile),Score D (top 40th percentile),Score E (percentile),Score E (top 25th percentile),Score E (top 30th percentile),Score E (top 35th percentile),Score E (top 40th percentile),Poverty (Less than 200% of federal poverty line) (top 25th percentile),Poverty (Less than 200% of federal poverty line) (top 30th percentile),Poverty (Less than 200% of federal poverty line) (top 35th percentile),Poverty (Less than 200% of federal poverty line) (top 40th percentile)
010010201001,0.15,692,49.3770316066,0.788051737456,0.2786630687,9.99813169399,40.1217287582,91.0159000855,0.0852006888915,0.0655778245369,0.0709415490545,0.0,0.29,0.0491329479769,0.0953757225434,0.0,0.04,0.293352601156,0.195011337868,0.028125,55.0,4.53858477849437e-06,0.15696279879978475,0.12089201345236528,0.9797143208291796,0.9829416396964773,0.34627219635208273,0.9086451463612172,0.28414902233020944,0.3410837232734089,0.13480504509083976,0.13460988594536452,0.5500810137382961,0.18238709002315753,0.5188510118774764,0.4494787435381899,0.25320991408459015,0.2596066814778244,0.7027453899325112,0.46606500161119757,0.7623733167523703,0.3628393561824028,0.5794871072813119,0.10909090909090909,0.013340530536705737,0.028853697167088285,0.18277886087526787,0.045859591901569303,0.5883290826337872,0.3121515260630353,0.0024222132770710053,0.004621252164336263,0.00015416214761450488,0.007893014211979786,0.0,0.29,0.09433526011570838,0.0953757225434,0.0,0.04,0.293352601156,0.195011337868,0.028125,0.2711864406779661,0.6142191591817839,0.3553155211005275,0.5747020343519587,0.3207651130335348,0.3041468093350269,0.640467674807096,0.5283607196497396,0.4477335736927467,0.23656483320764937,0.12511596962298183,0.4015694309647159,0.6357808408182161,False,False,False,True,0.6315486105122701,False,False,False,True,0.5104500914524833,False,False,False,False,0.44267994354000534,False,False,False,False,0.3517176274094212,False,False,False,False,False,False,False,False
010010201002,0.15,1153,49.3770316066,0.788051737456,0.2786630687,9.99813169399,40.1217287582,2.61874365577,0.0737963352265,0.0604962870646,0.0643436665275,0.0,0.094623655914,0.0416305290546,0.150043365134,0.0,0.0,0.182133564614,0.039119804401,0.0287878787878787,57.0,9.07716955698874e-06,0.15696279879978475,0.42875102685480615,0.9797143208291796,0.9829416396964773,0.34627219635208273,0.9086451463612172,0.28414902233020944,0.09634507767787849,0.11004706512415299,0.1228504127842856,0.5178479846414291,0.18238709002315753,0.28270163797524656,0.3660890561105236,0.5188963977252613,0.2596066814778244,0.25592171848974055,0.2701365660159849,0.2207635715031339,0.3696173450745396,0.6379947997334159,0.10909090909090909,0.022227791486736582,0.028853697167088285,0.18277886087526787,0.045859591901569303,0.5883290826337872,0.3121515260630353,6.96928300032502e-05,0.004002684465613169,0.00014221633002379553,0.007158928457599425,0.0,0.094623655914,0.07993061578488315,0.150043365134,0.0,0.0,0.182133564614,0.039119804401,0.0287878787878787,0.2824858757062147,0.24545006875955938,0.05963631310728093,0.350886800163363,0.38153071177120307,0.2431668381096544,0.5996779005411742,0.4808408797306676,0.36620875596728303,0.17608814038438173,0.07182643137875756,0.2554173925742535,0.21102603786087423,False,False,False,False,0.2509565067420677,False,False,False,False,0.2850458170133389,False,False,False,False,0.16239056337452856,False,False,False,False,0.11055992520412285,False,False,False,False,False,False,False,False
data/data-pipeline/data_pipeline/etl/score/tests/sample_data/state_data_initial.csv (new file, +4)

@@ -0,0 +1,4 @@
fips,state_name,state_abbreviation,region,division
01,Alabama,AL,South,East South Central
02,Alaska,AK,West,Pacific
04,Arizona,AZ,West,Mountain
Binary files not shown (4 files: the snapshot pickles read by conftest.py, i.e. downloadable_data_expected.pkl, score_data_expected.pkl, score_transformed_expected.pkl, and tile_data_expected.pkl).
New test module under data/data-pipeline/data_pipeline/etl/score/tests/ (+116; the filename is not shown in this view)

@@ -0,0 +1,116 @@
# pylint: disable=W0212
## Above disables warning about access to underscore-prefixed methods

from importlib import reload

import pandas.api.types as ptypes
import pandas.testing as pdt
from data_pipeline.etl.score import constants

# See conftest.py for all fixtures used in these tests


# Extract Tests
def test_extract_counties(etl, county_data_initial):
    reload(constants)
    extracted = etl._extract_counties(county_data_initial)
    assert all(
        ptypes.is_string_dtype(extracted[col])
        for col in constants.CENSUS_COUNTIES_COLUMNS
    )


def test_extract_states(etl, state_data_initial):
    extracted = etl._extract_states(state_data_initial)
    string_cols = ["fips", "state_abbreviation"]
    assert all(ptypes.is_string_dtype(extracted[col]) for col in string_cols)


def test_extract_score(etl, score_data_initial):
    extracted = etl._extract_score(score_data_initial)
    string_cols = ["GEOID10"]
    assert all(ptypes.is_string_dtype(extracted[col]) for col in string_cols)


# Transform Tests
def test_transform_counties(etl, county_data_initial, counties_transformed_expected):
    extracted_counties = etl._extract_counties(county_data_initial)
    counties_transformed_actual = etl._transform_counties(extracted_counties)
    pdt.assert_frame_equal(counties_transformed_actual, counties_transformed_expected)


def test_transform_states(etl, state_data_initial, states_transformed_expected):
    extracted_states = etl._extract_states(state_data_initial)
    states_transformed_actual = etl._transform_states(extracted_states)
    pdt.assert_frame_equal(states_transformed_actual, states_transformed_expected)


def test_transform_score(etl, score_data_initial, score_transformed_expected):
    extracted_score = etl._extract_score(score_data_initial)
    score_transformed_actual = etl._transform_score(extracted_score)
    pdt.assert_frame_equal(
        score_transformed_actual, score_transformed_expected, check_dtype=False
    )


# pylint: disable=too-many-arguments
def test_create_score_data(
    etl,
    national_cbg_df,
    counties_transformed_expected,
    states_transformed_expected,
    score_transformed_expected,
    score_data_expected,
):
    score_data_actual = etl._create_score_data(
        national_cbg_df,
        counties_transformed_expected,
        states_transformed_expected,
        score_transformed_expected,
    )
    pdt.assert_frame_equal(
        score_data_actual,
        score_data_expected,
    )


def test_create_tile_data(etl, score_data_expected, tile_data_expected):
    output_tiles_df_actual = etl._create_tile_data(score_data_expected)
    pdt.assert_frame_equal(
        output_tiles_df_actual,
        tile_data_expected,
    )


def test_create_downloadable_data(etl, score_data_expected, downloadable_data_expected):
    output_downloadable_df_actual = etl._create_downloadable_data(score_data_expected)
    pdt.assert_frame_equal(
        output_downloadable_df_actual,
        downloadable_data_expected,
    )


def test_load_score_csv(etl, score_data_expected):
    reload(constants)
    etl._load_score_csv(
        score_data_expected,
        constants.FULL_SCORE_CSV_FULL_PLUS_COUNTIES_FILE_PATH,
    )
    assert constants.FULL_SCORE_CSV_FULL_PLUS_COUNTIES_FILE_PATH.is_file()


def test_load_tile_csv(etl, tile_data_expected):
    reload(constants)
    etl._load_score_csv(tile_data_expected, constants.DATA_SCORE_TILES_FILE_PATH)
    assert constants.DATA_SCORE_TILES_FILE_PATH.is_file()


def test_load_downloadable_zip(etl, downloadable_data_expected):
    reload(constants)
    etl._load_downloadable_zip(
        downloadable_data_expected, constants.SCORE_DOWNLOADABLE_DIR
    )
    assert constants.SCORE_DOWNLOADABLE_DIR.is_dir()
    assert constants.SCORE_DOWNLOADABLE_CSV_FILE_PATH.is_file()
    assert constants.SCORE_DOWNLOADABLE_EXCEL_FILE_PATH.is_file()
    assert constants.SCORE_DOWNLOADABLE_ZIP_FILE_PATH.is_file()
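
Most assertions above go through pandas.testing.assert_frame_equal, which compares values, index, columns, and, unless relaxed, dtypes; test_transform_score passes check_dtype=False for exactly that reason. A small self-contained illustration of the dtype knob, on made-up frames:

import pandas as pd
import pandas.testing as pdt

left = pd.DataFrame({"Total population": [692, 1153]})       # int64
right = pd.DataFrame({"Total population": [692.0, 1153.0]})  # float64

pdt.assert_frame_equal(left, right, check_dtype=False)  # passes: equal values, dtypes ignored
try:
    pdt.assert_frame_equal(left, right)  # raises: int64 vs float64 mismatch
except AssertionError as err:
    print(err)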