diff --git a/data/data-pipeline/data_pipeline/application.py b/data/data-pipeline/data_pipeline/application.py
index 4cd6e8da..ae21c6c2 100644
--- a/data/data-pipeline/data_pipeline/application.py
+++ b/data/data-pipeline/data_pipeline/application.py
@@ -3,7 +3,12 @@ import sys
 import click

 from data_pipeline.config import settings
-from data_pipeline.etl.runner import etl_runner, score_generate, score_geo, score_post
+from data_pipeline.etl.runner import (
+    etl_runner,
+    score_generate,
+    score_geo,
+    score_post,
+)
 from data_pipeline.etl.sources.census.etl_utils import (
     reset_data_directories as census_reset,
 )
diff --git a/data/data-pipeline/data_pipeline/etl/base.py b/data/data-pipeline/data_pipeline/etl/base.py
index ca32bb65..4936ab1f 100644
--- a/data/data-pipeline/data_pipeline/etl/base.py
+++ b/data/data-pipeline/data_pipeline/etl/base.py
@@ -1,8 +1,17 @@
 from pathlib import Path
 from typing import Optional

+import pandas as pd
+import yaml
+
 from data_pipeline.config import settings
-from data_pipeline.utils import unzip_file_from_url, remove_all_from_dir
+from data_pipeline.utils import (
+    unzip_file_from_url,
+    remove_all_from_dir,
+    get_module_logger,
+)
+
+logger = get_module_logger(__name__)


 class ExtractTransformLoad:
@@ -17,7 +26,8 @@ class ExtractTransformLoad:
         GEOID_TRACT_FIELD_NAME (str): The common column name for a Census Tract identifier
     """

-    DATA_PATH: Path = settings.APP_ROOT / "data"
+    APP_ROOT: Path = settings.APP_ROOT
+    DATA_PATH: Path = APP_ROOT / "data"
     TMP_PATH: Path = DATA_PATH / "tmp"
     FILES_PATH: Path = settings.APP_ROOT / "files"
     GEOID_FIELD_NAME: str = "GEOID10"
@@ -26,11 +36,51 @@ class ExtractTransformLoad:
     EXPECTED_MAX_CENSUS_BLOCK_GROUPS: int = 220405
     EXPECTED_MAX_CENSUS_TRACTS: int = 73076

-    def get_yaml_config(self) -> None:
+    def __init__(self, config_path: Path) -> None:
+        """Initializes the class with instance-specific variables"""
+
+        # set by _get_yaml_config()
+        self.NAME: str = None
+        self.SOURCE_URL: str = None
+        self.GEOID_COL: str = None
+        self.GEO_LEVEL: str = None
+        self.SCORE_COLS: list = None
+        self.FIPS_CODES: pd.DataFrame = None
+        self.OUTPUT_PATH: Path = None
+        self.CENSUS_CSV: Path = None
+
+        self._get_yaml_config(config_path)
+
+    def _get_yaml_config(self, config_path: Path) -> None:
         """Reads the YAML configuration file for the dataset and stores
         the properties in the instance"""
+        # parse the yaml config file
+        try:
+            with open(config_path, "r", encoding="utf-8") as file:
+                config = yaml.safe_load(file)
+        except (FileNotFoundError, yaml.YAMLError):
+            raise

-        pass
+        # set dataset specific attributes
+        census_dir = self.DATA_PATH / "census" / "csv"
+        if config["is_census"]:
+            csv_dir = census_dir
+        else:
+            self.CENSUS_CSV = census_dir / "us.csv"
+            self.FIPS_CODES = self._get_census_fips_codes()
+            csv_dir = self.DATA_PATH / "dataset"
+
+        # parse name and set output path
+        name = config.get("name")
+        snake_name = name.replace(" ", "_").lower()  # converts to snake case
+        output_dir = snake_name + str(config.get("year") or "")  # year may parse as an int
+        self.OUTPUT_PATH = csv_dir / output_dir / "usa.csv"
+        self.OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
+
+        # set class attributes
+        attrs = ["NAME", "SOURCE_URL", "GEOID_COL", "GEO_LEVEL", "SCORE_COLS"]
+        for attr in attrs:
+            setattr(self, attr, config[attr.lower()])

     def check_ttl(self) -> None:
         """Checks if the ETL process can be run based on the TTL value on the
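A note on the base-class hunks above: `__init__` now drives each dataset ETL entirely from its YAML config. A minimal usage sketch follows (the `ExampleETL` class and its config path are hypothetical, not part of this diff):

```python
from pathlib import Path

from data_pipeline.etl.base import ExtractTransformLoad

# assumed location of a per-dataset config file; not a real file in this PR
EXAMPLE_CONFIG = Path("data_pipeline/etl/sources/example/config.yaml")


class ExampleETL(ExtractTransformLoad):
    def __init__(self) -> None:
        # parses the YAML and sets NAME, SOURCE_URL, SCORE_COLS, OUTPUT_PATH,
        # FIPS_CODES, and CENSUS_CSV on the instance
        super().__init__(EXAMPLE_CONFIG)

    def extract(self) -> None:
        # reuses the base extract: download SOURCE_URL and unzip into TMP_PATH
        super().extract(self.SOURCE_URL, self.TMP_PATH)
```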
@@ -44,9 +94,9 @@ class ExtractTransformLoad:
         extract_path: Path = None,
         verify: Optional[bool] = True,
     ) -> None:
-        """Extract the data from
-        a remote source. By default it provides code to get the file from a source url,
-        unzips it and stores it on an extract_path."""
+        """Extract the data from a remote source. By default it provides code
+        to get the file from a source url, unzips it and stores it on an
+        extract_path."""

         # this can be accessed via super().extract()
         if source_url and extract_path:
@@ -70,3 +120,53 @@ class ExtractTransformLoad:
         """Clears out any files stored in the TMP folder"""

         remove_all_from_dir(self.TMP_PATH)
+
+    # TODO: Add test for this
+    def _get_census_fips_codes(self) -> pd.DataFrame:
+        """Loads FIPS codes for each Census block group and tract"""
+
+        # check that the census data exists
+        if not self.CENSUS_CSV.exists():
+            logger.info("Census data not found, please run download_csv first")
+        # load the census data
+        df = pd.read_csv(
+            self.CENSUS_CSV, dtype={self.GEOID_FIELD_NAME: "string"}
+        )
+        # extract Census tract FIPS code from Census block group
+        df[self.GEOID_TRACT_FIELD_NAME] = df[self.GEOID_FIELD_NAME].str[0:11]
+        return df[[self.GEOID_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME]]
+
+    # TODO: Create tests
+    def validate_output(self) -> None:
+        """Checks that the output of the ETL process adheres to the contract
+        expected by the score module
+
+        Contract conditions:
+        - Output is saved as usa.csv at the path specified by self.OUTPUT_PATH
+        - The output csv has a column named GEOID10 which stores each of the
+          Census block group FIPS codes in data/census/csv/us.csv
+        - The output csv has a column named GEOID10_TRACT which stores each of
+          the Census tract FIPS codes associated with each Census block group
+        - The output csv has each of the columns expected by the score and the
+          name and dtype of those columns match the format expected by score
+        """
+        # read in output file
+        # and check that GEOID cols are present
+        assert self.OUTPUT_PATH.exists(), f"No file found at {self.OUTPUT_PATH}"
+        df_output = pd.read_csv(
+            self.OUTPUT_PATH,
+            dtype={
+                self.GEOID_FIELD_NAME: "string",
+                self.GEOID_TRACT_FIELD_NAME: "string",
+            },
+        )
+
+        # check that the GEOID cols in the output match census data
+        geoid_cols = [self.GEOID_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME]
+        for col in geoid_cols:
+            assert col in df_output.columns, f"{col} is missing from output"
+        assert self.FIPS_CODES.equals(df_output[geoid_cols])
+
+        # check that the score columns are in the output
+        for col in self.SCORE_COLS:
+            assert col in df_output.columns, f"{col} is missing from output"
diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py
index 5ed7ca81..f7eb8580 100644
--- a/data/data-pipeline/data_pipeline/etl/runner.py
+++ b/data/data-pipeline/data_pipeline/etl/runner.py
@@ -86,6 +86,7 @@ def score_generate() -> None:
     # Post Score Processing
     score_post()

+
 def score_post() -> None:
     """Posts the score files to the local directory

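Worth calling out the `dtype={...: "string"}` pattern used in `_get_census_fips_codes` and `validate_output` above: FIPS codes carry leading zeros, which pandas silently strips if it infers the column as integer. A quick standalone illustration (not from the repo):

```python
import io

import pandas as pd

csv = io.StringIO("GEOID10\n050070403001")
print(pd.read_csv(csv)["GEOID10"].iloc[0])  # 50070403001 -- leading zero lost

csv.seek(0)
df = pd.read_csv(csv, dtype={"GEOID10": "string"})
print(df["GEOID10"].iloc[0])  # 050070403001 -- preserved as text
```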
diff --git a/data/data-pipeline/data_pipeline/etl/score/constants.py b/data/data-pipeline/data_pipeline/etl/score/constants.py
index f05513d0..1b52f618 100644
--- a/data/data-pipeline/data_pipeline/etl/score/constants.py
+++ b/data/data-pipeline/data_pipeline/etl/score/constants.py
@@ -46,8 +46,12 @@ timestamp_str = current_dt.strftime("%Y-%m-%d-%H%M")
 SCORE_DOWNLOADABLE_DIR = DATA_SCORE_DIR / "downloadable"
 SCORE_DOWNLOADABLE_PDF_FILE_NAME = "Draft_Communities_List.pdf"
 SCORE_DOWNLOADABLE_PDF_FILE_PATH = FILES_PATH / SCORE_DOWNLOADABLE_PDF_FILE_NAME
-SCORE_DOWNLOADABLE_CSV_FILE_PATH = SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.csv"
-SCORE_DOWNLOADABLE_EXCEL_FILE_PATH = SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.xlsx"
+SCORE_DOWNLOADABLE_CSV_FILE_PATH = (
+    SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.csv"
+)
+SCORE_DOWNLOADABLE_EXCEL_FILE_PATH = (
+    SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.xlsx"
+)
 SCORE_DOWNLOADABLE_ZIP_FILE_PATH = (
     SCORE_DOWNLOADABLE_DIR / "Screening_Tool_Data.zip"
 )
diff --git a/data/data-pipeline/data_pipeline/tests/base/config.yaml b/data/data-pipeline/data_pipeline/tests/base/config.yaml
new file mode 100644
index 00000000..690e8beb
--- /dev/null
+++ b/data/data-pipeline/data_pipeline/tests/base/config.yaml
@@ -0,0 +1,10 @@
+name: Template
+year: null
+is_census: false
+source_url: https://github.com/usds/justice40-tool/
+geo_level: Census Block Group
+geoid_col: GEO COL
+score_cols:
+  - COL 1
+  - COL 2
+  - COL 3
diff --git a/data/data-pipeline/data_pipeline/tests/base/data/census.csv b/data/data-pipeline/data_pipeline/tests/base/data/census.csv
new file mode 100644
index 00000000..e0ed6226
--- /dev/null
+++ b/data/data-pipeline/data_pipeline/tests/base/data/census.csv
@@ -0,0 +1,11 @@
+GEOID10,POPULATION
+050070403001,1000
+050070403002,1500
+050010201001,1000
+050010201002,1500
+150070405001,2000
+150070405002,2250
+150010210101,2000
+150010210102,1500
+150010211011,1750
+150010211012,1500
diff --git a/data/data-pipeline/data_pipeline/tests/base/data/output.csv b/data/data-pipeline/data_pipeline/tests/base/data/output.csv
new file mode 100644
index 00000000..801a3022
--- /dev/null
+++ b/data/data-pipeline/data_pipeline/tests/base/data/output.csv
@@ -0,0 +1,11 @@
+GEOID10,GEOID10_TRACT,COL 1,COL 2,COL 3
+050070403001,05007040300,10,10,10
+050070403002,05007040300,20,20,20
+050010201001,05001020100,30,30,30
+050010201002,05001020100,40,40,40
+150070405001,15007040500,50,50,50
+150070405002,15007040500,60,60,60
+150010210101,15001021010,70,70,70
+150010210102,15001021010,80,80,80
+150010211011,15001021101,90,90,90
+150010211012,15001021101,100,100,100
diff --git a/data/data-pipeline/data_pipeline/tests/base/invalid_config.yaml b/data/data-pipeline/data_pipeline/tests/base/invalid_config.yaml
new file mode 100644
index 00000000..104e87e6
--- /dev/null
+++ b/data/data-pipeline/data_pipeline/tests/base/invalid_config.yaml
@@ -0,0 +1,10 @@
+name = Template # uses equal sign instead of colon
+year: null
+is_dataset: true
+source_url: https://github.com/usds/justice40-tool/
+geo_level: Census Block Group
+geoid_col: GEO COL
+score_cols:
+  - COL 1
+  - COL 2
+  - COL 3
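The first line of `invalid_config.yaml` is enough to break parsing: `name = Template` reads as a bare YAML scalar, so the `year: null` mapping on the next line becomes a syntax error. A standalone demonstration (not part of the test suite):

```python
import yaml

try:
    yaml.safe_load("name = Template\nyear: null\n")
except yaml.YAMLError as err:
    # pyyaml raises ScannerError ("mapping values are not allowed here"),
    # a subclass of YAMLError -- exactly what test_init_bad_config expects
    print(type(err).__name__)
```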
diff --git a/data/data-pipeline/data_pipeline/tests/base/test_base.py b/data/data-pipeline/data_pipeline/tests/base/test_base.py
new file mode 100644
index 00000000..a75b82ea
--- /dev/null
+++ b/data/data-pipeline/data_pipeline/tests/base/test_base.py
@@ -0,0 +1,160 @@
+import shutil
+from pathlib import Path
+
+import yaml
+import pytest
+import pandas as pd
+
+from data_pipeline.config import settings
+from data_pipeline.etl.base import ExtractTransformLoad
+
+TEST_DIR = settings.APP_ROOT / "tests" / "base"
+DATA_DIR = TEST_DIR / "data"
+CONFIG_PATH = TEST_DIR / "config.yaml"
+OUTPUT_SRC = DATA_DIR / "output.csv"
+
+
+def remove_output(etl):
+    """Clears output.csv if it exists"""
+    if etl.OUTPUT_PATH.exists():
+        etl.OUTPUT_PATH.unlink()
+    assert etl.OUTPUT_PATH.exists() is False
+
+
+def load_output_source(etl):
+    """Loads output csv so that it can be modified"""
+    df = pd.read_csv(
+        OUTPUT_SRC,
+        dtype={
+            etl.GEOID_FIELD_NAME: "string",
+            etl.GEOID_TRACT_FIELD_NAME: "string",
+        },
+    )
+    return df
+
+
+class TemplateETL(ExtractTransformLoad):
+    """Mock ETL class that inherits from the base ETL"""
+
+    def __init__(self, config_path: Path) -> None:
+        super().__init__(config_path)
+        self.EXTRACTED_CSV: Path = DATA_DIR / "output.csv"
+        self.df: pd.DataFrame = None
+
+
+class TestInit:
+    """Tests the super().__init__() method in a class that inherits from
+    ExtractTransformLoad"""
+
+    def test_init(self, mock_paths, mock_etl):
+        """Tests that the init method executes successfully
+
+        Validates the following conditions:
+        - The class was instantiated with no errors
+        - All of the class attributes were set correctly by _get_yaml_config()
+        """
+        # setup
+        data_path, tmp_path = mock_paths
+        etl = TemplateETL(CONFIG_PATH)
+        # validation
+        assert etl.NAME == "Template"
+        assert etl.SOURCE_URL == "https://github.com/usds/justice40-tool/"
+        assert etl.GEOID_COL == "GEO COL"
+        assert etl.GEO_LEVEL == "Census Block Group"
+        assert etl.SCORE_COLS == ["COL 1", "COL 2", "COL 3"]
+        assert etl.OUTPUT_PATH == data_path / "dataset" / "template" / "usa.csv"
+        assert etl.CENSUS_CSV.exists()
+
+    def test_init_missing_config(self, mock_etl):
+        """Tests that FileNotFoundError is raised when the class is instantiated
+        with a path to a config.yaml file that doesn't exist
+        """
+        # setup
+        config_path = settings.APP_ROOT / "fake_path"
+        assert config_path.exists() is False
+        # execute
+        with pytest.raises(FileNotFoundError):
+            TemplateETL(config_path)
+
+    def test_init_bad_config(self, mock_etl):
+        """Tests that YAMLError is raised when the class is instantiated with
+        a yaml file that has errors in it
+        """
+        # setup
+        config_path = TEST_DIR / "invalid_config.yaml"
+        assert config_path.exists()
+        # execute
+        with pytest.raises(yaml.YAMLError):
+            TemplateETL(config_path)
+
+
+class TestValidateOutput:
+    """Tests the ExtractTransformLoad.validate_output() method"""
+
+    def test_validate_output_success(self, mock_etl):
+        """Tests that validate_output() runs successfully with valid output"""
+        # setup - instantiate etl class
+        etl = TemplateETL(CONFIG_PATH)
+        # setup - load output file
+        shutil.copyfile(OUTPUT_SRC, etl.OUTPUT_PATH)
+        # validation
+        etl.validate_output()
+
+    def test_validate_output_missing_output(self, mock_etl):
+        """Tests that validate_output() fails if the output isn't written to
+        the location at self.OUTPUT_PATH
+        """
+        # setup - remove output file
+        etl = TemplateETL(CONFIG_PATH)
+        remove_output(etl)
+        # validation
+        with pytest.raises(AssertionError):
+            etl.validate_output()
+
+    def test_validate_missing_geoid_col(self, mock_etl):
+        """Tests that validate_output() fails if the output is missing one of
+        the census FIPS code columns
+        """
+        # setup - remove output file
+        etl = TemplateETL(CONFIG_PATH)
+        remove_output(etl)
+        # setup - delete GEOID10 col from output
+        df = load_output_source(etl)
+        df.drop(etl.GEOID_FIELD_NAME, axis=1, inplace=True)
+        assert etl.GEOID_FIELD_NAME not in df.columns
+        df.to_csv(etl.OUTPUT_PATH)
+        # validation
+        with pytest.raises(AssertionError):
+            etl.validate_output()
+
+    def test_validate_missing_census_block_group(self, mock_etl):
+        """Tests that validate_output() fails if the output is missing one of
+        the census block group rows
+        """
+        # setup - remove output file
+        etl = TemplateETL(CONFIG_PATH)
+        remove_output(etl)
+        # setup - remove the first Census Block Group
+        df = load_output_source(etl)
+        df.drop(index=df.index[0], axis=0, inplace=True)  # delete row 1
+        assert len(df) == 9
+        df.to_csv(etl.OUTPUT_PATH)
+        # validation
+        with pytest.raises(AssertionError):
+            etl.validate_output()
+
+    def test_validate_missing_score_col(self, mock_etl):
+        """Tests that validate_output() fails if the output is missing one of
+        the columns used in the score
+        """
+        # setup - remove output file
+        etl = TemplateETL(CONFIG_PATH)
+        remove_output(etl)
+        # setup - delete one of the score columns
+        df = load_output_source(etl)
+        df.drop("COL 1", axis=1, inplace=True)
+        assert "COL 1" not in df.columns
+        df.to_csv(etl.OUTPUT_PATH)
+        # validation
+        with pytest.raises(AssertionError):
+            etl.validate_output()
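The row-level check in `validate_output` leans on `DataFrame.equals`, which is strict: both frames must match in shape, dtype, index, and element values. That is why dropping a single block group row fails validation even though every remaining value is correct. An illustration (not from the repo):

```python
import pandas as pd

expected = pd.DataFrame({"GEOID10": ["050070403001", "050070403002"]})

# same values minus one row: shapes differ, so equals() returns False
missing_row = expected.drop(index=[0]).reset_index(drop=True)
print(expected.equals(missing_row))  # False

# same values but object dtype vs StringDtype: equals() also returns False
print(expected.equals(expected.astype("string")))  # False
```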
diff --git a/data/data-pipeline/data_pipeline/tests/conftest.py b/data/data-pipeline/data_pipeline/tests/conftest.py
index 6b5df627..1755852b 100644
--- a/data/data-pipeline/data_pipeline/tests/conftest.py
+++ b/data/data-pipeline/data_pipeline/tests/conftest.py
@@ -1,4 +1,6 @@
 import os
+from pathlib import Path
+from shutil import copyfile

 import pytest

@@ -8,6 +10,22 @@ from data_pipeline.etl.base import ExtractTransformLoad
 TMP_DIR = settings.APP_ROOT / "data" / "tmp" / "tests"


+def copy_data_files(src: Path, dst: Path) -> None:
+    """Copies test data from src Path to dst Path for use in testing
+
+    Args
+        src: pathlib.Path instance. The location of the source data file.
+        dst: pathlib.Path instance. Where to copy the source data file to.
+
+    Returns
+        None. This is a void function
+    """
+    if not dst.exists():
+        dst.parent.mkdir(parents=True, exist_ok=True)
+        copyfile(src, dst)
+    assert dst.exists()
+
+
 @pytest.fixture(scope="session")
 def mock_paths(tmp_path_factory) -> tuple:
     """Creates new DATA_PATH and TMP_PATH that point to a temporary local
@@ -23,8 +41,18 @@ def mock_paths(tmp_path_factory) -> tuple:
     return data_path, tmp_path


+@pytest.fixture(scope="session")
+def mock_census(mock_paths) -> Path:
+    """Copies the sample census data into the temporary data directory"""
+    data_path, tmp_path = mock_paths
+    census_src = settings.APP_ROOT / "tests" / "base" / "data" / "census.csv"
+    census_dst = data_path / "census" / "csv" / "us.csv"
+    copy_data_files(census_src, census_dst)
+    return census_dst
+
+
 @pytest.fixture
-def mock_etl(monkeypatch, mock_paths) -> None:
+def mock_etl(monkeypatch, mock_paths, mock_census) -> None:
     """Creates a mock version of the base ExtractTransformLoad class and
     resets the global variables for DATA_PATH and TMP_PATH to the local mock_paths
     """
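Because `mock_census` is session-scoped and `mock_etl` now depends on it, any test that requests `mock_etl` gets a temporary data tree with the sample census file already staged. A hypothetical test (not in this diff) showing how the fixtures compose:

```python
def test_census_staged(mock_etl, mock_paths):
    # mock_etl transitively runs mock_census, which copies
    # tests/base/data/census.csv to <tmp data dir>/census/csv/us.csv
    data_path, _ = mock_paths
    assert (data_path / "census" / "csv" / "us.csv").exists()
```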
diff --git a/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py b/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py
index 85110591..6c4b40d7 100644
--- a/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py
+++ b/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py
@@ -1,9 +1,7 @@
-from pathlib import Path
-from shutil import copyfile
-
 import pandas as pd

 from data_pipeline.config import settings
+from data_pipeline.tests.conftest import copy_data_files
 from data_pipeline.etl.sources.national_risk_index.etl import (
     NationalRiskIndexETL,
 )
@@ -13,22 +11,6 @@ DATA_DIR = (
 )


-def copy_data_files(src: Path, dst: Path) -> None:
-    """Copies test data from src Path to dst Path for use in testing
-
-    Args
-        src: pathlib.Path instance. The location of the source data file.
-        dst: pathlib.Path instance. Where to copy the source data file to.
-
-    Returns
-        None. This is a void function
-    """
-    if not dst.exists():
-        dst.parent.mkdir(parents=True, exist_ok=True)
-        copyfile(src, dst)
-    assert dst.exists()
-
-
 class TestNationalRiskIndexETL:
     def test_init(self, mock_etl, mock_paths):
         """Tests that the mock NationalRiskIndexETL class instance was
@@ -66,7 +48,7 @@ class TestNationalRiskIndexETL:
         input_src = DATA_DIR / "input.csv"
         input_dst = etl.INPUT_CSV
         acs_src = DATA_DIR / "acs.csv"
-        acs_dst = DATA_DIR / etl.BLOCK_GROUP_CSV
+        acs_dst = etl.BLOCK_GROUP_CSV
         for src, dst in [(input_src, input_dst), (acs_src, acs_dst)]:
             copy_data_files(src, dst)
         # setup - read in sample output as dataframe
diff --git a/data/data-pipeline/data_pipeline/utils.py b/data/data-pipeline/data_pipeline/utils.py
index 3d1174d8..c1bce9e7 100644
--- a/data/data-pipeline/data_pipeline/utils.py
+++ b/data/data-pipeline/data_pipeline/utils.py
@@ -189,12 +189,14 @@ def score_folder_cleanup() -> None:
     remove_all_from_dir(data_path / "score" / "tiles")
     downloadable_cleanup()

+
 def downloadable_cleanup() -> None:
     """Remove all files from downloadable directory in the local data/score path"""

     data_path = settings.APP_ROOT / "data"
     remove_all_from_dir(data_path / "score" / "downloadable")

+
 def temp_folder_cleanup() -> None:
     """Remove all files and directories from the local data/tmp temporary path"""

diff --git a/data/data-pipeline/pyproject.toml b/data/data-pipeline/pyproject.toml
index 4af36fa1..ba268a85 100644
--- a/data/data-pipeline/pyproject.toml
+++ b/data/data-pipeline/pyproject.toml
@@ -64,13 +64,14 @@ disable = [
   "C0116", # Disables missing function or method docstring
   "C0115", # Disables missing class docstring
   "R0915", # Disables too many statements (score generation transform)
+  "W0231", # Disables super init not called
 ]

 [tool.pylint.FORMAT]
 max-line-length = 150

 [tool.pylint.typecheck]
-generated-members = "pandas.*" # fixes E1101 for ETL.df
+generated-members = "pandas.*"  # fixes E1101 for ETL.df

 [tool.pylint.SIMILARITIES]
 # Configures how pylint detects repetitive code
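On the pylint change: W0231 is `super-init-not-called`. Disabling it presumably quiets warnings from ETL subclasses that still define their own `__init__` without routing through the new `super().__init__(config_path)`. An illustrative offender (hypothetical class, not from the repo):

```python
from data_pipeline.etl.base import ExtractTransformLoad


class LegacyETL(ExtractTransformLoad):
    # W0231 fires because __init__ never calls super().__init__(config_path);
    # with the rule disabled, not-yet-migrated ETLs keep linting clean
    def __init__(self) -> None:
        self.df = None
```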