j40-cejst-2/data/data-pipeline/data_pipeline/tests/sources/eamlis/test_etl.py
Matt Bowen 8e5ed5b593
Add demos for island areas (#1932)
* Backfill population in island areas (#1882)

* Update smoketest to account for backfills (#1882)

As I wrote in the comment:
We backfill island areas with data from the 2010 census, so if THOSE tracts
have data beyond the data source, that's to be expected and is fine to pass.
If some other state or territory does though, this should fail

This ends up being a nice way of documenting that behavior, I guess!

* Fixup lint issues (#1882)

* Add in race demos to 2010 census pull (#1851)

* Add backfill data to score (#1851)

* Change column name (#1851)

* Fill demos after the score (#1851)

* Add income back, adjust test (#1882)

* Apply code-review feedback (#1851)

* Add test for island area backfill (#1851)

* Fix bad rename (#1851)
2022-09-29 12:42:56 -04:00

159 lines
6.2 KiB
Python

# pylint: disable=protected-access
from unittest import mock
import pathlib
from data_pipeline.etl.base import ValidGeoLevel
from data_pipeline.etl.sources.eamlis.etl import (
AbandonedMineETL,
)
from data_pipeline.tests.sources.example.test_etl import TestETL
from data_pipeline.utils import get_module_logger
# Logger for this module, obtained via the project's get_module_logger helper.
logger = get_module_logger(__name__)
def _fake_add_tracts_for_geometries(df):
    """Stand-in for the real geojoin, which is too slow to run in tests.

    Each row's geometry is looked up by its exact ``(x, y)`` pair in a table
    of precomputed tract IDs, and the result is written to a
    ``GEOID10_TRACT`` column on ``df`` itself.

    Args:
        df: frame whose ``geometry`` attribute yields objects exposing ``x``
            and ``y`` attributes.

    Returns:
        The same ``df`` object, with the ``GEOID10_TRACT`` column set.

    Raises:
        KeyError: if a geometry's ``(x, y)`` pair is not in the table.
    """
    # Keys must match the fixture coordinates exactly (float equality).
    tract_by_coords = {
        (-117.1177285688382, 36.25161281807095): "06027000800",
        (-121.0070599015156, 36.5498780497345): "06069000802",
        (-121.40564726784282, 38.84602113669345): "06061021322",
        (-155.10321769858746, 19.49784370888389): "15001021010",
        (-154.89548634140738, 19.446650238354696): "15001021101",
        (-159.43665201302525, 21.9044122609682): "15007040603",
        (-159.52362041178708, 21.94208315793464): "15007040700",
        (-156.14177664396527, 20.72796381691298): "15009030100",
        (-156.2497797752935, 20.86486713282688): "15009030201",
        (-155.91378867633992, 19.516629328900667): "15001021402",
        (-155.81110884967674, 20.164406070883054): "15001021800",
        (-156.33064622489087, 20.825369670478302): "15009030402",
        (-156.54289869319305, 20.9170439162332): "15009030800",
        (-157.89225964427064, 21.556464980367483): "15003010201",
        (-159.48416846823164, 21.90754283544759): "15007040604",
    }

    def _tract_for(geom):
        """Return the precomputed tract ID for one geometry."""
        return tract_by_coords[(geom.x, geom.y)]

    df["GEOID10_TRACT"] = df.geometry.apply(_tract_for)
    return df
class TestAbandondedLandMineETL(TestETL):
    """Tests the Abandoned Mine Dataset ETL.

    This uses pytest-snapshot.
    To update individual snapshots: $ poetry run pytest
    data_pipeline/tests/sources/eamlis/test_etl.py::TestAbandondedLandMineETL::<testname>
    --snapshot-update

    The tests that delegate to the parent class, and the tract-count test,
    run with the ETL module's ``add_tracts_for_geometries`` replaced by
    ``_fake_add_tracts_for_geometries``.
    """

    _ETL_CLASS = AbandonedMineETL

    _SAMPLE_DATA_PATH = pathlib.Path(__file__).parents[0] / "data"
    _SAMPLE_DATA_FILE_NAME = "eAMLIS export of all data.tsv"
    _SAMPLE_DATA_ZIP_FILE_NAME = "eAMLIS export of all data.tsv.zip"
    _EXTRACT_TMP_FOLDER_NAME = "AbandonedMineETL"

    @staticmethod
    def _patch_add_tracts_for_geometries():
        """Return a patcher swapping the ETL's geojoin for the test fake.

        The returned object is meant to be used as a context manager; while
        active, ``data_pipeline.etl.sources.eamlis.etl.add_tracts_for_geometries``
        is ``_fake_add_tracts_for_geometries``.
        """
        return mock.patch(
            "data_pipeline.etl.sources.eamlis.etl.add_tracts_for_geometries",
            new=_fake_add_tracts_for_geometries,
        )

    def setup_method(self, _method, filename=__file__):
        """Invoke `setup_method` from Parent, but using the current file name.

        This code can be copied identically between all child classes.
        """
        super().setup_method(_method=_method, filename=filename)

    def test_init(self, mock_etl, mock_paths):
        """Tests that the mock class instance was initialized correctly."""
        # setup
        etl = self._get_instance_of_etl_class()

        # validation
        assert etl.GEOID_FIELD_NAME == "GEOID10"
        assert etl.GEOID_TRACT_FIELD_NAME == "GEOID10_TRACT"
        assert etl.NAME == "eamlis"
        assert etl.GEO_LEVEL == ValidGeoLevel.CENSUS_TRACT
        assert etl.COLUMNS_TO_KEEP == [
            etl.GEOID_TRACT_FIELD_NAME,
            etl.AML_BOOLEAN,
        ]

    def test_get_output_file_path(self, mock_etl, mock_paths):
        """Tests the right file name is returned."""
        etl = self._get_instance_of_etl_class()
        # Only the data path is needed; the second element is ignored.
        data_path, _ = mock_paths

        output_file_path = etl._get_output_file_path()
        expected_output_file_path = (
            data_path / "dataset" / self._ETL_CLASS.NAME / "usa.csv"
        )
        assert output_file_path == expected_output_file_path

    def test_fixtures_contain_shared_tract_ids_base(self, mock_etl, mock_paths):
        """Run the parent's test of the same name with the fake geojoin."""
        with self._patch_add_tracts_for_geometries():
            return super().test_fixtures_contain_shared_tract_ids_base(
                mock_etl, mock_paths
            )

    def test_transform_base(self, snapshot, mock_etl, mock_paths):
        """Run the parent's snapshot transform test with the fake geojoin."""
        with self._patch_add_tracts_for_geometries():
            super().test_transform_base(
                snapshot=snapshot, mock_etl=mock_etl, mock_paths=mock_paths
            )

    def test_transform_sets_output_df_base(self, mock_etl, mock_paths):
        """Run the parent's test of the same name with the fake geojoin."""
        with self._patch_add_tracts_for_geometries():
            super().test_transform_sets_output_df_base(
                mock_etl=mock_etl, mock_paths=mock_paths
            )

    def test_validate_base(self, mock_etl, mock_paths):
        """Run the parent's validate test with the fake geojoin."""
        with self._patch_add_tracts_for_geometries():
            super().test_validate_base(mock_etl=mock_etl, mock_paths=mock_paths)

    def test_full_etl_base(self, mock_etl, mock_paths):
        """Run the parent's full-ETL test with the fake geojoin."""
        with self._patch_add_tracts_for_geometries():
            return super().test_full_etl_base(mock_etl, mock_paths)

    def test_get_data_frame_base(self, mock_etl, mock_paths):
        """Run the parent's get_data_frame test with the fake geojoin."""
        with self._patch_add_tracts_for_geometries():
            return super().test_get_data_frame_base(mock_etl, mock_paths)

    def test_tracts_without_fuds_not_in_results(self, mock_etl, mock_paths):
        """Check the ETL output has one entry per shared fixture tract ID.

        Runs extract, transform, validate and load with the fake geojoin,
        then asserts the tract column of the resulting data frame has as many
        entries as ``_FIXTURES_SHARED_TRACT_IDS``.

        NOTE(review): the "fuds" in the name looks like a leftover from
        another ETL's tests; the name is kept so the test ID stays stable.
        """
        with self._patch_add_tracts_for_geometries():
            etl = self._setup_etl_instance_and_run_extract(
                mock_etl=mock_etl, mock_paths=mock_paths
            )
            etl.transform()
            etl.validate()
            etl.load()
            df = etl.get_data_frame()
            assert len(df[etl.GEOID_TRACT_FIELD_NAME]) == len(
                self._FIXTURES_SHARED_TRACT_IDS
            )

    def test_tract_id_lengths(self, mock_etl, mock_paths):
        """Run the parent's tract-ID length test with the fake geojoin."""
        with self._patch_add_tracts_for_geometries():
            super().test_tract_id_lengths(mock_etl, mock_paths)