Add tests to make sure each source makes it to the score correctly (#1878)

* Remove unused persistent poverty from score (#1835)

* Test a few datasets for overlap in the final score (#1835)

* Add remaining data sources (#1853)

* Apply code-review feedback (#1835)

* Rearrange a little for readability (#1835)

* Add tract test (#1835)

* Add test for score values (#1835)

* Check for unmatched source tracts (#1835)

* Cleanup numeric code to plaintext (#1835)

* Make import more obvious (#1835)
Matt Bowen 2022-09-06 15:10:19 -04:00 committed by GitHub
commit d41153d89d
4 changed files with 328 additions and 18 deletions
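
The smoke tests added here all lean on one pandas pattern: outer-merge each source dataset against the final score on the tract ID, then use the merge indicator to classify every tract as matched or unmatched. A minimal sketch of that pattern with invented toy frames (not the pipeline's real data):

    import pandas as pd

    # Toy stand-ins for the final score and one source dataset.
    final = pd.DataFrame({"GEOID10_TRACT": ["01", "02"], "Score": [0.1, 0.9]})
    source = pd.DataFrame({"GEOID10_TRACT": ["02", "03"], "Score": [0.9, 0.5]})

    df = final.merge(
        source,
        on="GEOID10_TRACT",
        how="outer",
        indicator="MERGE",  # adds a MERGE column: left_only / right_only / both
        suffixes=("final_", "_source"),
    )
    # left_only = tract exists only in the final score; right_only = tract
    # exists only in the source (a hint it may have moved to 2020 tracts).
    print(df.value_counts("MERGE"))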

View file

@@ -42,7 +42,6 @@ class ScoreETL(ExtractTransformLoad):
         self.doe_energy_burden_df: pd.DataFrame
         self.national_risk_index_df: pd.DataFrame
         self.geocorr_urban_rural_df: pd.DataFrame
-        self.persistent_poverty_df: pd.DataFrame
         self.census_decennial_df: pd.DataFrame
         self.census_2010_df: pd.DataFrame
         self.national_tract_df: pd.DataFrame
@@ -159,16 +158,6 @@ class ScoreETL(ExtractTransformLoad):
             low_memory=False,
         )
 
-        # Load persistent poverty
-        persistent_poverty_csv = (
-            constants.DATA_PATH / "dataset" / "persistent_poverty" / "usa.csv"
-        )
-        self.persistent_poverty_df = pd.read_csv(
-            persistent_poverty_csv,
-            dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
-            low_memory=False,
-        )
-
         # Load decennial census data
         census_decennial_csv = (
             constants.DATA_PATH
@@ -359,7 +348,6 @@ class ScoreETL(ExtractTransformLoad):
             self.doe_energy_burden_df,
             self.ejscreen_df,
             self.geocorr_urban_rural_df,
-            self.persistent_poverty_df,
             self.national_risk_index_df,
             self.census_acs_median_incomes_df,
             self.census_decennial_df,
@@ -484,7 +472,6 @@ class ScoreETL(ExtractTransformLoad):
         non_numeric_columns = [
             self.GEOID_TRACT_FIELD_NAME,
-            field_names.PERSISTENT_POVERTY_FIELD,
             field_names.TRACT_ELIGIBLE_FOR_NONNATURAL_THRESHOLD,
             field_names.AGRICULTURAL_VALUE_BOOL_FIELD,
         ]
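
For context on that last hunk: `non_numeric_columns` appears to mark the columns the ETL must leave untouched when it coerces score columns to numeric types, so dropping `field_names.PERSISTENT_POVERTY_FIELD` keeps the list in sync with the removed dataset. A rough sketch of the idea, using a hypothetical helper rather than the ETL's actual code:

    import pandas as pd

    def cast_score_columns(df: pd.DataFrame, non_numeric_columns: list) -> pd.DataFrame:
        # Hypothetical: coerce every column outside the non-numeric list
        # (tract IDs, boolean flags) to floats.
        numeric_columns = [col for col in df.columns if col not in non_numeric_columns]
        df[numeric_columns] = df[numeric_columns].astype(float)
        return df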

View file

@@ -1,13 +1,217 @@
 import pandas as pd
 import pytest
 from data_pipeline.config import settings
-from data_pipeline.score import field_names
+from data_pipeline.score.field_names import GEOID_TRACT_FIELD
+from data_pipeline.etl.score import constants
 
 
 @pytest.fixture(scope="session")
 def final_score_df():
     return pd.read_csv(
         settings.APP_ROOT / "data" / "score" / "csv" / "full" / "usa.csv",
-        dtype={field_names.GEOID_TRACT_FIELD: str},
+        dtype={GEOID_TRACT_FIELD: str},
         low_memory=False,
     )
+
+
+@pytest.fixture()
+def census_df():
+    census_csv = constants.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
+    return pd.read_csv(
+        census_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def ejscreen_df():
+    ejscreen_csv = constants.DATA_PATH / "dataset" / "ejscreen" / "usa.csv"
+    return pd.read_csv(
+        ejscreen_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def hud_housing_df():
+    hud_housing_csv = (
+        constants.DATA_PATH / "dataset" / "hud_housing" / "usa.csv"
+    )
+    return pd.read_csv(
+        hud_housing_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def cdc_places_df():
+    cdc_places_csv = constants.DATA_PATH / "dataset" / "cdc_places" / "usa.csv"
+    return pd.read_csv(
+        cdc_places_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def census_acs_median_incomes_df():
+    census_acs_median_incomes_csv = (
+        constants.DATA_PATH
+        / "dataset"
+        / "census_acs_median_income_2019"
+        / "usa.csv"
+    )
+    return pd.read_csv(
+        census_acs_median_incomes_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def cdc_life_expectancy_df():
+    cdc_life_expectancy_csv = (
+        constants.DATA_PATH / "dataset" / "cdc_life_expectancy" / "usa.csv"
+    )
+    return pd.read_csv(
+        cdc_life_expectancy_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def doe_energy_burden_df():
+    doe_energy_burden_csv = (
+        constants.DATA_PATH / "dataset" / "doe_energy_burden" / "usa.csv"
+    )
+    return pd.read_csv(
+        doe_energy_burden_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def national_risk_index_df():
+    return pd.read_csv(
+        constants.DATA_PATH / "dataset" / "national_risk_index" / "usa.csv",
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def dot_travel_disadvantage_df():
+    return pd.read_csv(
+        constants.DATA_PATH / "dataset" / "travel_composite" / "usa.csv",
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def fsf_fire_df():
+    return pd.read_csv(
+        constants.DATA_PATH / "dataset" / "fsf_wildfire_risk" / "usa.csv",
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def fsf_flood_df():
+    return pd.read_csv(
+        constants.DATA_PATH / "dataset" / "fsf_flood_risk" / "usa.csv",
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def nature_deprived_df():
+    return pd.read_csv(
+        constants.DATA_PATH / "dataset" / "nlcd_nature_deprived" / "usa.csv",
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def eamlis_df():
+    return pd.read_csv(
+        constants.DATA_PATH / "dataset" / "eamlis" / "usa.csv",
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def fuds_df():
+    return pd.read_csv(
+        constants.DATA_PATH / "dataset" / "us_army_fuds" / "usa.csv",
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def geocorr_urban_rural_df():
+    geocorr_urban_rural_csv = (
+        constants.DATA_PATH / "dataset" / "geocorr" / "usa.csv"
+    )
+    return pd.read_csv(
+        geocorr_urban_rural_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def census_decennial_df():
+    census_decennial_csv = (
+        constants.DATA_PATH / "dataset" / "census_decennial_2010" / "usa.csv"
+    )
+    return pd.read_csv(
+        census_decennial_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def census_2010_df():
+    census_2010_csv = (
+        constants.DATA_PATH / "dataset" / "census_acs_2010" / "usa.csv"
+    )
+    return pd.read_csv(
+        census_2010_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def hrs_df():
+    hrs_csv = constants.DATA_PATH / "dataset" / "historic_redlining" / "usa.csv"
+    return pd.read_csv(
+        hrs_csv,
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+    )
+
+
+@pytest.fixture()
+def national_tract_df():
+    national_tract_csv = constants.DATA_CENSUS_CSV_FILE_PATH
+    return pd.read_csv(
+        national_tract_csv,
+        names=[GEOID_TRACT_FIELD],
+        dtype={GEOID_TRACT_FIELD: "string"},
+        low_memory=False,
+        header=None,
+    )

View file

@@ -28,7 +28,6 @@ class PercentileTestConfig:
         return self.percentile_column_name
 
 
-### TODO: we need to blow this out for all eight categories
 def _check_percentile_against_threshold(df, config: PercentileTestConfig):
     """Note - for the purpose of testing, this fills with False"""
     is_minimum_flagged_ok = (

View file

@@ -1,12 +1,37 @@
-# flake8: noqa: W0613,W0611,F811
+# flake8: noqa: W0613,W0611,F811,
+# pylint: disable=unused-import,too-many-arguments
 from dataclasses import dataclass
 from typing import List
 
 import pytest
 import pandas as pd
+import numpy as np
 from data_pipeline.score import field_names
-from .fixtures import final_score_df  # pylint: disable=unused-import
+from data_pipeline.score.field_names import GEOID_TRACT_FIELD
+from .fixtures import (
+    final_score_df,
+    ejscreen_df,
+    hud_housing_df,
+    census_df,
+    cdc_places_df,
+    census_acs_median_incomes_df,
+    cdc_life_expectancy_df,
+    doe_energy_burden_df,
+    national_risk_index_df,
+    dot_travel_disadvantage_df,
+    fsf_fire_df,
+    nature_deprived_df,
+    eamlis_df,
+    fuds_df,
+    geocorr_urban_rural_df,
+    census_decennial_df,
+    census_2010_df,
+    hrs_df,
+    national_tract_df,
+)
 
 pytestmark = pytest.mark.smoketest
 
+UNMATCHED_TRACK_THRESHOLD = 1000
+
 
 def _helper_test_count_exceeding_threshold(df, col, error_check=1000):
@@ -203,3 +228,98 @@ def test_donut_hole_addition_to_score_n(final_score_df):
     assert (
         new_donuts > 0
     ), "FYI: The adjacency index is doing nothing. Consider removing it?"
+
+
+def test_data_sources(
+    final_score_df,
+    hud_housing_df,
+    ejscreen_df,
+    census_df,
+    cdc_places_df,
+    census_acs_median_incomes_df,
+    cdc_life_expectancy_df,
+    doe_energy_burden_df,
+    national_risk_index_df,
+    dot_travel_disadvantage_df,
+    fsf_fire_df,
+    nature_deprived_df,
+    eamlis_df,
+    fuds_df,
+    geocorr_urban_rural_df,
+    census_decennial_df,
+    census_2010_df,
+    hrs_df,
+):
+    data_sources = {
+        key: value for key, value in locals().items() if key != "final_score_df"
+    }
+
+    for data_source_name, data_source in data_sources.items():
+        final = "final_"
+        df: pd.DataFrame = final_score_df.merge(
+            data_source,
+            on=GEOID_TRACT_FIELD,
+            indicator="MERGE",
+            suffixes=(final, f"_{data_source_name}"),
+            how="outer",
+        )
+
+        # Make our lists of columns for later comparison
+        core_cols = data_source.columns.intersection(
+            final_score_df.columns
+        ).drop(GEOID_TRACT_FIELD)
+        data_source_columns = [f"{col}_{data_source_name}" for col in core_cols]
+        final_columns = [f"{col}{final}" for col in core_cols]
+        assert (
+            final_columns
+        ), f"No columns from data source show up in final score in source {data_source_name}"
+
+        # Make sure we have NAs for any tracts in the final data that aren't
+        # covered by the data source
+        assert np.all(df[df.MERGE == "left_only"][final_columns].isna())
+
+        # Make sure the data source doesn't have a ton of unmatched tracts,
+        # implying it has moved to 2020 tracts
+        assert len(df[df.MERGE == "right_only"]) < UNMATCHED_TRACK_THRESHOLD
+
+        df = df[df.MERGE == "both"]
+
+        # Compare every column for equality, using close equality for numerics
+        # and `equals` equality for non-numeric columns
+        for final_column, data_source_column in zip(
+            final_columns, data_source_columns
+        ):
+            error_message = (
+                f"Column {final_column} not equal "
+                f"between {data_source_name} and final score"
+            )
+            if df[final_column].dtype in [
+                np.dtype(object),
+                np.dtype(bool),
+                np.dtype(str),
+            ]:
+                assert df[final_column].equals(
+                    df[data_source_column]
+                ), error_message
+            else:
+                assert np.allclose(
+                    df[final_column],
+                    df[data_source_column],
+                    equal_nan=True,
+                ), error_message
+
+
+def test_output_tracts(final_score_df, national_tract_df):
+    df = final_score_df.merge(
+        national_tract_df,
+        on=GEOID_TRACT_FIELD,
+        how="outer",
+        indicator="MERGE",
+    )
+    counts = df.value_counts("MERGE")
+    assert counts.loc["left_only"] == 0
+    assert counts.loc["right_only"] == 0
+
+
+def test_all_tracts_have_scores(final_score_df):
+    assert not final_score_df[field_names.SCORE_N_COMMUNITIES].isna().any()
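
The dtype branch in `test_data_sources` above matters: `Series.equals` demands exact equality (treating NaNs in the same positions as equal), while `np.allclose(..., equal_nan=True)` tolerates the tiny floating-point drift a round-trip through CSV can introduce. A small illustration with invented values:

    import numpy as np
    import pandas as pd

    a = pd.Series([0.1 + 0.2, np.nan])
    b = pd.Series([0.3, np.nan])

    print(a.equals(b))                        # False: 0.30000000000000004 != 0.3
    print(np.allclose(a, b, equal_nan=True))  # True: within default tolerance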