YAML Config for Downloadable Assets (#1252)

* starting yaml config load work

* working version for downloadable file

* yaml file update

* checkpoint

* sort if needed

* refactoring

* moving config

* checkpoint

* old files

* skipping downloadable tests for now

* more modularization

* more refactor, new excel yml

* pylint

* completed tabs

* Update excel.yml

* removing obsolete tests

* addressing PR feedback

* addressing changes

* confirmed change in yaml breaks tests

* safety bump

* PR review

* adding tests back

* pylint

* Incorporating latest score fields from Emma

* incorporating newest fields from Emma

* passing tests

* adding shapefile aws sync

* missing test

* passing tests
This commit is contained in:
Jorge Escobar 2022-03-04 15:02:09 -05:00 committed by GitHub
commit 6425beb9f4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 725 additions and 170 deletions

View file

@ -39,6 +39,7 @@ class ExtractTransformLoad:
# Directories
DATA_PATH: pathlib.Path = APP_ROOT / "data"
TMP_PATH: pathlib.Path = DATA_PATH / "tmp"
CONTENT_CONFIG: pathlib.Path = APP_ROOT / "content" / "config"
# Parameters
GEOID_FIELD_NAME: str = "GEOID10"

View file

@ -272,104 +272,3 @@ TILES_SCORE_FLOAT_COLUMNS = [
field_names.SCORE_M + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.COLLEGE_ATTENDANCE_FIELD,
]
# Finally we augment with the GEOID10, county, and state
DOWNLOADABLE_SCORE_COLUMNS = [
field_names.GEOID_TRACT_FIELD,
field_names.COUNTY_FIELD,
field_names.STATE_FIELD,
field_names.THRESHOLD_COUNT,
field_names.SCORE_M_COMMUNITIES,
field_names.TOTAL_POP_FIELD,
field_names.FPL_200_AND_COLLEGE_ATTENDANCE_SERIES,
field_names.COLLEGE_ATTENDANCE_FIELD,
field_names.COLLEGE_ATTENDANCE_LESS_THAN_20_FIELD,
field_names.EXPECTED_AGRICULTURE_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD,
field_names.EXPECTED_BUILDING_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.EXPECTED_BUILDING_LOSS_RATE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.EXPECTED_BUILDING_LOSS_RATE_FIELD,
field_names.EXPECTED_POPULATION_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.EXPECTED_POPULATION_LOSS_RATE_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.EXPECTED_POPULATION_LOSS_RATE_FIELD,
field_names.ENERGY_BURDEN_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.ENERGY_BURDEN_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.ENERGY_BURDEN_FIELD,
field_names.PM25_EXPOSURE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.PM25_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.PM25_FIELD,
field_names.DIESEL_PARTICULATE_MATTER_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.DIESEL_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.DIESEL_FIELD,
field_names.TRAFFIC_PROXIMITY_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.TRAFFIC_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.TRAFFIC_FIELD,
field_names.HOUSING_BURDEN_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.HOUSING_BURDEN_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.HOUSING_BURDEN_FIELD,
field_names.LEAD_PAINT_MEDIAN_HOUSE_VALUE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.LEAD_PAINT_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.LEAD_PAINT_FIELD,
field_names.MEDIAN_HOUSE_VALUE_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.MEDIAN_HOUSE_VALUE_FIELD,
field_names.HAZARDOUS_WASTE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.TSDF_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.TSDF_FIELD,
field_names.SUPERFUND_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.NPL_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.NPL_FIELD,
field_names.RMP_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.RMP_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.RMP_FIELD,
field_names.WASTEWATER_DISCHARGE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.WASTEWATER_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.WASTEWATER_FIELD,
field_names.ASTHMA_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.ASTHMA_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.ASTHMA_FIELD,
field_names.DIABETES_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.DIABETES_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.DIABETES_FIELD,
field_names.HEART_DISEASE_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.HEART_DISEASE_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.HEART_DISEASE_FIELD,
field_names.LOW_LIFE_EXPECTANCY_LOW_INCOME_LOW_HIGHER_ED_FIELD,
field_names.LOW_LIFE_EXPECTANCY_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.LIFE_EXPECTANCY_FIELD,
field_names.LOW_MEDIAN_INCOME_LOW_HS_LOW_HIGHER_ED_FIELD,
field_names.LOW_MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD,
field_names.LINGUISTIC_ISOLATION_LOW_HS_LOW_HIGHER_ED_FIELD,
field_names.LINGUISTIC_ISO_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.LINGUISTIC_ISO_FIELD,
field_names.UNEMPLOYMENT_LOW_HS_LOW_HIGHER_ED_FIELD,
field_names.UNEMPLOYMENT_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.UNEMPLOYMENT_FIELD,
field_names.POVERTY_LOW_HS_LOW_HIGHER_ED_FIELD,
field_names.POVERTY_LESS_THAN_200_FPL_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.POVERTY_LESS_THAN_100_FPL_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.POVERTY_LESS_THAN_200_FPL_FIELD,
field_names.POVERTY_LESS_THAN_100_FPL_FIELD,
field_names.HIGH_SCHOOL_ED_FIELD + field_names.PERCENTILE_FIELD_SUFFIX,
field_names.HIGH_SCHOOL_ED_FIELD,
field_names.COMBINED_UNEMPLOYMENT_2010,
field_names.COMBINED_POVERTY_LESS_THAN_100_FPL_FIELD_2010,
field_names.ISLAND_AREAS_UNEMPLOYMENT_LOW_HS_EDUCATION_FIELD,
field_names.ISLAND_AREAS_POVERTY_LOW_HS_EDUCATION_FIELD,
field_names.ISLAND_AREAS_LOW_MEDIAN_INCOME_LOW_HS_EDUCATION_FIELD,
field_names.LOW_CENSUS_DECENNIAL_AREA_MEDIAN_INCOME_PERCENT_FIELD_2009
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_100_FPL_FIELD_2009
+ field_names.ISLAND_AREAS_PERCENTILE_ADJUSTMENT_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
field_names.CENSUS_DECENNIAL_UNEMPLOYMENT_FIELD_2009
+ field_names.ISLAND_AREAS_PERCENTILE_ADJUSTMENT_FIELD
+ field_names.PERCENTILE_FIELD_SUFFIX,
]

View file

@ -6,7 +6,13 @@ import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.score.etl_utils import floor_series
from data_pipeline.utils import get_module_logger, zip_files
from data_pipeline.utils import (
get_module_logger,
zip_files,
load_yaml_dict_from_file,
column_list_from_yaml_object_fields,
load_dict_from_yaml_object_fields,
)
from data_pipeline.score import field_names
@ -40,6 +46,23 @@ class PostScoreETL(ExtractTransformLoad):
self.output_score_tiles_df: pd.DataFrame
self.output_downloadable_df: pd.DataFrame
# Define some constants for the YAML file
# TODO: Implement this as a marshmallow schema.
# TODO: Ticket: https://github.com/usds/justice40-tool/issues/1327
self.yaml_fields_type_percentage_label = "percentage"
self.yaml_fields_type_loss_rate_percentage_label = (
"loss_rate_percentage"
)
self.yaml_fields_type_float_label = "float"
self.yaml_fields_type_string_label = "string"
self.yaml_fields_type_boolean_label = "bool"
self.yaml_fields_type_integer_label = "int64"
self.yaml_excel_sheet_label = "label"
self.yaml_global_config_rounding_num = "rounding_num"
self.yaml_global_config_rounding_num_float = "float"
self.yaml_global_config_sort_by_label = "sort_by_label"
# End YAML definition constants
def _extract_counties(self, county_path: Path) -> pd.DataFrame:
logger.info("Reading Counties CSV")
return pd.read_csv(
@ -300,18 +323,27 @@ class PostScoreETL(ExtractTransformLoad):
return score_tiles
def _create_downloadable_data(
self, score_county_state_merged_df: pd.DataFrame
self, score_df: pd.DataFrame, fields_object: dict, config_object: dict
) -> pd.DataFrame:
df = score_county_state_merged_df[
constants.DOWNLOADABLE_SCORE_COLUMNS
df = score_df[
column_list_from_yaml_object_fields(
yaml_object=fields_object,
target_field="score_name",
)
].copy(deep=True)
df_of_float_columns = df.select_dtypes(include=["float64"])
column_type_dict = load_dict_from_yaml_object_fields(
yaml_object=fields_object,
object_key="score_name",
object_value="format",
)
for column in df_of_float_columns.columns:
# TODO: create a schema for fields to make it more explicit and safe which
# fields are percentages.
if any(x in column for x in constants.PERCENT_PREFIXES_SUFFIXES):
for column in df.columns:
if (
column_type_dict[column]
== self.yaml_fields_type_percentage_label
):
# Convert percentages from fractions between 0 and 1 to an integer
# from 0 to 100.
df_100 = df[column] * 100
@ -320,26 +352,67 @@ class PostScoreETL(ExtractTransformLoad):
).astype("Int64")
df[column] = df_int
elif column in constants.FEMA_ROUND_NUM_COLUMNS:
elif (
column_type_dict[column]
== self.yaml_fields_type_loss_rate_percentage_label
):
# Convert loss rates by multiplying by 100 (they are percents)
# and then rounding appropriately.
df_100 = df[column] * 100
df[column] = floor_series(
series=df_100.astype(float64),
number_of_decimals=constants.TILES_FEMA_ROUND_NUM_DECIMALS,
number_of_decimals=config_object[
self.yaml_global_config_rounding_num
][self.yaml_fields_type_loss_rate_percentage_label],
)
else:
# Round all other floats.
elif column_type_dict[column] == self.yaml_fields_type_float_label:
# Round the floats.
df[column] = floor_series(
series=df[column].astype(float64),
number_of_decimals=constants.TILES_ROUND_NUM_DECIMALS,
number_of_decimals=config_object[
self.yaml_global_config_rounding_num
][self.yaml_global_config_rounding_num_float],
)
# sort by tract id
df_sorted = df.sort_values(self.GEOID_TRACT_FIELD_NAME)
elif column_type_dict[column] == self.yaml_fields_type_string_label:
pass
return df_sorted
elif (
column_type_dict[column] == self.yaml_fields_type_boolean_label
):
pass
elif (
column_type_dict[column] == self.yaml_fields_type_integer_label
):
pass
else:
raise ValueError(
f"Unrecognized type: `{column_type_dict[column]}`"
)
# rename fields
column_rename_dict = load_dict_from_yaml_object_fields(
yaml_object=fields_object,
object_key="score_name",
object_value="label",
)
renamed_df = df.rename(
columns=column_rename_dict,
inplace=False,
)
# sort if needed
if config_object.get(self.yaml_global_config_sort_by_label):
final_df = renamed_df.sort_values(
config_object[self.yaml_global_config_sort_by_label]
)
else:
final_df = renamed_df
return final_df
def transform(self) -> None:
logger.info("Transforming data sources for Score + County CSVs")
@ -358,14 +431,11 @@ class PostScoreETL(ExtractTransformLoad):
self.output_score_tiles_df = self._create_tile_data(
output_score_county_state_merged_df
)
self.output_downloadable_df = self._create_downloadable_data(
output_score_county_state_merged_df
)
self.output_score_county_state_merged_df = (
output_score_county_state_merged_df
)
def _load_score_csv(
def _load_score_csv_full(
self, score_county_state_merged: pd.DataFrame, score_csv_path: Path
) -> None:
logger.info("Saving Full Score CSV with County Information")
@ -379,8 +449,16 @@ class PostScoreETL(ExtractTransformLoad):
def _load_excel_from_df(
self, excel_df: pd.DataFrame, excel_path: Path
) -> None:
# open excel yaml config
excel_csv_config = load_yaml_dict_from_file(
self.CONTENT_CONFIG / "excel.yml"
)
# Define Excel Columns Column Width
num_excel_cols_width = 30
num_excel_cols_width = excel_csv_config["global_config"][
"excel_config"
]["default_column_width"]
# Create a Pandas Excel writer using XlsxWriter as the engine.
with pd.ExcelWriter( # pylint: disable=abstract-class-instantiated
@ -389,25 +467,35 @@ class PostScoreETL(ExtractTransformLoad):
engine="xlsxwriter",
) as writer:
# Convert the dataframe to an XlsxWriter Excel object. We also turn off the
# index column at the left of the output dataframe.
excel_df.to_excel(writer, sheet_name="Data", index=False)
for sheet in excel_csv_config["sheets"]:
excel_df = self._create_downloadable_data(
score_df=self.output_score_county_state_merged_df,
fields_object=sheet["fields"],
config_object=excel_csv_config["global_config"],
)
# Convert the dataframe to an XlsxWriter Excel object. We also turn off the
# index column at the left of the output dataframe.
excel_df.to_excel(
writer,
sheet_name=sheet[self.yaml_excel_sheet_label],
index=False,
)
# Get the xlsxwriter workbook and worksheet objects.
workbook = writer.book
worksheet = writer.sheets["Data"]
# Get the xlsxwriter workbook and worksheet objects.
workbook = writer.book
worksheet = writer.sheets[sheet[self.yaml_excel_sheet_label]]
# set header format
header_format = workbook.add_format(
{"bold": True, "text_wrap": True, "valign": "bottom"}
)
# set header format
header_format = workbook.add_format(
{"bold": True, "text_wrap": True, "valign": "bottom"}
)
# write headers
for col_num, value in enumerate(excel_df.columns.array):
worksheet.write(0, col_num, value, header_format)
# write headers
for col_num, value in enumerate(excel_df.columns.array):
worksheet.write(0, col_num, value, header_format)
num_cols = len(excel_df.columns)
worksheet.set_column(0, num_cols - 1, num_excel_cols_width)
num_cols = len(excel_df.columns)
worksheet.set_column(0, num_cols - 1, num_excel_cols_width)
writer.save()
@ -418,33 +506,33 @@ class PostScoreETL(ExtractTransformLoad):
tile_score_path.parent.mkdir(parents=True, exist_ok=True)
score_tiles_df.to_csv(tile_score_path, index=False, encoding="utf-8")
def _load_downloadable_zip(
self, downloadable_df: pd.DataFrame, downloadable_info_path: Path
) -> None:
def _load_downloadable_zip(self, downloadable_info_path: Path) -> None:
logger.info("Saving Downloadable CSV")
downloadable_info_path.mkdir(parents=True, exist_ok=True)
csv_path = constants.SCORE_DOWNLOADABLE_CSV_FILE_PATH
excel_path = constants.SCORE_DOWNLOADABLE_EXCEL_FILE_PATH
zip_path = constants.SCORE_DOWNLOADABLE_ZIP_FILE_PATH
# TODO: reinstate when PDF is added back
# pdf_path = constants.SCORE_DOWNLOADABLE_PDF_FILE_PATH
# Rename score column
downloadable_df_copy = downloadable_df.rename(
columns={
DISADVANTAGED_COMMUNITIES_FIELD: "Identified as disadvantaged (v0.1)"
},
inplace=False,
)
logger.info("Writing downloadable excel")
self._load_excel_from_df(downloadable_df_copy, excel_path)
self._load_excel_from_df(
excel_df=self.output_score_county_state_merged_df,
excel_path=excel_path,
)
logger.info("Writing downloadable csv")
downloadable_df_copy[self.GEOID_TRACT_FIELD_NAME] = (
'"' + downloadable_df_copy[self.GEOID_TRACT_FIELD_NAME] + '"'
# open yaml config
downloadable_csv_config = load_yaml_dict_from_file(
self.CONTENT_CONFIG / "csv.yml"
)
downloadable_df_copy.to_csv(csv_path, index=False)
downloadable_df = self._create_downloadable_data(
score_df=self.output_score_county_state_merged_df,
fields_object=downloadable_csv_config["fields"],
config_object=downloadable_csv_config["global_config"],
)
downloadable_df.to_csv(csv_path, index=False)
logger.info("Compressing files")
files_to_compress = [
@ -454,13 +542,11 @@ class PostScoreETL(ExtractTransformLoad):
zip_files(zip_path, files_to_compress)
def load(self) -> None:
self._load_score_csv(
self._load_score_csv_full(
self.output_score_county_state_merged_df,
constants.FULL_SCORE_CSV_FULL_PLUS_COUNTIES_FILE_PATH,
)
self._load_tile_csv(
self.output_score_tiles_df, constants.DATA_SCORE_CSV_TILES_FILE_PATH
)
self._load_downloadable_zip(
self.output_downloadable_df, constants.SCORE_DOWNLOADABLE_DIR
)
self._load_downloadable_zip(constants.SCORE_DOWNLOADABLE_DIR)

View file

@ -41,6 +41,11 @@ def etl(monkeypatch, root):
etl = PostScoreETL()
monkeypatch.setattr(etl, "DATA_PATH", root)
monkeypatch.setattr(etl, "TMP_PATH", tmp_path)
monkeypatch.setattr(
etl,
"CONTENT_CONFIG",
Path.cwd() / "data_pipeline" / "content" / "config",
)
return etl

View file

@ -7,6 +7,7 @@ import pandas.api.types as ptypes
import pandas.testing as pdt
from data_pipeline.etl.score import constants
from data_pipeline.utils import load_yaml_dict_from_file
# See conftest.py for all fixtures used in these tests
@ -92,8 +93,13 @@ def test_create_tile_data(etl, score_data_expected, tile_data_expected):
def test_create_downloadable_data(
etl, score_data_expected, downloadable_data_expected
):
downloadable_csv_config = load_yaml_dict_from_file(
etl.CONTENT_CONFIG / "csv.yml"
)
output_downloadable_df_actual = etl._create_downloadable_data(
score_data_expected
score_data_expected,
fields_object=downloadable_csv_config["fields"],
config_object=downloadable_csv_config["global_config"],
)
pdt.assert_frame_equal(
output_downloadable_df_actual,
@ -101,9 +107,9 @@ def test_create_downloadable_data(
)
def test_load_score_csv(etl, score_data_expected):
def test_load_score_csv_full(etl, score_data_expected):
reload(constants)
etl._load_score_csv(
etl._load_score_csv_full(
score_data_expected,
constants.FULL_SCORE_CSV_FULL_PLUS_COUNTIES_FILE_PATH,
)
@ -112,26 +118,25 @@ def test_load_score_csv(etl, score_data_expected):
def test_load_tile_csv(etl, tile_data_expected):
reload(constants)
etl._load_score_csv(
etl._load_score_csv_full(
tile_data_expected, constants.DATA_SCORE_CSV_TILES_FILE_PATH
)
assert constants.DATA_SCORE_CSV_TILES_FILE_PATH.is_file()
def test_load_downloadable_zip(etl, monkeypatch, downloadable_data_expected):
def test_load_downloadable_zip(etl, monkeypatch, score_data_expected):
reload(constants)
STATIC_FILES_PATH = (
static_files_path = (
Path.cwd() / "data_pipeline" / "files"
) # need to monkeypatch to real dir
monkeypatch.setattr(constants, "FILES_PATH", STATIC_FILES_PATH)
monkeypatch.setattr(constants, "FILES_PATH", static_files_path)
monkeypatch.setattr(
constants,
"SCORE_DOWNLOADABLE_PDF_FILE_PATH",
STATIC_FILES_PATH / constants.SCORE_DOWNLOADABLE_PDF_FILE_NAME,
)
etl._load_downloadable_zip(
downloadable_data_expected, constants.SCORE_DOWNLOADABLE_DIR
static_files_path / constants.SCORE_DOWNLOADABLE_PDF_FILE_NAME,
)
etl.output_score_county_state_merged_df = score_data_expected
etl._load_downloadable_zip(constants.SCORE_DOWNLOADABLE_DIR)
assert constants.SCORE_DOWNLOADABLE_DIR.is_dir()
assert constants.SCORE_DOWNLOADABLE_CSV_FILE_PATH.is_file()
assert constants.SCORE_DOWNLOADABLE_EXCEL_FILE_PATH.is_file()