mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-23 10:04:18 -08:00
* Adds dev dependencies to requirements.txt and re-runs black on codebase * Adds test and code for national risk index etl, still in progress * Removes test_data from .gitignore * Adds test data to nation_risk_index tests * Creates tests and ETL class for NRI data * Adds tests for load() and transform() methods of NationalRiskIndexETL * Updates README.md with info about the NRI dataset * Adds to dos * Moves tests and test data into a tests/ dir in national_risk_index * Moves tmp_dir for tests into data/tmp/tests/ * Promotes fixtures to conftest and relocates national_risk_index tests: The relocation of national_risk_index tests is necessary because tests can only use fixtures specified in conftests within the same package * Fixes issue with df.equals() in test_transform() * Files reformatted by black * Commit changes to other files after re-running black * Fixes unused import that caused lint checks to fail * Moves tests/ directory to app root for data_pipeline * Adds new methods to ExtractTransformLoad base class: - __init__() Initializes class attributes - _get_census_fips_codes() Loads a dataframe with the fips codes for census block group and tract - validate_init() Checks that the class was initialized correctly - validate_output() Checks that the output was loaded correctly * Adds test for ExtractTransformLoad.__init__() and base.py * Fixes failing flake8 test * Changes geo_col to geoid_col and changes is_dataset to is_census in yaml * Adds test for validate_output() * Adds remaining tests * Removes is_dataset from init method * Makes CENSUS_CSV a class attribute instead of a class global: This ensures that CENSUS_CSV is only set when the ETL class is for a non-census dataset and removes the need to overwrite the value in mock_etl fixture * Re-formats files with black and fixes broken tox tests
161 lines
5.4 KiB
Python
161 lines
5.4 KiB
Python
import shutil
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
import pytest
|
|
import pandas as pd
|
|
|
|
from data_pipeline.config import settings
|
|
from data_pipeline.etl.base import ExtractTransformLoad
|
|
|
|
TEST_DIR = settings.APP_ROOT / "tests" / "base"
|
|
DATA_DIR = TEST_DIR / "data"
|
|
CONFIG_PATH = TEST_DIR / "config.yaml"
|
|
OUTPUT_SRC = DATA_DIR / "output.csv"
|
|
|
|
|
|
def remove_output(etl):
    """Deletes the ETL's output file if it exists

    Args:
        etl: Object exposing an ``OUTPUT_PATH`` attribute (a ``pathlib.Path``)
            that points at the output csv to clear.

    Raises:
        AssertionError: If the file still exists after the removal attempt.
    """
    # Work on the instance the caller handed in instead of constructing a new
    # ETL here; previously the argument was overwritten and therefore ignored.
    output_path = etl.OUTPUT_PATH
    if output_path.exists():
        output_path.unlink()
    assert output_path.exists() is False
|
|
|
|
|
|
def load_output_source(etl):
    """Reads the reference output csv into a dataframe that tests can edit

    The columns named by ``etl.GEOID_FIELD_NAME`` and
    ``etl.GEOID_TRACT_FIELD_NAME`` are read with the pandas "string" dtype.
    """
    geoid_dtypes = dict.fromkeys(
        (etl.GEOID_FIELD_NAME, etl.GEOID_TRACT_FIELD_NAME), "string"
    )
    return pd.read_csv(OUTPUT_SRC, dtype=geoid_dtypes)
|
|
|
|
|
|
class TemplateETL(ExtractTransformLoad):
    """Stand-in ETL class for tests, derived from the base ETL"""

    def __init__(self, config_path: Path) -> None:
        """Runs the base class initializer with config_path, then sets the
        attributes specific to this mock
        """
        super().__init__(config_path)
        # no dataframe is loaded at construction time
        self.df: pd.DataFrame = None
        # csv in the test data directory
        self.EXTRACTED_CSV: Path = DATA_DIR / "output.csv"
|
|
|
|
|
|
class TestInit:
    """Exercises the initializer of ExtractTransformLoad through a class
    that inherits from it"""

    def test_init(self, mock_paths, mock_etl):
        """Checks that instantiating the ETL with a valid config succeeds

        Conditions verified:
        - No error is raised while constructing the class
        - Each class attribute holds the value expected from the config
          (set by _get_yaml_config())
        """
        # setup
        data_path, _tmp_path = mock_paths
        expected_output_path = data_path / "dataset" / "template" / "usa.csv"
        expected_score_cols = ["COL 1", "COL 2", "COL 3"]
        etl = TemplateETL(CONFIG_PATH)
        # validation
        assert etl.NAME == "Template"
        assert etl.SOURCE_URL == "https://github.com/usds/justice40-tool/"
        assert etl.GEOID_COL == "GEO COL"
        assert etl.GEO_LEVEL == "Census Block Group"
        assert etl.SCORE_COLS == expected_score_cols
        assert etl.OUTPUT_PATH == expected_output_path
        assert etl.CENSUS_CSV.exists()

    def test_init_missing_config(self, mock_etl):
        """Checks that instantiating the ETL with a path to a config.yaml
        file that doesn't exist raises FileNotFoundError
        """
        # setup
        missing_config = settings.APP_ROOT / "fake_path"
        assert not missing_config.exists()
        # execute
        with pytest.raises(FileNotFoundError):
            TemplateETL(missing_config)

    def test_init_bad_config(self, mock_etl):
        """Checks that instantiating the ETL with a yaml file that contains
        errors raises YAMLError
        """
        # setup
        invalid_config = TEST_DIR / "invalid_config.yaml"
        assert invalid_config.exists()
        # execute
        with pytest.raises(yaml.YAMLError):
            TemplateETL(invalid_config)
|
|
|
|
|
|
class TestValidateOutput:
    """Exercises ExtractTransformLoad.validate_output() against valid and
    deliberately broken output files"""

    def test_validate_output_success(self, mock_etl):
        """Checks that validate_output() passes when the output is valid"""
        # setup - instantiate etl class
        etl = TemplateETL(CONFIG_PATH)
        # setup - place the reference output at the expected location
        shutil.copyfile(OUTPUT_SRC, etl.OUTPUT_PATH)
        # validation
        etl.validate_output()

    def test_validate_output_missing_output(self, mock_etl):
        """Checks that validate_output() raises AssertionError when nothing
        has been written to self.OUTPUT_PATH
        """
        # setup - make sure no output file is present
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # validation
        with pytest.raises(AssertionError):
            etl.validate_output()

    def test_validate_missing_geoid_col(self, mock_etl):
        """Checks that validate_output() raises KeyError when the output
        lacks one of the census fips codes columns
        """
        # setup - make sure no output file is present
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # setup - write the output without the geoid column
        source_df = load_output_source(etl)
        trimmed_df = source_df.drop(columns=etl.GEOID_FIELD_NAME)
        assert etl.GEOID_FIELD_NAME not in trimmed_df.columns
        trimmed_df.to_csv(etl.OUTPUT_PATH)
        # validation
        with pytest.raises(KeyError):
            etl.validate_output()

    def test_validate_missing_census_block_group(self, mock_etl):
        """Checks that validate_output() raises AssertionError when the
        output lacks one of the census block group rows
        """
        # setup - make sure no output file is present
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # setup - write the output without its first row
        source_df = load_output_source(etl)
        first_label = source_df.index[0]
        trimmed_df = source_df.drop(index=first_label)
        assert len(trimmed_df) == 9
        trimmed_df.to_csv(etl.OUTPUT_PATH)
        # validation
        with pytest.raises(AssertionError):
            etl.validate_output()

    def test_validate_missing_score_col(self, mock_etl):
        """Checks that validate_output() raises AssertionError when the
        output lacks one of the columns used in the score
        """
        # setup - make sure no output file is present
        etl = TemplateETL(CONFIG_PATH)
        remove_output(etl)
        # setup - write the output without one of the score columns
        source_df = load_output_source(etl)
        trimmed_df = source_df.drop(columns="COL 1")
        assert "COL 1" not in trimmed_df.columns
        trimmed_df.to_csv(etl.OUTPUT_PATH)
        # validation
        with pytest.raises(AssertionError):
            etl.validate_output()
|