@@ -2,13 +2,13 @@
import copy
import os
import pathlib
from typing import Type
import pytest

import numpy as np
import pandas as pd

from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.tests.conftest import copy_data_files
from data_pipeline.tests.sources.example.etl import ExampleETL
from data_pipeline.utils import get_module_logger
@@ -17,19 +17,30 @@ logger = get_module_logger(__name__)

class TestETL:
    """A base class that can be inherited by all other ETL tests.

    Note: every method that does *not* need to be reimplemented by child classes has
    the test name pattern of `test_*_base`. All other tests need to be reimplemented.
    This uses pytest-snapshot.

    To update individual snapshots: $ poetry run pytest
    data_pipeline/tests/sources/national_risk_index/test_etl.py::TestClassNameETL::<testname>
    --snapshot-update
    """

    # In every child test class, change this to the class of the ETL being tested.
    _ETL_CLASS = ExampleETL

    # The following constants do not need to be updated in the child class.
    _INPUT_CSV_FILE_NAME = "input.csv"
    _EXTRACT_CSV_FILE_NAME = "extract.csv"
    _TRANSFORM_CSV_FILE_NAME = "transform.csv"
    _OUTPUT_CSV_FILE_NAME = "output.csv"

    # This *does* need to be updated in the child class. It specifies where the "sample data" is
    # so that we do not have to manually copy the "sample data" when we run the tests.
    _SAMPLE_DATA_PATH = pathlib.Path(__file__).parents[0] / "data"
    _SAMPLE_DATA_FILE_NAME = "input.csv"
    _SAMPLE_DATA_ZIP_FILE_NAME = "input.zip"
    _EXTRACT_TMP_FOLDER_NAME = "ExampleETL"

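    # Illustrative sketch only (hypothetical names): a child test class typically just
    # re-points these attributes at its own ETL class and data directory. `MyETL` and
    # the values below are placeholders, not real classes or paths in this repo.
    #
    #   class TestMyETL(TestETL):
    #       _ETL_CLASS = MyETL
    #       _SAMPLE_DATA_PATH = pathlib.Path(__file__).parents[0] / "data"
    #       _EXTRACT_TMP_FOLDER_NAME = "MyETL"
    #
    #       def setup_method(self, _method, filename=__file__):
    #           super().setup_method(_method=_method, filename=filename)
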
    # Note: We used shared census tract IDs so that later our tests can join all the
    # ETL results together and generate a test score. This join is only possible if
    # we use the same tract IDs across fixtures.
@@ -56,19 +67,14 @@ class TestETL:

    def setup_method(self, _method, filename=__file__):
        """Before every test, set the data directory for the test.

        Uses the directory of the test class to infer the data directory.

        pytest does not support classes with an `__init__`. Instead, we use this
        `setup_method`, which pytest runs before every test method.

        For now, all child classes inheriting this need to reimplement this, but can
        use the same line of code regardless of the child class:

        ```
        def setup_method(self, _method, filename=__file__):
            '''Invoke `setup_method` from the parent class, but using the current file name.

            This code can be copied identically between all child classes.
            '''
            super().setup_method(_method=_method, filename=filename)
@@ -76,18 +82,17 @@ class TestETL:
        """
        self._DATA_DIRECTORY_FOR_TEST = pathlib.Path(filename).parent / "data"

    def _get_instance_of_etl_class(self) -> type(ExtractTransformLoad):
    def _get_instance_of_etl_class(self) -> Type[ExtractTransformLoad]:
        return self._ETL_CLASS()

    def _setup_etl_instance_and_run_extract(
        self, mock_etl, mock_paths
    ) -> ExtractTransformLoad:
        """Method to set up an ETL instance with proper upstream mocks to run extract.

        This must be re-implemented in every child class.

        This method can be used by multiple tests that need to run the same fixtures
        that need these same mocks, and by `test_update_test_fixtures`.
        that need these same mocks.

        Re-implementing this method will usually involve a
        decent amount of work to monkeypatch `requests` or another method that's
@@ -107,22 +112,8 @@ class TestETL:
        return etl

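    # Illustrative sketch only: one way a child class might re-implement
    # `_setup_etl_instance_and_run_extract`, assuming its ETL downloads a zip file with
    # `requests.get` (it uses `unittest.mock`; the exact mocking will differ per source).
    #
    #   def _setup_etl_instance_and_run_extract(self, mock_etl, mock_paths):
    #       with mock.patch("requests.get") as requests_mock:
    #           zip_fixture = self._DATA_DIRECTORY_FOR_TEST / self._SAMPLE_DATA_ZIP_FILE_NAME
    #           with open(zip_fixture, mode="rb") as file:
    #               requests_mock.return_value = mock.Mock(
    #                   status_code=200,
    #                   content=file.read(),
    #               )
    #           etl = self._get_instance_of_etl_class()
    #           etl.extract()
    #       return etl
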
    def test_existence_of_test_fixtures_base(self):
        """Every ETL test should have these two test fixture files.

        Can be run without modification for all child classes.
        """
        assert (
            self._DATA_DIRECTORY_FOR_TEST / self._TRANSFORM_CSV_FILE_NAME
        ).exists()

        assert (
            self._DATA_DIRECTORY_FOR_TEST / self._OUTPUT_CSV_FILE_NAME
        ).exists()

    def test_init_base(self, mock_etl, mock_paths):
        """Test whether class has appropriate parameters set.

        Can be run without modification for all child classes.
        """
        # Setup
@@ -150,8 +141,8 @@ class TestETL:

    def test_get_output_file_path_base(self, mock_etl, mock_paths):
        """Test file path method.

        Can be run without modification for all child classes.
        Can be run without modification for all child classes,
        except those that do not produce usa.csv files.
        """
        etl = self._get_instance_of_etl_class()
        data_path, tmp_path = mock_paths
@@ -171,11 +162,9 @@ class TestETL:

    def test_fixtures_contain_shared_tract_ids_base(self, mock_etl, mock_paths):
        """Check presence of necessary shared tract IDs.

        Note: We used shared census tract IDs so that later our tests can join all the
        ETL results together and generate a test score. This join is only possible if
        we use the same tract IDs across fixtures.

        Can be run without modification for all child classes.
        """
        etl = self._setup_etl_instance_and_run_extract(
@@ -198,9 +187,68 @@ class TestETL:
        else:
            raise NotImplementedError("This geo level not tested yet.")

    def test_sample_data_exists(self):
        """Test that the sample data exists where it's supposed to.

        As per conversation with Jorge, here we can *just* test that the zip file exists.
        """
        assert (
            self._SAMPLE_DATA_PATH / self._SAMPLE_DATA_ZIP_FILE_NAME
        ).exists()

    def test_extract_unzips_base(self, mock_etl, mock_paths):
        """Tests the extract method.

        As per conversation with Jorge, this no longer includes a snapshot. Instead, it verifies that the
        file was unzipped from a "fake" downloaded zip (located in data) into a temporary path.
        """
        tmp_path = mock_paths[1]

        _ = self._setup_etl_instance_and_run_extract(
            mock_etl=mock_etl,
            mock_paths=mock_paths,
        )
        assert (
            tmp_path
            / self._EXTRACT_TMP_FOLDER_NAME
            / self._SAMPLE_DATA_FILE_NAME
        ).exists()

    def test_extract_produces_valid_data(self, snapshot, mock_etl, mock_paths):
        """Tests the extract method.

        Here we verify that the data we extract is "readable". I added a snapshot to be thorough,
        but @Jorge -- do you think this is necessary?
        """
        etl = self._setup_etl_instance_and_run_extract(
            mock_etl=mock_etl,
            mock_paths=mock_paths,
        )
        tmp_df = pd.read_csv(
            etl.get_tmp_path() / self._SAMPLE_DATA_FILE_NAME,
            dtype={etl.GEOID_TRACT_FIELD_NAME: str},
        )
        snapshot.snapshot_dir = self._DATA_DIRECTORY_FOR_TEST
        snapshot.assert_match(
            tmp_df.to_csv(index=False, float_format="%.5f"),
            self._EXTRACT_CSV_FILE_NAME,
        )

    def test_transform_base(self, snapshot, mock_etl, mock_paths):
        """Tests the transform method.

        This verifies that when we extract the data, we can then read it in."""
        # setup - copy sample data into tmp_dir
        etl = self._setup_etl_instance_and_run_extract(mock_etl, mock_paths)
        etl.transform()

        snapshot.snapshot_dir = self._DATA_DIRECTORY_FOR_TEST
        snapshot.assert_match(
            etl.output_df.to_csv(index=False, float_format="%.5f"),
            self._TRANSFORM_CSV_FILE_NAME,
        )

    def test_transform_sets_output_df_base(self, mock_etl, mock_paths):
        """This test ensures that the transform step sets its results to `output_df`.

        Can be run without modification for all child classes.
        """
        etl = self._setup_etl_instance_and_run_extract(
@@ -217,33 +265,8 @@ class TestETL:
        for col in etl.COLUMNS_TO_KEEP:
            assert col in etl.output_df.columns, f"{col} is missing from output"

    def test_transform_base(self, mock_etl):
        """Tests the transform method.

        Can be run without modification for all child classes.
        """
        # setup - copy sample data into tmp_dir
        etl = self._get_instance_of_etl_class()
        etl.transform()

        transform_csv_path = (
            self._DATA_DIRECTORY_FOR_TEST / self._TRANSFORM_CSV_FILE_NAME
        )

        # Compare to expected.
        expected = pd.read_csv(
            filepath_or_buffer=transform_csv_path,
            dtype={
                ExtractTransformLoad.GEOID_TRACT_FIELD_NAME: "string",
                ExtractTransformLoad.GEOID_FIELD_NAME: "string",
            },
        )

        pd.testing.assert_frame_equal(etl.output_df, expected)

    def test_load_base(self, mock_etl):
    def test_load_base(self, snapshot, mock_etl, mock_paths):
        """Test load method.

        Can be run without modification for all child classes.
        """
        # setup - input variables
@@ -263,29 +286,23 @@ class TestETL:
        actual_output_path = etl._get_output_file_path()
        assert actual_output_path.exists()

        # Check COLUMNS_TO_KEEP remain
        actual_output = pd.read_csv(
            actual_output_path, dtype={etl.GEOID_TRACT_FIELD_NAME: str}
        )
        expected_output_csv_path = (
            self._DATA_DIRECTORY_FOR_TEST / self._OUTPUT_CSV_FILE_NAME
        )

        # setup - load expected output
        expected_output = pd.read_csv(
            filepath_or_buffer=expected_output_csv_path,
            dtype={etl.GEOID_TRACT_FIELD_NAME: str},
        )

        # check that the `COLUMNS_TO_KEEP` are in the output
        for col in etl.COLUMNS_TO_KEEP:
            assert col in actual_output.columns, f"{col} is missing from output"

        # validation
        pd.testing.assert_frame_equal(actual_output, expected_output)
        # Check the snapshots
        snapshot.snapshot_dir = self._DATA_DIRECTORY_FOR_TEST
        snapshot.assert_match(
            actual_output.to_csv(index=False, float_format="%.5f"),
            self._OUTPUT_CSV_FILE_NAME,
        )

    def test_validate_base(self, mock_etl, mock_paths):
        """Every ETL class should have proper validation.

        Can be run without modification for all child classes.
        """
        etl = self._setup_etl_instance_and_run_extract(
@@ -436,9 +453,7 @@ class TestETL:

    def test_full_etl_base(self, mock_etl, mock_paths):
        """Every ETL class should be able to run end-to-end.

        Run extract, transform, validate, load, and get without error.

        Can be run without modification for all child classes.
        """
        etl = self._setup_etl_instance_and_run_extract(
@@ -451,7 +466,6 @@ class TestETL:

    def test_get_data_frame_base(self, mock_etl, mock_paths):
        """Every ETL class should be able to return its data frame.

        Can be run without modification for all child classes.
        """
        etl = self._setup_etl_instance_and_run_extract(
@@ -500,71 +514,3 @@ class TestETL:
        else:
            raise NotImplementedError("This geo level not tested yet.")

    # This decorator means that this "test" will only be run when that flag is passed to
    # pytest, for instance by running `pytest . -rsx --update_snapshots`.
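    # For context, a marker/flag pair like this is usually wired up in `conftest.py`
    # roughly as sketched below (hedged: the hooks in this repo's conftest may differ).
    #
    #   def pytest_addoption(parser):
    #       parser.addoption(
    #           "--update_snapshots", action="store_true", help="Update test fixtures."
    #       )
    #
    #   def pytest_runtest_setup(item):
    #       if "update_snapshots" in item.keywords and not item.config.getoption(
    #           "--update_snapshots"
    #       ):
    #           pytest.skip("Only runs when --update_snapshots is passed.")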
    @pytest.mark.update_snapshots
    def test_update_test_fixtures(self, mock_etl, mock_paths):
        """Update the test fixtures (the data files) used by the test.

        This needs to be reimplemented for every child class. This is because there
        are no strict contracts on the outputs of the `extract` step, so this method
        needs to explicitly define how to update the `input` fixture that comes after
        the extract step.

        Using this method to update fixtures can be helpful if you expect the
        results to change because you changed the logic of the ETL class and need to
        quickly update the fixtures.

        However, note a few things first:

        1. Do *not* update these fixtures if you did not expect the ETL results to
        change!

        2. If the source data itself changes (e.g., the external source renames a
        column), update the "furthest upstream" test fixture which, in many cases,
        is a .zip file. Then running this method will update all subsequent files.

        If you're confused by any of this, ask for help; it's confusing :).
        """
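        # If the "furthest upstream" .zip fixture itself needs rebuilding from a refreshed
        # input.csv, a sketch like the following works (assumes `import zipfile`; not part
        # of this method):
        #
        #   with zipfile.ZipFile(
        #       self._SAMPLE_DATA_PATH / self._SAMPLE_DATA_ZIP_FILE_NAME, "w"
        #   ) as zf:
        #       zf.write(
        #           self._SAMPLE_DATA_PATH / self._SAMPLE_DATA_FILE_NAME,
        #           arcname=self._SAMPLE_DATA_FILE_NAME,
        #       )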
        # When running this in child classes, make sure the child class re-implements
        # this method.
        if self._ETL_CLASS is not ExampleETL:
            raise NotImplementedError(
                "Update fixtures method not defined for this class."
            )

        # The rest of this method applies for `ExampleETL` only.
        etl = self._setup_etl_instance_and_run_extract(
            mock_etl=mock_etl, mock_paths=mock_paths
        )

        # After running extract, write the results as the "input.csv" in the test
        # directory.
        logger.info(
            f"Writing data to {self._DATA_DIRECTORY_FOR_TEST / self._INPUT_CSV_FILE_NAME}"
        )
        copy_data_files(
            src=etl.get_tmp_path() / "input.csv",
            dst=self._DATA_DIRECTORY_FOR_TEST / self._INPUT_CSV_FILE_NAME,
        )

        # After running transform, write the results as the "transform.csv" in the test
        # directory.
        etl.transform()
        etl.output_df.to_csv(
            path_or_buf=self._DATA_DIRECTORY_FOR_TEST
            / self._TRANSFORM_CSV_FILE_NAME,
            index=False,
        )

        # Run validate, just to check.
        etl.validate()

        # After running load, write the results as the "output.csv" in the test
        # directory.
        etl.load()
        copy_data_files(
            src=etl._get_output_file_path(),
            dst=self._DATA_DIRECTORY_FOR_TEST / self._OUTPUT_CSV_FILE_NAME,
        )