From d0841706d5529070717fce0f72bcd9ea61d68bba Mon Sep 17 00:00:00 2001
From: Jorge Escobar
Date: Mon, 11 Apr 2022 14:10:21 -0400
Subject: [PATCH] columns to keep and tests

---
 .github/workflows/deploy_be_staging.yml      |  2 +-
 data/data-pipeline/data_pipeline/etl/base.py | 30 +++++++++++++++---
 .../etl/score/config/datasets.yml            |  2 +-
 .../etl/sources/national_risk_index/etl.py   | 31 ++++++++-----------
 .../data_pipeline/tests/conftest.py          |  5 ---
 .../tests/sources/example/test_etl.py        |  3 ++
 6 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/.github/workflows/deploy_be_staging.yml b/.github/workflows/deploy_be_staging.yml
index 461c3088..641af561 100644
--- a/.github/workflows/deploy_be_staging.yml
+++ b/.github/workflows/deploy_be_staging.yml
@@ -109,7 +109,7 @@ jobs:
           # Deploy to S3 for the staging URL
           message: |
             ** Map Deployed! **
-            Map with Staging Backend: https://screeningtool.geoplatform.gov/en/cejst/?flags=stage_hash=${{env.PR_NUMBER}}/${{env.SHA_NUMBER}}
+            Map with Staging Backend: https://screeningtool.geoplatform.gov/en?flags=stage_hash=${{env.PR_NUMBER}}/${{env.SHA_NUMBER}}
             Find tiles here: https://justice40-data.s3.amazonaws.com/data-pipeline-staging/${{env.PR_NUMBER}}/${{env.SHA_NUMBER}}/data/score/tiles
           repo-token: ${{ secrets.GITHUB_TOKEN }}
           repo-token-user-login: "github-actions[bot]"
diff --git a/data/data-pipeline/data_pipeline/etl/base.py b/data/data-pipeline/data_pipeline/etl/base.py
index 348d273e..5c3805c2 100644
--- a/data/data-pipeline/data_pipeline/etl/base.py
+++ b/data/data-pipeline/data_pipeline/etl/base.py
@@ -71,6 +71,13 @@ class ExtractTransformLoad:
     # COLUMNS_TO_KEEP is used to identify which columns to keep in the output df.
     COLUMNS_TO_KEEP: typing.List[str] = None

+    # INPUT_GEOID_TRACT_FIELD_NAME is the field name that identifies the Census Tract ID
+    # in the input file
+    INPUT_GEOID_TRACT_FIELD_NAME: str = None
+
+    # NULL_REPRESENTATION is how nulls are represented in the input file
+    NULL_REPRESENTATION: str = None
+
     # Thirteen digits in a census block group ID.
     EXPECTED_CENSUS_BLOCK_GROUPS_CHARACTER_LENGTH: int = 13
     # TODO: investigate. Census says there are only 217,740 CBGs in the US. This might
@@ -84,10 +91,12 @@ class ExtractTransformLoad:
     # periods. https://github.com/usds/justice40-tool/issues/964
     EXPECTED_MAX_CENSUS_TRACTS: int = 74160

+    # We use output_df as the final dataframe to write to the CSV.
+    # It is used in the "load" base class method.
     output_df: pd.DataFrame = None

     @classmethod
-    def yaml_config_load(cls):
+    def yaml_config_load(cls) -> dict:
         # check if the class instance has score YAML definitions
         datasets_config = load_yaml_dict_from_file(
             cls.DATASET_CONFIG / "datasets.yml",
@@ -108,10 +117,24 @@ class ExtractTransformLoad:
             )
             sys.exit()

-        # set the fields
+        # set some of the basic fields
         cls.LAST_UPDATED_YEAR = dataset_config["last_updated_year"]
         cls.SOURCE_URL = dataset_config["source_url"]
         cls.INPUT_EXTRACTED_FILE_NAME = dataset_config["extracted_file_name"]
+        cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
+            "input_geoid_tract_field_name"
+        ]
+        cls.NULL_REPRESENTATION = dataset_config["null_representation"]
+
+        # get the columns to write to the CSV
+        cls.COLUMNS_TO_KEEP = [
+            cls.GEOID_TRACT_FIELD_NAME,  # always index with geoid tract id
+        ]
+        for field in dataset_config["load_fields"]:
+            cls.COLUMNS_TO_KEEP.append(field["long_name"])
+
+        # return the config dict
+        return dataset_config

     # This is a classmethod so it can be used by `get_data_frame` without
     # needing to create an instance of the class. This is a use case in
     # `etl_score`.
@@ -263,8 +286,7 @@ class ExtractTransformLoad:
         Data is written in the specified local data folder or remote AWS S3
         bucket.

-        Uses the directory from `self.OUTPUT_DIR` and the file name from
-        `self._get_output_file_path`.
+        Uses the directory and the file name from `self._get_output_file_path`.
         """
         logger.info(f"Saving `{self.NAME}` CSV")

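Note on the etl/base.py hunks above: `yaml_config_load` now also reads `input_geoid_tract_field_name` and `null_representation` from datasets.yml and derives `COLUMNS_TO_KEEP` from the dataset's `load_fields`, instead of each ETL subclass hard-coding its column list. Below is a minimal, standalone sketch of that derivation; the config dict, URL, and field names are illustrative placeholders, not the real datasets.yml contents.

    # Sketch only: mirrors how yaml_config_load builds COLUMNS_TO_KEEP from a
    # dataset config dict. All values below are hypothetical examples.
    from typing import Dict, List

    GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"  # assumed tract ID output column

    dataset_config: Dict = {
        "last_updated_year": 2021,
        "source_url": "https://example.com/data.zip",  # placeholder URL
        "extracted_file_name": "data.csv",
        "input_geoid_tract_field_name": "TRACTFIPS",
        "null_representation": "None",
        "load_fields": [
            {"short_name": "ex_bldg_loss", "long_name": "Expected building loss rate"},
            {"short_name": "has_ag_val", "long_name": "Contains agricultural value"},
        ],
    }

    # Always index by the tract ID, then append each configured long_name.
    columns_to_keep: List[str] = [GEOID_TRACT_FIELD_NAME]
    for field in dataset_config["load_fields"]:
        columns_to_keep.append(field["long_name"])

    print(columns_to_keep)
    # ['GEOID10_TRACT', 'Expected building loss rate', 'Contains agricultural value']
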
diff --git a/data/data-pipeline/data_pipeline/etl/score/config/datasets.yml b/data/data-pipeline/data_pipeline/etl/score/config/datasets.yml
index 1396c3fc..9279d06f 100644
--- a/data/data-pipeline/data_pipeline/etl/score/config/datasets.yml
+++ b/data/data-pipeline/data_pipeline/etl/score/config/datasets.yml
@@ -40,7 +40,7 @@ datasets:
         excel_download: true
       - short_name: "has_ag_val"
         df_field_name: "CONTAINS_AGRIVALUE"
-        long_name: "Expected building loss rate (Natural Hazards Risk Index)"
+        long_name: "Contains agricultural value"
         field_type: bool
         tile_include: true
         csv_download: true
diff --git a/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py b/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
index 936b0a50..fbe7ff93 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
@@ -26,8 +26,15 @@ class NationalRiskIndexETL(ExtractTransformLoad):

     def __init__(self):
         # load YAML config
-        super().yaml_config_load()
+        self.DATASET_CONFIG = super().yaml_config_load()
+
+        # define the full path for the input CSV file
+        self.INPUT_CSV = self.get_tmp_path() / self.INPUT_EXTRACTED_FILE_NAME
+
+        # this is the main dataframe
+        self.df: pd.DataFrame
+
+        # Start dataset-specific vars here
         self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
             "EAL_SCORE"
         )
@@ -58,18 +65,6 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         )
         self.CONTAINS_AGRIVALUE = "Contains agricultural value"

-        self.COLUMNS_TO_KEEP = [
-            self.GEOID_TRACT_FIELD_NAME,
-            self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
-            self.EXPECTED_POPULATION_LOSS_RATE_FIELD_NAME,
-            self.EXPECTED_AGRICULTURE_LOSS_RATE_FIELD_NAME,
-            self.EXPECTED_BUILDING_LOSS_RATE_FIELD_NAME,
-            self.CONTAINS_AGRIVALUE,
-        ]
-
-        self.INPUT_CSV = self.get_tmp_path() / self.INPUT_EXTRACTED_FILE_NAME
-        self.df: pd.DataFrame
-
     def extract(self) -> None:
         """Unzips NRI dataset from the FEMA data source and writes the files
         to the temporary data folder for use in the transform() method
@@ -90,19 +85,18 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         """
         logger.info("Transforming National Risk Index Data")

-        NRI_TRACT_COL = "TRACTFIPS"  # Census Tract Column in NRI data
-
         # read in the unzipped csv from NRI data source then rename the
         # Census Tract column for merging
         df_nri: pd.DataFrame = pd.read_csv(
             self.INPUT_CSV,
-            dtype={NRI_TRACT_COL: "string"},
-            na_values=["None"],
+            dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: "string"},
+            na_values=[self.NULL_REPRESENTATION],
             low_memory=False,
         )
+
         df_nri.rename(
             columns={
-                NRI_TRACT_COL: self.GEOID_TRACT_FIELD_NAME,
+                self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
                 self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME: self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME,
             },
             inplace=True,
@@ -185,6 +179,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         # Note: `round` is smart enough to only apply to float columns.
         df_nri = df_nri.round(10)

+        # Assign the final df to the class' output_df for the load method
         self.output_df = df_nri

     def load(self) -> None:
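Note on the national_risk_index/etl.py hunks above: transform() now takes the tract ID column name and the null marker from the YAML-driven class attributes (INPUT_GEOID_TRACT_FIELD_NAME, NULL_REPRESENTATION) rather than the hard-coded NRI_TRACT_COL and "None". Below is a standalone sketch of that read-and-rename pattern with pandas; the path and field values are illustrative assumptions, not the real NRI source file.

    # Sketch only: config-driven CSV read and column rename, as in transform().
    import pandas as pd

    INPUT_GEOID_TRACT_FIELD_NAME = "TRACTFIPS"   # example value from datasets.yml
    NULL_REPRESENTATION = "None"                 # example value from datasets.yml
    GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"     # assumed pipeline-wide tract column
    INPUT_CSV = "NRI_Table_CensusTracts.csv"     # placeholder input path

    # Read tract IDs as strings so leading zeros survive, and map the dataset's
    # null marker to real NaN values.
    df_nri = pd.read_csv(
        INPUT_CSV,
        dtype={INPUT_GEOID_TRACT_FIELD_NAME: "string"},
        na_values=[NULL_REPRESENTATION],
        low_memory=False,
    )

    # Rename the input tract column to the pipeline-wide tract ID column.
    df_nri = df_nri.rename(
        columns={INPUT_GEOID_TRACT_FIELD_NAME: GEOID_TRACT_FIELD_NAME}
    )
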
diff --git a/data/data-pipeline/data_pipeline/tests/conftest.py b/data/data-pipeline/data_pipeline/tests/conftest.py
index 5535a97e..f1dc63ac 100644
--- a/data/data-pipeline/data_pipeline/tests/conftest.py
+++ b/data/data-pipeline/data_pipeline/tests/conftest.py
@@ -52,8 +52,3 @@ def mock_etl(monkeypatch, mock_paths) -> None:
     data_path, tmp_path = mock_paths
     monkeypatch.setattr(ExtractTransformLoad, "DATA_PATH", data_path)
     monkeypatch.setattr(ExtractTransformLoad, "TMP_PATH", tmp_path)
-    monkeypatch.setattr(
-        ExtractTransformLoad,
-        "CONTENT_CONFIG",
-        Path.cwd() / "data_pipeline" / "score" / "config",
-    )
diff --git a/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py b/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py
index 3dc7f8b2..d89275b3 100644
--- a/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py
+++ b/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py
@@ -119,6 +119,7 @@ class TestETL:
         """
         # Setup
         etl = self._get_instance_of_etl_class()
+        etl.__init__()
         data_path, tmp_path = mock_paths

         assert etl.DATA_PATH == data_path
@@ -256,6 +257,7 @@ class TestETL:
         etl = self._setup_etl_instance_and_run_extract(
             mock_etl=mock_etl, mock_paths=mock_paths
         )
+        etl.__init__()
         etl.transform()

         assert etl.output_df is not None
@@ -273,6 +275,7 @@ class TestETL:
         """
         # setup - input variables
         etl = self._get_instance_of_etl_class()
+        etl.__init__()

         # setup - mock transform step
         df_transform = pd.read_csv(
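Note on the test changes above: the explicit etl.__init__() calls re-run the constructor after conftest.py has monkeypatched class-level attributes, so each instance re-derives its paths and YAML-driven config from the patched values. Below is a simplified pytest sketch of that pattern; ExampleETL and its attributes are stand-ins, not the real pipeline classes.

    # Sketch only: re-running __init__ so instance state picks up monkeypatched
    # class attributes (the reason for the added etl.__init__() calls).
    class ExampleETL:
        TMP_PATH = "/real/tmp"  # class-level default (placeholder)

        def __init__(self):
            # Instance state derived from class attributes at init time.
            self.input_csv = f"{self.TMP_PATH}/input.csv"


    def test_uses_patched_tmp_path(monkeypatch, tmp_path):
        etl = ExampleETL()
        monkeypatch.setattr(ExampleETL, "TMP_PATH", str(tmp_path))

        # The existing instance still points at the old path...
        assert etl.input_csv.startswith("/real/tmp")

        # ...so re-running __init__ picks up the patched class attribute.
        etl.__init__()
        assert etl.input_csv.startswith(str(tmp_path))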