Pipeline tile tests (#1864)

* temp update

* updating with fips check

* adding check on pfs

* updating with pfs test

* Update test_tiles_smoketests.py

* Fix lint errors (#1848)

* Add column names test (#1848)

* Mark tests as smoketests (#1848)

* Move to other score-related tests (#1848)

* Recast Total threshold criteria exceeded to int (#1848)

In writing tests to verify that the output of the tiles CSV matches the
final score CSV, I noticed TC/Total threshold criteria exceeded was
getting cast from an int64 to a float64 in the process of PostScoreETL.
I tracked it down to the line where we merge the score dataframe with
constants.DATA_CENSUS_CSV_FILE_PATH --- there were > 100 tracts in the
national census CSV that don't exist in the score, so those ended up
with a Total threshold count of np.nan, which is a float, and thereby
cast those columns to float. For the moment I just cast it back.
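
A minimal sketch of the upcast and the stopgap, using illustrative frames
rather than the pipeline's real dataframes (filling the missing counts with
0 before the recast is my illustrative choice, not necessarily what the ETL
does):

import numpy as np
import pandas as pd

# Tract "99" exists in the census list but not in the score, so the merged
# threshold column picks up a NaN and pandas upcasts it to float64.
score = pd.DataFrame(
    {"GEOID10_TRACT": ["01", "02"], "Total threshold criteria exceeded": [3, 0]}
)
census = pd.DataFrame({"GEOID10_TRACT": ["01", "02", "99"]})
merged = census.merge(score, on="GEOID10_TRACT", how="left")
assert merged["Total threshold criteria exceeded"].dtype == np.float64

# Recast to int after filling the rows the merge left empty.
merged["Total threshold criteria exceeded"] = (
    merged["Total threshold criteria exceeded"].fillna(0).astype("int64")
)
assert merged["Total threshold criteria exceeded"].dtype == np.int64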

* No need for low memory (#1848)

* Add additional tests of tiles.csv (#1848)

* Drop pre-2010 rows before computing score (#1848)

Note this is probably NOT the optimal place for this change; it might
make more sense for each source to filter its own tracts down to the
acceptable tract list. However, that would be a pretty invasive change,
whereas this spot is central, and plenty of other things happening in
score transform could also be moved to sources, so for today, here's
where the change will live.
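
For context, a minimal sketch of the two ways to do the filtering that the
commits below weigh ("Switch from filter to inner join"); the column name and
frames are illustrative, not the ETL's actual variables:

import pandas as pd

GEOID_TRACT_FIELD = "GEOID10_TRACT"

census_tract_df = pd.DataFrame(
    {GEOID_TRACT_FIELD: ["01001020100", "99999999999"], "some_metric": [0.5, 0.7]}
)
national_tract_df = pd.DataFrame({GEOID_TRACT_FIELD: ["01001020100"]})

# Option 1: a boolean filter against the national 2010 tract list.
filtered = census_tract_df[
    census_tract_df[GEOID_TRACT_FIELD].isin(national_tract_df[GEOID_TRACT_FIELD])
]

# Option 2 (what the change settles on): an inner join, which drops the same
# rows and cannot add any as long as the national list has unique tract IDs.
pre_join_len = census_tract_df[GEOID_TRACT_FIELD].nunique()
joined = census_tract_df.merge(national_tract_df, on=GEOID_TRACT_FIELD, how="inner")
assert joined.shape[0] <= pre_join_len
assert joined[GEOID_TRACT_FIELD].tolist() == filtered[GEOID_TRACT_FIELD].tolist()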

* Fix typo (#1848)

* Switch from filter to inner join (#1848)

* Remove no-op lines from tiles (#1848)

* Apply feedback from review, linter (#1848)

* Check the values of everything in the frame (#1848)

* Refactor checker class (#1848)

* Add test for state names (#1848)

* cleanup from reviewing my own code (#1848)

* Fix lint error (#1858)

* Apply Emma's feedback from review (#1848)

* Remove refs to national_df (#1848)

* Account for new, fake nullable bools in tiles (#1848)

To handle a geojson limitation, Emma converted some nullable boolean
columns to float64 in the tiles export with the values {0.0, 1.0, nan},
giving us the same expressiveness. Sadly, this broke my assumption that
all columns shared between the score and tiles CSVs would have the same
dtypes, so I need to account for these new, fake bools in my test.
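
A minimal sketch of that encoding and of the round trip the new test performs;
this is not the exporter's actual code, just an illustration of the
{0.0, 1.0, nan} representation:

import numpy as np
import pandas as pd

# A nullable boolean column coerced to float64, as the tiles export does for
# geojson: True -> 1.0, False -> 0.0, missing -> nan.
nullable_bool = pd.Series([True, False, pd.NA], dtype="boolean")
as_float = nullable_bool.astype("float64")

# The smoke test detects such columns by their value set and casts them back
# to booleans (keeping nan) before comparing against the score column.
as_bool_again = as_float.apply(lambda v: bool(v) if not np.isnan(v) else np.nan)
assert set(as_bool_again.dropna()) == {True, False}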

* Use equals instead of my worse version (#1848)

* Missed a spot where we called _create_score_data (#1848)

* Update per safety (#1848)

Co-authored-by: matt bowen <matthew.r.bowen@omb.eop.gov>
This commit is contained in:
Emma Nechamkin 2022-09-01 13:07:14 -04:00 committed by GitHub
parent ccd72e2cce
commit 9c0e1993f6
7 changed files with 533 additions and 445 deletions


@@ -397,7 +397,7 @@ TILES_SCORE_FLOAT_COLUMNS = [
# Geojson cannot support nulls in a boolean column when we create tiles;
# to preserve null character, we coerce to floats for all fields
# that use null to signify missing information in a boolean field.
field_names.ELIGIBLE_FUDS_BINARY_FIELD_NAME,
field_names.AML_BOOLEAN,
field_names.ELIGIBLE_FUDS_BINARY_FIELD_NAME,
field_names.AML_BOOLEAN,
field_names.HISTORIC_REDLINING_SCORE_EXCEEDED
]


@@ -45,7 +45,7 @@ class ScoreETL(ExtractTransformLoad):
self.persistent_poverty_df: pd.DataFrame
self.census_decennial_df: pd.DataFrame
self.census_2010_df: pd.DataFrame
# self.child_opportunity_index_df: pd.DataFrame
self.national_tract_df: pd.DataFrame
self.hrs_df: pd.DataFrame
self.dot_travel_disadvantage_df: pd.DataFrame
self.fsf_flood_df: pd.DataFrame
@@ -203,6 +203,15 @@ class ScoreETL(ExtractTransformLoad):
low_memory=False,
)
national_tract_csv = constants.DATA_CENSUS_CSV_FILE_PATH
self.national_tract_df = pd.read_csv(
national_tract_csv,
names=[self.GEOID_TRACT_FIELD_NAME],
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
header=None,
)
def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
logger.info("Joining Census Tract dataframes")
@@ -370,8 +379,21 @@ class ScoreETL(ExtractTransformLoad):
census_tract_df = self._join_tract_dfs(census_tract_dfs)
# If GEOID10s are read as numbers instead of strings, the initial 0 is dropped,
# and then we get too many CBG rows (one for 012345 and one for 12345).
# Drop tracts that don't exist in the 2010 tracts
pre_join_len = census_tract_df[field_names.GEOID_TRACT_FIELD].nunique()
census_tract_df = census_tract_df.merge(
self.national_tract_df,
on="GEOID10_TRACT",
how="inner",
)
assert (
census_tract_df.shape[0] <= pre_join_len
), "Join against national tract list ADDED rows"
logger.info(
"Dropped %s tracts not in the 2010 tract data",
pre_join_len - census_tract_df[field_names.GEOID_TRACT_FIELD].nunique()
)
# Now sanity-check the merged df.
self._census_tract_df_sanity_check(


@@ -45,7 +45,6 @@ class PostScoreETL(ExtractTransformLoad):
self.input_counties_df: pd.DataFrame
self.input_states_df: pd.DataFrame
self.input_score_df: pd.DataFrame
self.input_national_tract_df: pd.DataFrame
self.output_score_county_state_merged_df: pd.DataFrame
self.output_score_tiles_df: pd.DataFrame
@@ -92,7 +91,9 @@ class PostScoreETL(ExtractTransformLoad):
def _extract_score(self, score_path: Path) -> pd.DataFrame:
logger.info("Reading Score CSV")
df = pd.read_csv(
score_path, dtype={self.GEOID_TRACT_FIELD_NAME: "string"}
score_path,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
# Convert total population to an int
@@ -102,18 +103,6 @@ class PostScoreETL(ExtractTransformLoad):
return df
def _extract_national_tract(
self, national_tract_path: Path
) -> pd.DataFrame:
logger.info("Reading national tract file")
return pd.read_csv(
national_tract_path,
names=[self.GEOID_TRACT_FIELD_NAME],
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
header=None,
)
def extract(self) -> None:
logger.info("Starting Extraction")
@@ -136,9 +125,6 @@ class PostScoreETL(ExtractTransformLoad):
self.input_score_df = self._extract_score(
constants.DATA_SCORE_CSV_FULL_FILE_PATH
)
self.input_national_tract_df = self._extract_national_tract(
constants.DATA_CENSUS_CSV_FILE_PATH
)
def _transform_counties(
self, initial_counties_df: pd.DataFrame
@@ -185,7 +171,6 @@ class PostScoreETL(ExtractTransformLoad):
def _create_score_data(
self,
national_tract_df: pd.DataFrame,
counties_df: pd.DataFrame,
states_df: pd.DataFrame,
score_df: pd.DataFrame,
@@ -217,28 +202,11 @@ class PostScoreETL(ExtractTransformLoad):
right_on=self.STATE_CODE_COLUMN,
how="left",
)
# check if there are census tracts without score
logger.info("Removing tract rows without score")
# merge census tracts with score
merged_df = national_tract_df.merge(
score_county_state_merged,
on=self.GEOID_TRACT_FIELD_NAME,
how="left",
)
# recast population to integer
score_county_state_merged["Total population"] = (
merged_df["Total population"].fillna(0).astype(int)
)
de_duplicated_df = merged_df.dropna(
subset=[DISADVANTAGED_COMMUNITIES_FIELD]
)
assert score_county_merged[
self.GEOID_TRACT_FIELD_NAME
].is_unique, "Merging state/county data introduced duplicate rows"
# set the score to the new df
return de_duplicated_df
return score_county_state_merged
def _create_tile_data(
self,
@@ -427,7 +395,6 @@ class PostScoreETL(ExtractTransformLoad):
transformed_score = self._transform_score(self.input_score_df)
output_score_county_state_merged_df = self._create_score_data(
self.input_national_tract_df,
transformed_counties,
transformed_states,
transformed_score,


@@ -67,14 +67,12 @@ def test_transform_score(etl, score_data_initial, score_transformed_expected):
# pylint: disable=too-many-arguments
def test_create_score_data(
etl,
national_tract_df,
counties_transformed_expected,
states_transformed_expected,
score_transformed_expected,
score_data_expected,
):
score_data_actual = etl._create_score_data(
national_tract_df,
counties_transformed_expected,
states_transformed_expected,
score_transformed_expected,


@@ -9,4 +9,5 @@ def final_score_df():
return pd.read_csv(
settings.APP_ROOT / "data" / "score" / "csv" / "full" / "usa.csv",
dtype={field_names.GEOID_TRACT_FIELD: str},
low_memory=False,
)


@@ -0,0 +1,221 @@
# flake8: noqa: W0613,W0611,F811
from dataclasses import dataclass
from typing import Optional
import pandas as pd
import numpy as np
import pytest
from data_pipeline.config import settings
from data_pipeline.etl.score import constants
from data_pipeline.score import field_names
from data_pipeline.etl.score.constants import (
TILES_SCORE_COLUMNS,
THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
USER_INTERFACE_EXPERIENCE_FIELD_NAME,
)
from .fixtures import final_score_df # pylint: disable=unused-import
pytestmark = pytest.mark.smoketest
@pytest.fixture(scope="session")
def tiles_df():
return pd.read_csv(
settings.APP_ROOT / "data" / "score" / "csv" / "tiles" / "usa.csv",
dtype={"GTF": str},
low_memory=False,
)
PERCENTILE_FIELDS = [
"DF_PFS",
"AF_PFS",
"HDF_PFS",
"DSF_PFS",
"EBF_PFS",
"EALR_PFS",
"EBLR_PFS",
"EPLR_PFS",
"HBF_PFS",
"LLEF_PFS",
"LIF_PFS",
"LMI_PFS",
"MHVF_PFS",
"PM25F_PFS",
"P100_PFS",
"P200_I_PFS",
"P200_PFS",
"LPF_PFS",
"KP_PFS",
"NPL_PFS",
"RMP_PFS",
"TSDF_PFS",
"TF_PFS",
"UF_PFS",
"WF_PFS",
"UST_PFS",
]
def test_percentiles(tiles_df):
for col in PERCENTILE_FIELDS:
assert tiles_df[col].min() >= 0, f"Negative percentile exists for {col}"
assert (
tiles_df[col].max() <= 1
), f"Percentile over 100th exists for {col}"
assert (tiles_df[col].median() >= 0.4) & (
tiles_df[col].median() <= 0.6
), f"Percentile distribution for {col} is decidedly not uniform"
return True
def test_count_of_fips_codes(tiles_df, final_score_df):
final_score_state_count = (
final_score_df[field_names.GEOID_TRACT_FIELD].str[:2].nunique()
)
assert (
tiles_df["GTF"].str[:2].nunique() == final_score_state_count
), "Some states are missing from tiles"
pfs_columns = tiles_df.filter(like="PFS").columns.to_list()
assert (
tiles_df.dropna(how="all", subset=pfs_columns)["GTF"].str[:2].nunique()
== 56
), "Some states do not have any percentile data"
def test_column_presence(tiles_df):
expected_column_names = set(TILES_SCORE_COLUMNS.values()) | {
THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
USER_INTERFACE_EXPERIENCE_FIELD_NAME,
}
actual_column_names = set(tiles_df.columns)
extra_columns = actual_column_names - expected_column_names
missing_columns = expected_column_names - actual_column_names
assert not (
extra_columns
), f"tiles/usa.csv has columns not specified in TILE_SCORE_COLUMNS: {extra_columns}"
assert not (
missing_columns
), f"tiles/usa.csv is missing columns from TILE_SCORE_COLUMNS: {missing_columns}"
def test_tract_equality(tiles_df, final_score_df):
assert tiles_df.shape[0] == final_score_df.shape[0]
@dataclass
class ColumnValueComparison:
final_score_column: pd.Series
tiles_column: pd.Series
col_name: str
@property
def _is_tiles_column_fake_bool(self) -> bool:
if self.tiles_column.dtype == np.dtype("float64"):
fake_bool = {1.0, 0.0, None}
# Replace the nans in the column values with None
# so we can just use issubset below
col_values = set(
None if np.isnan(val) else val
for val in self.tiles_column.value_counts(dropna=False).index
)
return len(col_values) <= 3 and col_values.issubset(fake_bool)
return False
@property
def _is_dtype_ok(self) -> bool:
if self.final_score_column.dtype == self.tiles_column.dtype:
return True
if (
self.final_score_column.dtype == np.dtype("O")
and self.tiles_column.dtype == np.dtype("float64")
and self._is_tiles_column_fake_bool
):
return True
return False
def __post_init__(self):
self._is_value_ok = False
if self._is_dtype_ok:
if self._is_tiles_column_fake_bool:
# Cast to actual bool for useful comparison
self.tiles_column = self.tiles_column.apply(
lambda val: bool(val) if not np.isnan(val) else np.nan
)
if self.tiles_column.dtype == np.dtype("float64"):
self._is_value_ok = np.allclose(
self.final_score_column,
self.tiles_column,
atol=float(f"1e-{constants.TILES_ROUND_NUM_DECIMALS}"),
equal_nan=True,
)
else:
self._is_value_ok = self.final_score_column.equals(
self.tiles_column
)
def __bool__(self) -> bool:
return self._is_dtype_ok and bool(self._is_value_ok)
@property
def error_message(self) -> Optional[str]:
if not self._is_dtype_ok:
return (
f"Column {self.col_name} dtype mismatch: "
f"score_df: {self.final_score_column.dtype}, "
f"tile_df: {self.tiles_column.dtype}"
)
if not self._is_value_ok:
return f"Column {self.col_name} value mismatch"
return None
def test_for_column_fidelity_from_score(tiles_df, final_score_df):
# Verify the following:
# * Shape and tracts match between score csv and tile csv
# * If you rename score CSV columns, you are able to make the tile csv
# * The dtypes and values of every renamed score column is "equal" to
# every tile column
# * Because tiles use rounded floats, we use close with a tolerance
assert (
set(TILES_SCORE_COLUMNS.values()) - set(tiles_df.columns) == set()
), "Some TILES_SCORE_COLUMNS are missing from the tiles dataframe"
# Keep only the tiles score columns in the final score data
final_score_df = final_score_df.rename(columns=TILES_SCORE_COLUMNS).drop(
final_score_df.columns.difference(TILES_SCORE_COLUMNS.values()),
axis=1,
errors="ignore",
)
# Drop the UI-specific fields from the tiles dataframe
tiles_df = tiles_df.drop(
columns=[
"SF", # State field, added at geoscore
"CF", # County field, added at geoscore,
constants.THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
constants.USER_INTERFACE_EXPERIENCE_FIELD_NAME,
]
)
errors = []
# Are the dataframes truly the same shape?
assert tiles_df.shape == final_score_df.shape
assert tiles_df["GTF"].equals(final_score_df["GTF"])
assert sorted(tiles_df.columns) == sorted(final_score_df.columns)
# Are all the dtypes and values the same?
comparisons = []
for col_name in final_score_df.columns:
value_comparison = ColumnValueComparison(
final_score_df[col_name], tiles_df[col_name], col_name
)
comparisons.append(value_comparison)
errors = [comp for comp in comparisons if not comp]
error_message = "\n".join(error.error_message for error in errors)
assert not errors, error_message
def test_for_state_names(tiles_df):
states = tiles_df["SF"].value_counts(dropna=False).index
assert np.nan not in states
assert states.all()

File diff suppressed because it is too large.