Add ETL Contract Checks (#619)

* Adds dev dependencies to requirements.txt and re-runs black on codebase

* Adds tests and code for the national risk index ETL, still in progress

* Removes test_data from .gitignore

* Adds test data to national_risk_index tests

* Creates tests and ETL class for NRI data

* Adds tests for load() and transform() methods of NationalRiskIndexETL

* Updates README.md with info about the NRI dataset

* Adds TODOs

* Moves tests and test data into a tests/ dir in national_risk_index

* Moves tmp_dir for tests into data/tmp/tests/

* Promotes fixtures to conftest and relocates national_risk_index tests:
The relocation of the national_risk_index tests is necessary because tests
can only use fixtures defined in a conftest.py within the same package
(see the sketch after this list)

* Fixes issue with df.equals() in test_transform()

* Files reformatted by black

* Commit changes to other files after re-running black

* Fixes unused import that caused lint checks to fail

* Moves tests/ directory to app root for data_pipeline

* Adds new methods to ExtractTransformLoad base class:
- __init__() Initializes class attributes
- _get_census_fips_codes() Loads a dataframe with the fips codes for 
census block group and tract
- validate_init() Checks that the class was initialized correctly
- validate_output() Checks that the output was loaded correctly

* Adds test for ExtractTransformLoad.__init__() and base.py

* Fixes failing flake8 test

* Changes geo_col to geoid_col and changes is_dataset to is_census in yaml

* Adds test for validate_output()

* Adds remaining tests

* Removes is_dataset from init method

* Makes CENSUS_CSV a class attribute instead of a class global:
This ensures that CENSUS_CSV is only set when the ETL class is for a
non-census dataset and removes the need to overwrite the value in the
mock_etl fixture

* Re-formats files with black and fixes broken tox tests
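
For the conftest note above, here is a minimal sketch of the pytest fixture-scoping rule that motivated the move (the layout and fixture body below are illustrative, not the repo's exact code): a fixture defined in a package's conftest.py is visible to test modules in that package and its sub-packages, but not to tests that live outside it.

    # data_pipeline/tests/conftest.py (illustrative layout)
    import pytest

    @pytest.fixture
    def mock_paths(tmp_path_factory):
        # shared fixture: visible to every test module under data_pipeline/tests/
        data_path = tmp_path_factory.mktemp("data")
        tmp_path = data_path / "tmp"
        tmp_path.mkdir()
        return data_path, tmp_path

    # data_pipeline/tests/sources/national_risk_index/test_etl.py (illustrative)
    def test_init(mock_paths):
        # resolves because this module lives under the package that owns the
        # conftest.py; a test outside data_pipeline/tests/ would fail with a
        # "fixture 'mock_paths' not found" error
        data_path, tmp_path = mock_paths
        assert tmp_path.exists()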
Billy Daly 2021-10-13 15:54:15 -04:00 committed by GitHub
parent 1f78920f63
commit d1273b63c5
13 changed files with 358 additions and 32 deletions

View file

@@ -3,7 +3,12 @@ import sys

 import click
 from data_pipeline.config import settings
-from data_pipeline.etl.runner import etl_runner, score_generate, score_geo, score_post
+from data_pipeline.etl.runner import (
+    etl_runner,
+    score_generate,
+    score_geo,
+    score_post,
+)
 from data_pipeline.etl.sources.census.etl_utils import (
     reset_data_directories as census_reset,
 )

View file

@@ -1,8 +1,17 @@
 from pathlib import Path
 from typing import Optional

+import pandas as pd
+import yaml
+
 from data_pipeline.config import settings
-from data_pipeline.utils import unzip_file_from_url, remove_all_from_dir
+from data_pipeline.utils import (
+    unzip_file_from_url,
+    remove_all_from_dir,
+    get_module_logger,
+)
+
+logger = get_module_logger(__name__)


 class ExtractTransformLoad:
@@ -17,7 +26,8 @@ class ExtractTransformLoad:
         GEOID_TRACT_FIELD_NAME (str): The common column name for a Census Tract identifier
     """

-    DATA_PATH: Path = settings.APP_ROOT / "data"
+    APP_ROOT: Path = settings.APP_ROOT
+    DATA_PATH: Path = APP_ROOT / "data"
     TMP_PATH: Path = DATA_PATH / "tmp"
     FILES_PATH: Path = settings.APP_ROOT / "files"
     GEOID_FIELD_NAME: str = "GEOID10"
@@ -26,11 +36,51 @@ class ExtractTransformLoad:
     EXPECTED_MAX_CENSUS_BLOCK_GROUPS: int = 220405
     EXPECTED_MAX_CENSUS_TRACTS: int = 73076

-    def get_yaml_config(self) -> None:
+    def __init__(self, config_path: Path) -> None:
+        """Inits the class with instance specific variables"""
+
+        # set by _get_yaml_config()
+        self.NAME: str = None
+        self.SOURCE_URL: str = None
+        self.GEOID_COL: str = None
+        self.GEO_LEVEL: str = None
+        self.SCORE_COLS: list = None
+        self.FIPS_CODES: pd.DataFrame = None
+        self.OUTPUT_PATH: Path = None
+        self.CENSUS_CSV: Path = None
+
+        self._get_yaml_config(config_path)
+
+    def _get_yaml_config(self, config_path: Path) -> None:
         """Reads the YAML configuration file for the dataset and stores
         the properies in the instance (upcoming feature)"""
-        pass
+        # parse the yaml config file
+        try:
+            with open(config_path, "r", encoding="utf-8") as file:
+                config = yaml.safe_load(file)
+        except (FileNotFoundError, yaml.YAMLError) as err:
+            raise err
+
+        # set dataset specific attributes
+        census_dir = self.DATA_PATH / "census" / "csv"
+        if config["is_census"]:
+            csv_dir = census_dir
+        else:
+            self.CENSUS_CSV = census_dir / "us.csv"
+            self.FIPS_CODES = self._get_census_fips_codes()
+            csv_dir = self.DATA_PATH / "dataset"
+
+        # parse name and set output path
+        name = config.get("name")
+        snake_name = name.replace(" ", "_").lower()  # converts to snake case
+        output_dir = snake_name + (config.get("year") or "")
+        self.OUTPUT_PATH = csv_dir / output_dir / "usa.csv"
+        self.OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
+
+        # set class attributes
+        attrs = ["NAME", "SOURCE_URL", "GEOID_COL", "GEO_LEVEL", "SCORE_COLS"]
+        for attr in attrs:
+            setattr(self, attr, config[attr.lower()])

     def check_ttl(self) -> None:
         """Checks if the ETL process can be run based on a the TLL value on the
@@ -44,9 +94,9 @@ class ExtractTransformLoad:
         extract_path: Path = None,
         verify: Optional[bool] = True,
     ) -> None:
-        """Extract the data from
-        a remote source. By default it provides code to get the file from a source url,
-        unzips it and stores it on an extract_path."""
+        """Extract the data from a remote source. By default it provides code
+        to get the file from a source url, unzips it and stores it on an
+        extract_path."""

         # this can be accessed via super().extract()
         if source_url and extract_path:
@@ -70,3 +120,53 @@ class ExtractTransformLoad:
         """Clears out any files stored in the TMP folder"""
         remove_all_from_dir(self.TMP_PATH)
+
+    # TODO: Add test for this
+    def _get_census_fips_codes(self) -> pd.DataFrame:
+        """Loads FIPS codes for each Census block group and tract"""
+
+        # check that the census data exists
+        if not self.CENSUS_CSV.exists():
+            logger.info("Census data not found, please run download_csv first")
+
+        # load the census data
+        df = pd.read_csv(
+            self.CENSUS_CSV, dtype={self.GEOID_FIELD_NAME: "string"}
+        )
+
+        # extract Census tract FIPS code from Census block group
+        df[self.GEOID_TRACT_FIELD_NAME] = df[self.GEOID_FIELD_NAME].str[0:11]
+
+        return df[[self.GEOID_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME]]
+
+    # TODO: Create tests
+    def validate_output(self) -> None:
+        """Checks that the output of the ETL process adheres to the contract
+        expected by the score module
+
+        Contract conditions:
+        - Output is saved as usa.csv at the path specified by self.OUTPUT_PATH
+        - The output csv has a column named GEOID10 which stores each of the
+          Census block group FIPS codes in data/census/csv/usa.csv
+        - The output csv has a column named GEOID10_TRACT which stores each of
+          Census tract FIPS codes associated with each Census block group
+        - The output csv has each of the columns expected by the score and the
+          name and dtype of those columns match the format expected by score
+        """
+        # read in output file
+        # and check that GEOID cols are present
+        assert self.OUTPUT_PATH.exists(), f"No file found at {self.OUTPUT_PATH}"
+        df_output = pd.read_csv(
+            self.OUTPUT_PATH,
+            dtype={
+                self.GEOID_FIELD_NAME: "string",
+                self.GEOID_TRACT_FIELD_NAME: "string",
+            },
+        )
+
+        # check that the GEOID cols in the output match census data
+        geoid_cols = [self.GEOID_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME]
+        for col in geoid_cols:
+            assert col in self.FIPS_CODES.columns
+        assert self.FIPS_CODES.equals(df_output[geoid_cols])
+
+        # check that the score columns are in the output
+        for col in self.SCORE_COLS:
+            assert col in df_output.columns, f"{col} is missing from output"
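
Taken together, the additions above are the ETL contract being checked: a dataset ETL initializes itself from a YAML config via super().__init__(), writes its result to self.OUTPUT_PATH, and validate_output() then asserts that the GEOID columns match the census FIPS codes and that every score column is present. A rough usage sketch follows, assuming a hypothetical ExampleETL, config path, and transform/load bodies (none of which are part of this changeset):

    from pathlib import Path

    import pandas as pd

    from data_pipeline.etl.base import ExtractTransformLoad

    # hypothetical config path, for illustration only
    CONFIG_PATH = Path("data_pipeline/etl/sources/example/config.yaml")


    class ExampleETL(ExtractTransformLoad):
        """Sketch of a dataset ETL that relies on the new base-class contract"""

        def __init__(self) -> None:
            # reads the YAML config and sets NAME, SOURCE_URL, GEOID_COL,
            # SCORE_COLS, OUTPUT_PATH, FIPS_CODES, CENSUS_CSV, etc.
            super().__init__(CONFIG_PATH)
            self.df: pd.DataFrame = None

        def transform(self) -> None:
            # placeholder: a real ETL builds one row per census block group,
            # keyed by GEOID10, with every column named in SCORE_COLS
            self.df = self.FIPS_CODES.copy()
            for col in self.SCORE_COLS:
                self.df[col] = 0

        def load(self) -> None:
            # writing to OUTPUT_PATH is what validate_output() inspects afterwards
            self.df.to_csv(self.OUTPUT_PATH, index=False)


    etl = ExampleETL()
    etl.transform()
    etl.load()
    etl.validate_output()  # raises AssertionError if the contract is violated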

View file

@@ -86,6 +86,7 @@ def score_generate() -> None:
     # Post Score Processing
     score_post()

+
 def score_post() -> None:
     """Posts the score files to the local directory

View file

@@ -46,8 +46,12 @@ timestamp_str = current_dt.strftime("%Y-%m-%d-%H%M")
 SCORE_DOWNLOADABLE_DIR = DATA_SCORE_DIR / "downloadable"
 SCORE_DOWNLOADABLE_PDF_FILE_NAME = "Draft_Communities_List.pdf"
 SCORE_DOWNLOADABLE_PDF_FILE_PATH = FILES_PATH / SCORE_DOWNLOADABLE_PDF_FILE_NAME
-SCORE_DOWNLOADABLE_CSV_FILE_PATH = SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.csv"
-SCORE_DOWNLOADABLE_EXCEL_FILE_PATH = SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.xlsx"
+SCORE_DOWNLOADABLE_CSV_FILE_PATH = (
+    SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.csv"
+)
+SCORE_DOWNLOADABLE_EXCEL_FILE_PATH = (
+    SCORE_DOWNLOADABLE_DIR / f"communities-{timestamp_str}.xlsx"
+)
 SCORE_DOWNLOADABLE_ZIP_FILE_PATH = (
     SCORE_DOWNLOADABLE_DIR / "Screening_Tool_Data.zip"
 )

View file

@@ -0,0 +1,10 @@
name: Template
year: null
is_census: false
source_url: https://github.com/usds/justice40-tool/
geo_level: Census Block Group
geoid_col: GEO COL
score_cols:
  - COL 1
  - COL 2
  - COL 3
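
For reference, this is the template config that _get_yaml_config() consumes in the tests below; with it, the loader is expected to set roughly the following attributes (TemplateETL and CONFIG_PATH are defined in the test module later in this diff):

    etl = TemplateETL(CONFIG_PATH)

    assert etl.NAME == "Template"
    assert etl.SOURCE_URL == "https://github.com/usds/justice40-tool/"
    assert etl.GEOID_COL == "GEO COL"
    assert etl.GEO_LEVEL == "Census Block Group"
    assert etl.SCORE_COLS == ["COL 1", "COL 2", "COL 3"]
    # name "Template" becomes snake case "template"; year is null, so no suffix
    assert etl.OUTPUT_PATH == etl.DATA_PATH / "dataset" / "template" / "usa.csv"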

View file

@@ -0,0 +1,11 @@
GEOID10,POPULATION
050070403001,1000
050070403002,1500
050010201001,1000
050010201002,1500
150070405001,2000
150070405002,2250
150010210101,2000
150010210102,1500
150010211011,1750
150010211012,1500

View file

@@ -0,0 +1,11 @@
GEOID10,GEOID10_TRACT,COL 1,COL 2,COL 3
050070403001,05007040300,10,10,10
050070403002,05007040300,20,20,20
050010201001,05001020100,30,30,30
050010201002,05001020100,40,40,40
150070405001,15007040500,50,50,50
150070405002,15007040500,60,60,60
150010210101,15001021010,70,70,70
150010210102,15001021010,80,80,80
150010211011,15001021101,90,90,90
150010211012,15001021101,100,100,100
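Note that the GEOID10_TRACT values in this output fixture are simply the first 11 characters of the 12-character block group GEOID10, mirroring what _get_census_fips_codes() derives:

    import pandas as pd

    df = pd.DataFrame({"GEOID10": ["050070403001"]}, dtype="string")
    # tract FIPS = first 11 characters of the 12-character block group FIPS
    df["GEOID10_TRACT"] = df["GEOID10"].str[0:11]
    assert df["GEOID10_TRACT"][0] == "05007040300"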

View file

@@ -0,0 +1,10 @@
name = Template # uses equal sign instead of colon
year: null
is_dataset: true
source_url: https://github.com/usds/justice40-tool/
geo_level: Census Block Group
geoid_col: GEO COL
score_cols:
  - COL 1
  - COL 2
  - COL 3
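
This fixture is intentionally malformed: the first line is a bare scalar rather than a key/value pair, so when the "year: null" mapping line follows it, yaml.safe_load raises a ScannerError (a yaml.YAMLError subclass), which is what test_init_bad_config expects. A quick illustration of the failure mode (assuming PyYAML's usual behavior):

    import yaml

    try:
        yaml.safe_load("name = Template\nyear: null\n")
    except yaml.YAMLError as err:
        # typically a ScannerError: "mapping values are not allowed here"
        print(type(err).__name__)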

View file

@@ -0,0 +1,161 @@
import shutil
from pathlib import Path

import yaml
import pytest
import pandas as pd

from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad

TEST_DIR = settings.APP_ROOT / "tests" / "base"
DATA_DIR = TEST_DIR / "data"
CONFIG_PATH = TEST_DIR / "config.yaml"
OUTPUT_SRC = DATA_DIR / "output.csv"


def remove_output(etl):
    """Clears output.csv if it is exists"""
    etl = TemplateETL(CONFIG_PATH)
    if etl.OUTPUT_PATH.exists():
        etl.OUTPUT_PATH.unlink()
    assert etl.OUTPUT_PATH.exists() is False


def load_output_source(etl):
    """Loads output csv so that it can be modified"""
    df = pd.read_csv(
        OUTPUT_SRC,
        dtype={
            etl.GEOID_FIELD_NAME: "string",
            etl.GEOID_TRACT_FIELD_NAME: "string",
        },
    )
    return df


class TemplateETL(ExtractTransformLoad):
    """Mock ETL class that inherits from the base ETL"""

    def __init__(self, config_path: Path) -> None:
        super().__init__(config_path)
        self.EXTRACTED_CSV: Path = DATA_DIR / "output.csv"
        self.df: pd.DataFrame = None


class TestInit:
    """Tests the super.init() method in a class that inherits from
    ExtractTransformLoad"""

    def test_init(self, mock_paths, mock_etl):
        """Tests that the init method executes successfully

        Validates the following conditions:
        - The class was instantiated with no errors
        - All of the class attributes were set correctly by _get_yaml_config()
        """
        # setup
        data_path, tmp_path = mock_paths
        etl = TemplateETL(CONFIG_PATH)
        # validation
        assert etl.NAME == "Template"
        assert etl.SOURCE_URL == "https://github.com/usds/justice40-tool/"
        assert etl.GEOID_COL == "GEO COL"
        assert etl.GEO_LEVEL == "Census Block Group"
        assert etl.SCORE_COLS == ["COL 1", "COL 2", "COL 3"]
        assert etl.OUTPUT_PATH == data_path / "dataset" / "template" / "usa.csv"
        assert etl.CENSUS_CSV.exists()

    def test_init_missing_config(self, mock_etl):
        """Tests that FileNotFoundError is raised when the class is instantiated
        with a path to a config.yaml file that doesn't exist
        """
        # setup
        config_path = settings.APP_ROOT / "fake_path"
        assert config_path.exists() is False
        # execute
        with pytest.raises(FileNotFoundError):
            TemplateETL(config_path)

    def test_init_bad_config(self, mock_etl):
        """Tests that YAMLError is raised when the class is instantiated with
        a yaml file that has errors in it
        """
        # setup
        config_path = TEST_DIR / "invalid_config.yaml"
        assert config_path.exists()
        # execute
        with pytest.raises(yaml.YAMLError):
            TemplateETL(config_path)


class TestValidateOutput:
    """Tests the ExtractTransformLoad.validate_output() method"""

    def test_validate_output_success(self, mock_etl):
        """Tests that validate_output() runs successfully with valid output"""
        # setup - instantiate etl class
        etl = TemplateETL(CONFIG_PATH)
        # setup - load output file
        shutil.copyfile(OUTPUT_SRC, etl.OUTPUT_PATH)
        # validation
        etl.validate_output()

    def test_validate_output_missing_output(self, mock_etl):
        """Tests that validate_output() fails if the output isn't written to
        the location at self.OUTPUT_PATH
        """
        # setup - remove output file
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # validation
        with pytest.raises(AssertionError):
            etl.validate_output()

    def test_validate_missing_geoid_col(self, mock_etl):
        """Tests that validate_output() fails if the output is missing one of
        census fips codes columns
        """
        # setup - remove output file
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # setup - delete GEOID10 col from output
        df = load_output_source(etl)
        df.drop(etl.GEOID_FIELD_NAME, axis=1, inplace=True)
        assert etl.GEOID_FIELD_NAME not in df.columns
        df.to_csv(etl.OUTPUT_PATH)
        # validation
        with pytest.raises(KeyError):
            etl.validate_output()

    def test_validate_missing_census_block_group(self, mock_etl):
        """Tests that validate_output() fails if the output is missing one of
        census block group rows
        """
        # setup - remove output file
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # setup - remove the first Census Block Group
        df = load_output_source(etl)
        df.drop(index=df.index[0], axis=0, inplace=True)  # delete row 1
        assert len(df) == 9
        df.to_csv(etl.OUTPUT_PATH)
        # validation
        with pytest.raises(AssertionError):
            etl.validate_output()

    def test_validate_missing_score_col(self, mock_etl):
        """Tests that validate_output() fails if the output is missing one of
        the columns used in the score
        """
        # setup - remove output file
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # setup - delete one of the score columns
        df = load_output_source(etl)
        df.drop("COL 1", axis=1, inplace=True)
        assert "COL 1" not in df.columns
        df.to_csv(etl.OUTPUT_PATH)
        # validation
        with pytest.raises(AssertionError):
            etl.validate_output()

View file

@@ -1,4 +1,6 @@
 import os
+from pathlib import Path
+from shutil import copyfile

 import pytest

@@ -8,6 +10,22 @@ from data_pipeline.etl.base import ExtractTransformLoad
 TMP_DIR = settings.APP_ROOT / "data" / "tmp" / "tests"


+def copy_data_files(src: Path, dst: Path) -> None:
+    """Copies test data from src Path to dst Path for use in testing
+
+    Args
+        src: pathlib.Path instance. The location of the source data file.
+        dst: pathlib.Path instance. Where to copy the source data file to.
+
+    Returns
+        None. This is a void function
+    """
+    if not dst.exists():
+        dst.parent.mkdir(parents=True, exist_ok=True)
+        copyfile(src, dst)
+        assert dst.exists()
+
+
 @pytest.fixture(scope="session")
 def mock_paths(tmp_path_factory) -> tuple:
     """Creates new DATA_PATH and TMP_PATH that point to a temporary local

@@ -23,8 +41,17 @@ def mock_paths(tmp_path_factory) -> tuple:
     return data_path, tmp_path


+@pytest.fixture(scope="session")
+def mock_census(mock_paths) -> Path:
+    data_path, tmp_path = mock_paths
+    census_src = settings.APP_ROOT / "tests" / "base" / "data" / "census.csv"
+    census_dst = data_path / "census" / "csv" / "us.csv"
+    copy_data_files(census_src, census_dst)
+    return census_dst
+
+
 @pytest.fixture
-def mock_etl(monkeypatch, mock_paths) -> None:
+def mock_etl(monkeypatch, mock_paths, mock_census) -> None:
     """Creates a mock version of the base ExtractTransformLoad class and resets
     global the variables for DATA_PATH and TMP_PATH to the local mock_paths
     """

View file

@@ -1,9 +1,7 @@
-from pathlib import Path
-from shutil import copyfile
-
 import pandas as pd

 from data_pipeline.config import settings
+from data_pipeline.tests.conftest import copy_data_files
 from data_pipeline.etl.sources.national_risk_index.etl import (
     NationalRiskIndexETL,
 )

@@ -13,22 +11,6 @@ DATA_DIR = (
 )


-def copy_data_files(src: Path, dst: Path) -> None:
-    """Copies test data from src Path to dst Path for use in testing
-
-    Args
-        src: pathlib.Path instance. The location of the source data file.
-        dst: pathlib.Path instance. Where to copy the source data file to.
-
-    Returns
-        None. This is a void function
-    """
-    if not dst.exists():
-        dst.parent.mkdir(parents=True, exist_ok=True)
-        copyfile(src, dst)
-        assert dst.exists()
-
-
 class TestNationalRiskIndexETL:
     def test_init(self, mock_etl, mock_paths):
         """Tests that the mock NationalRiskIndexETL class instance was

@@ -45,6 +27,7 @@ class TestNationalRiskIndexETL:
         data_path, tmp_path = mock_paths
         input_csv = tmp_path / "NRI_Table_CensusTracts.csv"
         output_dir = data_path / "dataset" / "national_risk_index_2020"
+        print(input_csv)
         # validation
         assert etl.DATA_PATH == data_path
         assert etl.TMP_PATH == tmp_path

@@ -66,7 +49,7 @@ class TestNationalRiskIndexETL:
         input_src = DATA_DIR / "input.csv"
         input_dst = etl.INPUT_CSV
         acs_src = DATA_DIR / "acs.csv"
-        acs_dst = DATA_DIR / etl.BLOCK_GROUP_CSV
+        acs_dst = etl.BLOCK_GROUP_CSV
         for src, dst in [(input_src, input_dst), (acs_src, acs_dst)]:
             copy_data_files(src, dst)
         # setup - read in sample output as dataframe

View file

@@ -189,12 +189,14 @@ def score_folder_cleanup() -> None:
     remove_all_from_dir(data_path / "score" / "tiles")
     downloadable_cleanup()

+
 def downloadable_cleanup() -> None:
     """Remove all files from downloadable directory in the local data/score path"""
     data_path = settings.APP_ROOT / "data"
     remove_all_from_dir(data_path / "score" / "downloadable")

+
 def temp_folder_cleanup() -> None:
     """Remove all files and directories from the local data/tmp temporary path"""

View file

@@ -64,13 +64,14 @@ disable = [
     "C0116", # Disables missing function or method docstring
     "C0115", # Disables missing class docstring
     "R0915", # Disables too many statements (score generation transform)
+    "W0231", # Disables super init not called
 ]

 [tool.pylint.FORMAT]
 max-line-length = 150

 [tool.pylint.typecheck]
 generated-members = "pandas.*" # fixes E1101 for ETL.df

 [tool.pylint.SIMILARITIES]
 # Configures how pylint detects repetitive code
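
For context, W0231 is pylint's super-init-not-called check. Disabling it presumably keeps existing dataset ETL subclasses, which define their own __init__ without passing a config path to the new base __init__, from failing lint. A small illustration of the pattern the check flags (the subclass is hypothetical):

    from data_pipeline.etl.base import ExtractTransformLoad


    class LegacyDatasetETL(ExtractTransformLoad):
        def __init__(self):
            # W0231 (super-init-not-called) fires here because
            # super().__init__(config_path) is never invoked
            self.NAME = "Legacy Dataset"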