Mirror of https://github.com/DOI-DO/j40-cejst-2.git, synced 2025-07-25 04:00:17 -07:00
NRI dataset and initial score YAML configuration (#1534)
* update be staging gha
* NRI dataset and initial score YAML configuration
* checkpoint
* adding data checks for release branch
* passing tests
* adding INPUT_EXTRACTED_FILE_NAME to base class
* lint
* columns to keep and tests
* PR review
* removing source url
* tests
* stop execution of ETL if there's a YAML schema issue
* adding source url as class var again
* clean up
* force cache bust
* gha cache bust
* dynamically set score vars from YAML
* docstrings
* removing last updated year - optional reverse percentile
* sort order
* column ordering
* class-level vars
* Updating DatasetsConfig
* fix pylint errors
* moving metadata hint back to code

Co-authored-by: lucasmbrown-usds <lucas.m.brown@omb.eop.gov>
This commit is contained in:
parent 1833e3e794
commit 1c448a77f9
15 changed files with 272 additions and 3485 deletions
@@ -1,12 +1,15 @@
 import enum
 import pathlib
+import sys
 import typing
 from typing import Optional
 
 import pandas as pd
 
 from data_pipeline.config import settings
+from data_pipeline.etl.score.schemas.datasets import DatasetsConfig
 from data_pipeline.utils import (
+    load_yaml_dict_from_file,
     unzip_file_from_url,
     remove_all_from_dir,
     get_module_logger,
@@ -30,6 +33,9 @@ class ExtractTransformLoad:
     Attributes:
         DATA_PATH (pathlib.Path): Local path where all data will be stored
         TMP_PATH (pathlib.Path): Local path where temporary data will be stored
+
+    TODO: Fill missing attrs here
+
         GEOID_FIELD_NAME (str): The common column name for a Census Block Group identifier
         GEOID_TRACT_FIELD_NAME (str): The common column name for a Census Tract identifier
     """
@@ -40,6 +46,7 @@ class ExtractTransformLoad:
     DATA_PATH: pathlib.Path = APP_ROOT / "data"
     TMP_PATH: pathlib.Path = DATA_PATH / "tmp"
     CONTENT_CONFIG: pathlib.Path = APP_ROOT / "content" / "config"
+    DATASET_CONFIG: pathlib.Path = APP_ROOT / "etl" / "score" / "config"
 
     # Parameters
     GEOID_FIELD_NAME: str = "GEOID10"
@@ -55,6 +62,9 @@ class ExtractTransformLoad:
     # SOURCE_URL is used to extract source data in extract().
     SOURCE_URL: str = None
 
+    # INPUT_EXTRACTED_FILE_NAME is the name of the file after extract().
+    INPUT_EXTRACTED_FILE_NAME: str = None
+
     # GEO_LEVEL is used to identify whether output data is at the unit of the tract or
     # census block group.
     # TODO: add tests that enforce seeing the expected geographic identifier field
@@ -64,6 +74,13 @@ class ExtractTransformLoad:
     # COLUMNS_TO_KEEP is used to identify which columns to keep in the output df.
     COLUMNS_TO_KEEP: typing.List[str] = None
 
+    # INPUT_GEOID_TRACT_FIELD_NAME is the field name that identifies the Census Tract ID
+    # on the input file
+    INPUT_GEOID_TRACT_FIELD_NAME: str = None
+
+    # NULL_REPRESENTATION is how nulls are represented on the input field
+    NULL_REPRESENTATION: str = None
+
     # Thirteen digits in a census block group ID.
     EXPECTED_CENSUS_BLOCK_GROUPS_CHARACTER_LENGTH: int = 13
     # TODO: investigate. Census says there are only 217,740 CBGs in the US. This might
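As context for the class-level attributes added above, here is a minimal sketch of a child dataset class that overrides them. Everything in it is hypothetical: the `ExampleETL` name, URL, and field values are illustrative, and the import path for the base class is assumed, since the diff does not show the file name.

    # Hypothetical child ETL class; all names and values are illustrative.
    from data_pipeline.etl.base import ExtractTransformLoad  # assumed module path


    class ExampleETL(ExtractTransformLoad):
        NAME = "example_dataset"  # should match a module_name entry in datasets.yml
        SOURCE_URL = "https://example.com/source.zip"  # hypothetical source archive
        INPUT_EXTRACTED_FILE_NAME = "example.csv"  # file expected after extract()
        INPUT_GEOID_TRACT_FIELD_NAME = "TRACTFIPS"  # tract ID column on the input file
        NULL_REPRESENTATION = "None"  # how the input file encodes nulls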
@@ -77,8 +94,53 @@ class ExtractTransformLoad:
     # periods. https://github.com/usds/justice40-tool/issues/964
     EXPECTED_MAX_CENSUS_TRACTS: int = 74160
 
+    # We use output_df as the final dataframe to use to write to the CSV
+    # It is used on the "load" base class method
+    output_df: pd.DataFrame = None
+
+    @classmethod
+    def yaml_config_load(cls) -> dict:
+        """Generate config dictionary and set instance variables from YAML dataset."""
+
+        # check if the class instance has score YAML definitions
+        datasets_config = load_yaml_dict_from_file(
+            cls.DATASET_CONFIG / "datasets.yml",
+            DatasetsConfig,
+        )
+
+        # get the config for this dataset
+        try:
+            dataset_config = next(
+                item
+                for item in datasets_config.get("datasets")
+                if item["module_name"] == cls.NAME
+            )
+        except StopIteration:
+            # Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
+            logger.error(
+                f"Exception encountered while extracting dataset config for dataset {cls.NAME}"
+            )
+            sys.exit()
+
+        # set some of the basic fields
+        cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
+            "input_geoid_tract_field_name"
+        ]
+
+        # get the columns to write on the CSV
+        # and set the constants
+        cls.COLUMNS_TO_KEEP = [
+            cls.GEOID_TRACT_FIELD_NAME,  # always index with geoid tract id
+        ]
+        for field in dataset_config["load_fields"]:
+            cls.COLUMNS_TO_KEEP.append(field["long_name"])
+
+            # set the constants for the class
+            setattr(cls, field["df_field_name"], field["long_name"])
+
+        # return the config dict
+        return dataset_config
+
     # This is a classmethod so it can be used by `get_data_frame` without
     # needing to create an instance of the class. This is a use case in `etl_score`.
     @classmethod
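Working backward from the keys `yaml_config_load` reads (`module_name`, `input_geoid_tract_field_name`, and `load_fields` entries with `df_field_name` and `long_name`), a `datasets.yml` entry presumably has roughly this shape. This is a sketch inferred from the code, not the actual file; the values are hypothetical, and the real schema is whatever `DatasetsConfig` validates.

    datasets:
      - module_name: "example_dataset"  # matched against the ETL class's NAME
        input_geoid_tract_field_name: "TRACTFIPS"  # tract ID column on the input file
        load_fields:
          - df_field_name: "EXAMPLE_FIELD"  # becomes a class attribute via setattr
            long_name: "Example measure"  # column name appended to COLUMNS_TO_KEEP

One design choice worth noting: the `setattr` call in the loop turns each `df_field_name` into a class-level constant holding the human-readable column name, so downstream code can refer to `cls.EXAMPLE_FIELD` (to reuse the hypothetical name above) instead of hard-coding column strings.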
@@ -87,16 +149,10 @@ class ExtractTransformLoad:
         if cls.NAME is None:
             raise NotImplementedError(
                 f"Child ETL class needs to specify `cls.NAME` (currently "
-                f"{cls.NAME}) and `cls.LAST_UPDATED_YEAR` (currently "
-                f"{cls.LAST_UPDATED_YEAR})."
+                f"{cls.NAME})."
             )
 
-        output_file_path = (
-            cls.DATA_PATH
-            / "dataset"
-            / f"{cls.NAME}_{cls.LAST_UPDATED_YEAR}"
-            / "usa.csv"
-        )
+        output_file_path = cls.DATA_PATH / "dataset" / f"{cls.NAME}" / "usa.csv"
         return output_file_path
 
     def get_tmp_path(self) -> pathlib.Path:
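The net effect of this hunk is that `LAST_UPDATED_YEAR` no longer participates in either the error message or the output path: datasets now write to `dataset/<NAME>/usa.csv` rather than `dataset/<NAME>_<LAST_UPDATED_YEAR>/usa.csv`, matching the "removing last updated year" item in the commit message.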
@@ -229,8 +285,7 @@ class ExtractTransformLoad:
 
         Data is written in the specified local data folder or remote AWS S3 bucket.
 
-        Uses the directory from `self.OUTPUT_DIR` and the file name from
-        `self._get_output_file_path`.
+        Uses the directory and the file name from `self._get_output_file_path`.
         """
         logger.info(f"Saving `{self.NAME}` CSV")
 