Merge branch 'emma-nechamkin/release/score-narwhal' of github.com:usds/justice40-tool into emma-nechamkin/release/score-narwhal

This commit is contained in:
Emma Nechamkin 2022-09-07 13:48:22 -04:00
commit 31eac4101e
12 changed files with 865 additions and 507 deletions

View file

@ -95,12 +95,6 @@ DATASET_LIST = [
"class_name": "GeoCorrETL", "class_name": "GeoCorrETL",
"is_memory_intensive": False, "is_memory_intensive": False,
}, },
{
"name": "child_opportunity_index",
"module_dir": "child_opportunity_index",
"class_name": "ChildOpportunityIndex",
"is_memory_intensive": False,
},
{ {
"name": "mapping_inequality", "name": "mapping_inequality",
"module_dir": "mapping_inequality", "module_dir": "mapping_inequality",

View file

@ -397,7 +397,7 @@ TILES_SCORE_FLOAT_COLUMNS = [
# Geojson cannot support nulls in a boolean column when we create tiles; # Geojson cannot support nulls in a boolean column when we create tiles;
# to preserve null character, we coerce to floats for all fields # to preserve null character, we coerce to floats for all fields
# that use null to signify missing information in a boolean field. # that use null to signify missing information in a boolean field.
field_names.ELIGIBLE_FUDS_BINARY_FIELD_NAME, field_names.ELIGIBLE_FUDS_BINARY_FIELD_NAME,
field_names.AML_BOOLEAN, field_names.AML_BOOLEAN,
field_names.HISTORIC_REDLINING_SCORE_EXCEEDED field_names.HISTORIC_REDLINING_SCORE_EXCEEDED
] ]

View file

@ -42,10 +42,9 @@ class ScoreETL(ExtractTransformLoad):
self.doe_energy_burden_df: pd.DataFrame self.doe_energy_burden_df: pd.DataFrame
self.national_risk_index_df: pd.DataFrame self.national_risk_index_df: pd.DataFrame
self.geocorr_urban_rural_df: pd.DataFrame self.geocorr_urban_rural_df: pd.DataFrame
self.persistent_poverty_df: pd.DataFrame
self.census_decennial_df: pd.DataFrame self.census_decennial_df: pd.DataFrame
self.census_2010_df: pd.DataFrame self.census_2010_df: pd.DataFrame
self.child_opportunity_index_df: pd.DataFrame self.national_tract_df: pd.DataFrame
self.hrs_df: pd.DataFrame self.hrs_df: pd.DataFrame
self.dot_travel_disadvantage_df: pd.DataFrame self.dot_travel_disadvantage_df: pd.DataFrame
self.fsf_flood_df: pd.DataFrame self.fsf_flood_df: pd.DataFrame
@ -159,16 +158,6 @@ class ScoreETL(ExtractTransformLoad):
low_memory=False, low_memory=False,
) )
# Load persistent poverty
persistent_poverty_csv = (
constants.DATA_PATH / "dataset" / "persistent_poverty" / "usa.csv"
)
self.persistent_poverty_df = pd.read_csv(
persistent_poverty_csv,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
# Load decennial census data # Load decennial census data
census_decennial_csv = ( census_decennial_csv = (
constants.DATA_PATH constants.DATA_PATH
@ -192,19 +181,6 @@ class ScoreETL(ExtractTransformLoad):
low_memory=False, low_memory=False,
) )
# Load COI data
child_opportunity_index_csv = (
constants.DATA_PATH
/ "dataset"
/ "child_opportunity_index"
/ "usa.csv"
)
self.child_opportunity_index_df = pd.read_csv(
child_opportunity_index_csv,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
# Load HRS data # Load HRS data
hrs_csv = ( hrs_csv = (
constants.DATA_PATH / "dataset" / "historic_redlining" / "usa.csv" constants.DATA_PATH / "dataset" / "historic_redlining" / "usa.csv"
@ -216,6 +192,15 @@ class ScoreETL(ExtractTransformLoad):
low_memory=False, low_memory=False,
) )
national_tract_csv = constants.DATA_CENSUS_CSV_FILE_PATH
self.national_tract_df = pd.read_csv(
national_tract_csv,
names=[self.GEOID_TRACT_FIELD_NAME],
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
header=None,
)
def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame: def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
logger.info("Joining Census Tract dataframes") logger.info("Joining Census Tract dataframes")
@ -363,12 +348,10 @@ class ScoreETL(ExtractTransformLoad):
self.doe_energy_burden_df, self.doe_energy_burden_df,
self.ejscreen_df, self.ejscreen_df,
self.geocorr_urban_rural_df, self.geocorr_urban_rural_df,
self.persistent_poverty_df,
self.national_risk_index_df, self.national_risk_index_df,
self.census_acs_median_incomes_df, self.census_acs_median_incomes_df,
self.census_decennial_df, self.census_decennial_df,
self.census_2010_df, self.census_2010_df,
self.child_opportunity_index_df,
self.hrs_df, self.hrs_df,
self.dot_travel_disadvantage_df, self.dot_travel_disadvantage_df,
self.fsf_flood_df, self.fsf_flood_df,
@ -384,8 +367,21 @@ class ScoreETL(ExtractTransformLoad):
census_tract_df = self._join_tract_dfs(census_tract_dfs) census_tract_df = self._join_tract_dfs(census_tract_dfs)
# If GEOID10s are read as numbers instead of strings, the initial 0 is dropped, # Drop tracts that don't exist in the 2010 tracts
# and then we get too many CBG rows (one for 012345 and one for 12345). pre_join_len = census_tract_df[field_names.GEOID_TRACT_FIELD].nunique()
census_tract_df = census_tract_df.merge(
self.national_tract_df,
on="GEOID10_TRACT",
how="inner",
)
assert (
census_tract_df.shape[0] <= pre_join_len
), "Join against national tract list ADDED rows"
logger.info(
"Dropped %s tracts not in the 2010 tract data",
pre_join_len - census_tract_df[field_names.GEOID_TRACT_FIELD].nunique()
)
# Now sanity-check the merged df. # Now sanity-check the merged df.
self._census_tract_df_sanity_check( self._census_tract_df_sanity_check(
@ -455,9 +451,6 @@ class ScoreETL(ExtractTransformLoad):
field_names.CENSUS_UNEMPLOYMENT_FIELD_2010, field_names.CENSUS_UNEMPLOYMENT_FIELD_2010,
field_names.CENSUS_POVERTY_LESS_THAN_100_FPL_FIELD_2010, field_names.CENSUS_POVERTY_LESS_THAN_100_FPL_FIELD_2010,
field_names.CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2009, field_names.CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2009,
field_names.EXTREME_HEAT_FIELD,
field_names.HEALTHY_FOOD_FIELD,
field_names.IMPENETRABLE_SURFACES_FIELD,
field_names.UST_FIELD, field_names.UST_FIELD,
field_names.DOT_TRAVEL_BURDEN_FIELD, field_names.DOT_TRAVEL_BURDEN_FIELD,
field_names.FUTURE_FLOOD_RISK_FIELD, field_names.FUTURE_FLOOD_RISK_FIELD,
@ -479,7 +472,6 @@ class ScoreETL(ExtractTransformLoad):
non_numeric_columns = [ non_numeric_columns = [
self.GEOID_TRACT_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME,
field_names.PERSISTENT_POVERTY_FIELD,
field_names.TRACT_ELIGIBLE_FOR_NONNATURAL_THRESHOLD, field_names.TRACT_ELIGIBLE_FOR_NONNATURAL_THRESHOLD,
field_names.AGRICULTURAL_VALUE_BOOL_FIELD, field_names.AGRICULTURAL_VALUE_BOOL_FIELD,
] ]
@ -509,10 +501,6 @@ class ScoreETL(ExtractTransformLoad):
# This low field will not exist yet, it is only calculated for the # This low field will not exist yet, it is only calculated for the
# percentile. # percentile.
# TODO: This will come from the YAML dataset config # TODO: This will come from the YAML dataset config
ReversePercentile(
field_name=field_names.READING_FIELD,
low_field_name=field_names.LOW_READING_FIELD,
),
ReversePercentile( ReversePercentile(
field_name=field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD, field_name=field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD,
low_field_name=field_names.LOW_MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD, low_field_name=field_names.LOW_MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD,

View file

@ -45,7 +45,6 @@ class PostScoreETL(ExtractTransformLoad):
self.input_counties_df: pd.DataFrame self.input_counties_df: pd.DataFrame
self.input_states_df: pd.DataFrame self.input_states_df: pd.DataFrame
self.input_score_df: pd.DataFrame self.input_score_df: pd.DataFrame
self.input_national_tract_df: pd.DataFrame
self.output_score_county_state_merged_df: pd.DataFrame self.output_score_county_state_merged_df: pd.DataFrame
self.output_score_tiles_df: pd.DataFrame self.output_score_tiles_df: pd.DataFrame
@ -92,7 +91,9 @@ class PostScoreETL(ExtractTransformLoad):
def _extract_score(self, score_path: Path) -> pd.DataFrame: def _extract_score(self, score_path: Path) -> pd.DataFrame:
logger.info("Reading Score CSV") logger.info("Reading Score CSV")
df = pd.read_csv( df = pd.read_csv(
score_path, dtype={self.GEOID_TRACT_FIELD_NAME: "string"} score_path,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
) )
# Convert total population to an int # Convert total population to an int
@ -102,18 +103,6 @@ class PostScoreETL(ExtractTransformLoad):
return df return df
def _extract_national_tract(
self, national_tract_path: Path
) -> pd.DataFrame:
logger.info("Reading national tract file")
return pd.read_csv(
national_tract_path,
names=[self.GEOID_TRACT_FIELD_NAME],
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
header=None,
)
def extract(self) -> None: def extract(self) -> None:
logger.info("Starting Extraction") logger.info("Starting Extraction")
@ -136,9 +125,6 @@ class PostScoreETL(ExtractTransformLoad):
self.input_score_df = self._extract_score( self.input_score_df = self._extract_score(
constants.DATA_SCORE_CSV_FULL_FILE_PATH constants.DATA_SCORE_CSV_FULL_FILE_PATH
) )
self.input_national_tract_df = self._extract_national_tract(
constants.DATA_CENSUS_CSV_FILE_PATH
)
def _transform_counties( def _transform_counties(
self, initial_counties_df: pd.DataFrame self, initial_counties_df: pd.DataFrame
@ -185,7 +171,6 @@ class PostScoreETL(ExtractTransformLoad):
def _create_score_data( def _create_score_data(
self, self,
national_tract_df: pd.DataFrame,
counties_df: pd.DataFrame, counties_df: pd.DataFrame,
states_df: pd.DataFrame, states_df: pd.DataFrame,
score_df: pd.DataFrame, score_df: pd.DataFrame,
@ -217,28 +202,11 @@ class PostScoreETL(ExtractTransformLoad):
right_on=self.STATE_CODE_COLUMN, right_on=self.STATE_CODE_COLUMN,
how="left", how="left",
) )
assert score_county_merged[
# check if there are census tracts without score self.GEOID_TRACT_FIELD_NAME
logger.info("Removing tract rows without score") ].is_unique, "Merging state/county data introduced duplicate rows"
# merge census tracts with score
merged_df = national_tract_df.merge(
score_county_state_merged,
on=self.GEOID_TRACT_FIELD_NAME,
how="left",
)
# recast population to integer
score_county_state_merged["Total population"] = (
merged_df["Total population"].fillna(0).astype(int)
)
de_duplicated_df = merged_df.dropna(
subset=[DISADVANTAGED_COMMUNITIES_FIELD]
)
# set the score to the new df # set the score to the new df
return de_duplicated_df return score_county_state_merged
def _create_tile_data( def _create_tile_data(
self, self,
@ -427,7 +395,6 @@ class PostScoreETL(ExtractTransformLoad):
transformed_score = self._transform_score(self.input_score_df) transformed_score = self._transform_score(self.input_score_df)
output_score_county_state_merged_df = self._create_score_data( output_score_county_state_merged_df = self._create_score_data(
self.input_national_tract_df,
transformed_counties, transformed_counties,
transformed_states, transformed_states,
transformed_score, transformed_score,

View file

@ -67,14 +67,12 @@ def test_transform_score(etl, score_data_initial, score_transformed_expected):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
def test_create_score_data( def test_create_score_data(
etl, etl,
national_tract_df,
counties_transformed_expected, counties_transformed_expected,
states_transformed_expected, states_transformed_expected,
score_transformed_expected, score_transformed_expected,
score_data_expected, score_data_expected,
): ):
score_data_actual = etl._create_score_data( score_data_actual = etl._create_score_data(
national_tract_df,
counties_transformed_expected, counties_transformed_expected,
states_transformed_expected, states_transformed_expected,
score_transformed_expected, score_transformed_expected,

View file

@ -59,7 +59,7 @@ class TribalETL(ExtractTransformLoad):
) )
bia_national_lar_df.rename( bia_national_lar_df.rename(
columns={"TSAID": "tribalId", "LARName": "landAreaName"}, columns={"LARID": "tribalId", "LARName": "landAreaName"},
inplace=True, inplace=True,
) )
@ -154,7 +154,9 @@ class TribalETL(ExtractTransformLoad):
# load the geojsons # load the geojsons
bia_national_lar_geojson = ( bia_national_lar_geojson = (
self.GEOJSON_BASE_PATH / "bia_national_lar" / "BIA_TSA.json" self.GEOJSON_BASE_PATH
/ "bia_national_lar"
/ "BIA_National_LAR.json"
) )
bia_aian_supplemental_geojson = ( bia_aian_supplemental_geojson = (
self.GEOJSON_BASE_PATH self.GEOJSON_BASE_PATH

View file

@ -318,21 +318,6 @@ MARYLAND_EJSCREEN_SCORE_FIELD: str = "Maryland Environmental Justice Score"
MARYLAND_EJSCREEN_BURDENED_THRESHOLD_FIELD: str = ( MARYLAND_EJSCREEN_BURDENED_THRESHOLD_FIELD: str = (
"Maryland EJSCREEN Priority Community" "Maryland EJSCREEN Priority Community"
) )
# Child Opportunity Index data
# Summer days with maximum temperature above 90F.
EXTREME_HEAT_FIELD = "Summer days above 90F"
# Percentage households without a car located further than a half-mile from the
# nearest supermarket.
HEALTHY_FOOD_FIELD = "Percent low access to healthy food"
# Percentage impenetrable surface areas such as rooftops, roads or parking lots.
IMPENETRABLE_SURFACES_FIELD = "Percent impenetrable surface areas"
# Percentage third graders scoring proficient on standardized reading tests,
# converted to NAEP scale score points.
READING_FIELD = "Third grade reading proficiency"
LOW_READING_FIELD = "Low third grade reading proficiency"
# Alternative energy-related definition of DACs # Alternative energy-related definition of DACs
ENERGY_RELATED_COMMUNITIES_DEFINITION_ALTERNATIVE = ( ENERGY_RELATED_COMMUNITIES_DEFINITION_ALTERNATIVE = (

View file

@ -1,12 +1,217 @@
import pandas as pd import pandas as pd
import pytest import pytest
from data_pipeline.config import settings from data_pipeline.config import settings
from data_pipeline.score import field_names from data_pipeline.score.field_names import GEOID_TRACT_FIELD
from data_pipeline.etl.score import constants
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def final_score_df(): def final_score_df():
return pd.read_csv( return pd.read_csv(
settings.APP_ROOT / "data" / "score" / "csv" / "full" / "usa.csv", settings.APP_ROOT / "data" / "score" / "csv" / "full" / "usa.csv",
dtype={field_names.GEOID_TRACT_FIELD: str}, dtype={GEOID_TRACT_FIELD: str},
low_memory=False,
)
@pytest.fixture()
def census_df():
    """Load dataset/census_acs_2019/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def ejscreen_df():
    """Load dataset/ejscreen/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "ejscreen" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def hud_housing_df():
    """Load dataset/hud_housing/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "hud_housing" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def cdc_places_df():
    """Load dataset/cdc_places/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "cdc_places" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def census_acs_median_incomes_df():
    """Load dataset/census_acs_median_income_2019/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH
        / "dataset"
        / "census_acs_median_income_2019"
        / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def cdc_life_expectancy_df():
    """Load dataset/cdc_life_expectancy/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "cdc_life_expectancy" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def doe_energy_burden_df():
    """Load dataset/doe_energy_burden/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "doe_energy_burden" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def national_risk_index_df():
    """Load dataset/national_risk_index/usa.csv with tract GEOIDs as strings."""
    nri_csv = (
        constants.DATA_PATH / "dataset" / "national_risk_index" / "usa.csv"
    )
    return pd.read_csv(
        nri_csv,
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def dot_travel_disadvantage_df():
    """Load dataset/travel_composite/usa.csv with tract GEOIDs as strings."""
    travel_csv = (
        constants.DATA_PATH / "dataset" / "travel_composite" / "usa.csv"
    )
    return pd.read_csv(
        travel_csv,
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def fsf_fire_df():
    """Load dataset/fsf_wildfire_risk/usa.csv with tract GEOIDs as strings."""
    fire_csv = (
        constants.DATA_PATH / "dataset" / "fsf_wildfire_risk" / "usa.csv"
    )
    return pd.read_csv(
        fire_csv,
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def fsf_flood_df():
    """Load dataset/fsf_flood_risk/usa.csv with tract GEOIDs as strings."""
    flood_csv = (
        constants.DATA_PATH / "dataset" / "fsf_flood_risk" / "usa.csv"
    )
    return pd.read_csv(
        flood_csv,
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def nature_deprived_df():
    """Load dataset/nlcd_nature_deprived/usa.csv with tract GEOIDs as strings."""
    nature_csv = (
        constants.DATA_PATH / "dataset" / "nlcd_nature_deprived" / "usa.csv"
    )
    return pd.read_csv(
        nature_csv,
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def eamlis_df():
    """Load dataset/eamlis/usa.csv with tract GEOIDs as strings."""
    eamlis_csv = constants.DATA_PATH / "dataset" / "eamlis" / "usa.csv"
    return pd.read_csv(
        eamlis_csv,
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def fuds_df():
    """Load dataset/us_army_fuds/usa.csv with tract GEOIDs as strings."""
    fuds_csv = constants.DATA_PATH / "dataset" / "us_army_fuds" / "usa.csv"
    return pd.read_csv(
        fuds_csv,
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def geocorr_urban_rural_df():
    """Load dataset/geocorr/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "geocorr" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def census_decennial_df():
    """Load dataset/census_decennial_2010/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "census_decennial_2010" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def census_2010_df():
    """Load dataset/census_acs_2010/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "census_acs_2010" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def hrs_df():
    """Load dataset/historic_redlining/usa.csv with tract GEOIDs as strings."""
    return pd.read_csv(
        constants.DATA_PATH / "dataset" / "historic_redlining" / "usa.csv",
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
    )
@pytest.fixture()
def national_tract_df():
    """Load the national tract list: a headerless, single-column CSV of tract GEOIDs."""
    return pd.read_csv(
        constants.DATA_CENSUS_CSV_FILE_PATH,
        names=[GEOID_TRACT_FIELD],
        dtype={GEOID_TRACT_FIELD: "string"},
        low_memory=False,
        header=None,
    )

View file

@ -28,7 +28,6 @@ class PercentileTestConfig:
return self.percentile_column_name return self.percentile_column_name
### TODO: we need to blow this out for all eight categories
def _check_percentile_against_threshold(df, config: PercentileTestConfig): def _check_percentile_against_threshold(df, config: PercentileTestConfig):
"""Note - for the purpose of testing, this fills with False""" """Note - for the purpose of testing, this fills with False"""
is_minimum_flagged_ok = ( is_minimum_flagged_ok = (

View file

@ -1,12 +1,37 @@
# flake8: noqa: W0613,W0611,F811 # flake8: noqa: W0613,W0611,F811,
# pylint: disable=unused-import,too-many-arguments
from dataclasses import dataclass from dataclasses import dataclass
from typing import List from typing import List
import pytest import pytest
import pandas as pd import pandas as pd
import numpy as np
from data_pipeline.score import field_names from data_pipeline.score import field_names
from .fixtures import final_score_df # pylint: disable=unused-import from data_pipeline.score.field_names import GEOID_TRACT_FIELD
from .fixtures import (
final_score_df,
ejscreen_df,
hud_housing_df,
census_df,
cdc_places_df,
census_acs_median_incomes_df,
cdc_life_expectancy_df,
doe_energy_burden_df,
national_risk_index_df,
dot_travel_disadvantage_df,
fsf_fire_df,
nature_deprived_df,
eamlis_df,
fuds_df,
geocorr_urban_rural_df,
census_decennial_df,
census_2010_df,
hrs_df,
national_tract_df,
)
pytestmark = pytest.mark.smoketest pytestmark = pytest.mark.smoketest
# Maximum number of tracts a source dataset may contain that are absent from
# the final score before the smoke test fails (a larger gap is taken to mean
# the source has moved to 2020 census tracts — see test_data_sources).
# NOTE(review): "TRACK" is presumably a typo for "TRACT"; kept as-is because
# other code in this file references this exact name.
UNMATCHED_TRACK_THRESHOLD = 1000
def _helper_test_count_exceeding_threshold(df, col, error_check=1000): def _helper_test_count_exceeding_threshold(df, col, error_check=1000):
@ -203,3 +228,98 @@ def test_donut_hole_addition_to_score_n(final_score_df):
assert ( assert (
new_donuts > 0 new_donuts > 0
), "FYI: The adjacency index is doing nothing. Consider removing it?" ), "FYI: The adjacency index is doing nothing. Consider removing it?"
def test_data_sources(
    final_score_df,
    hud_housing_df,
    ejscreen_df,
    census_df,
    cdc_places_df,
    census_acs_median_incomes_df,
    cdc_life_expectancy_df,
    doe_energy_burden_df,
    national_risk_index_df,
    dot_travel_disadvantage_df,
    fsf_fire_df,
    nature_deprived_df,
    eamlis_df,
    fuds_df,
    geocorr_urban_rural_df,
    census_decennial_df,
    census_2010_df,
    hrs_df,
):
    """Verify that every source dataset's overlapping columns survive the
    score pipeline unchanged for all tracts matched into the final score.

    Each source fixture is outer-merged against the final score on the
    tract GEOID and its columns compared value-by-value.
    """
    # Collect every source fixture by name; `locals()` at this point holds
    # exactly the function arguments, so only the final score is excluded.
    data_sources = {
        key: value for key, value in locals().items() if key != "final_score_df"
    }

    for data_source_name, data_source in data_sources.items():
        final = "final_"
        df: pd.DataFrame = final_score_df.merge(
            data_source,
            on=GEOID_TRACT_FIELD,
            indicator="MERGE",
            suffixes=(final, f"_{data_source_name}"),
            how="outer",
        )

        # Make our lists of columns for later comparison
        core_cols = data_source.columns.intersection(
            final_score_df.columns
        ).drop(GEOID_TRACT_FIELD)
        data_source_columns = [f"{col}_{data_source_name}" for col in core_cols]
        final_columns = [f"{col}{final}" for col in core_cols]
        assert (
            final_columns
        ), f"No columns from data source show up in final score in source {data_source_name}"

        # Make sure we have NAs for any tracts in the final data that aren't
        # covered in the final data
        # NOTE(review): this inspects the *final*-suffixed columns for
        # "left_only" rows; confirm that is the intended direction.
        assert np.all(df[df.MERGE == "left_only"][final_columns].isna())

        # Make sure the datasource doesn't have a ton of unmatched tracts, implying it
        # has moved to 2020 tracts
        assert len(df[df.MERGE == "right_only"]) < UNMATCHED_TRACK_THRESHOLD

        df = df[df.MERGE == "both"]

        # Compare every column for equality, using close equality for numerics and
        # `equals` equality for non-numeric columns.
        # Fix: the original zip bound `final_column` to data_source_columns
        # and vice versa, mislabeling every error message and driving the
        # dtype branch off the wrong column.
        for data_source_column, final_column in zip(
            data_source_columns, final_columns
        ):
            error_message = (
                f"Column {final_column} not equal "
                f"between {data_source_name} and final score"
            )
            if df[final_column].dtype in [
                np.dtype(object),
                np.dtype(bool),
                np.dtype(str),
            ]:
                assert df[final_column].equals(
                    df[data_source_column]
                ), error_message
            else:
                assert np.allclose(
                    df[final_column],
                    df[data_source_column],
                    equal_nan=True,
                ), error_message
def test_output_tracts(final_score_df, national_tract_df):
    """The final score must cover exactly the national tract list:
    no extra tracts and no missing tracts on either side."""
    merged = final_score_df.merge(
        national_tract_df,
        on=GEOID_TRACT_FIELD,
        how="outer",
        indicator="MERGE",
    )
    merge_counts = merged.value_counts("MERGE")
    assert merge_counts.loc["left_only"] == 0
    assert merge_counts.loc["right_only"] == 0
def test_all_tracts_have_scores(final_score_df):
    """Every tract in the final score must have a non-null Score N value."""
    score_column = final_score_df[field_names.SCORE_N_COMMUNITIES]
    assert score_column.notna().all()

View file

@ -0,0 +1,221 @@
# flake8: noqa: W0613,W0611,F811
from dataclasses import dataclass
from typing import Optional
import pandas as pd
import numpy as np
import pytest
from data_pipeline.config import settings
from data_pipeline.etl.score import constants
from data_pipeline.score import field_names
from data_pipeline.etl.score.constants import (
TILES_SCORE_COLUMNS,
THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
USER_INTERFACE_EXPERIENCE_FIELD_NAME,
)
from .fixtures import final_score_df # pylint: disable=unused-import
pytestmark = pytest.mark.smoketest
# Fix: `scope="session"` was mistakenly a *function parameter* of the fixture,
# where pytest does not treat it as a scope; it belongs in the decorator.
@pytest.fixture(scope="session")
def tiles_df():
    """Load the tiles score CSV once per test session.

    `GTF` (the tract GEOID) is read as a string so leading zeros survive.
    """
    return pd.read_csv(
        settings.APP_ROOT / "data" / "score" / "csv" / "tiles" / "usa.csv",
        dtype={"GTF": str},
        low_memory=False,
    )
PERCENTILE_FIELDS = [
"DF_PFS",
"AF_PFS",
"HDF_PFS",
"DSF_PFS",
"EBF_PFS",
"EALR_PFS",
"EBLR_PFS",
"EPLR_PFS",
"HBF_PFS",
"LLEF_PFS",
"LIF_PFS",
"LMI_PFS",
"MHVF_PFS",
"PM25F_PFS",
"P100_PFS",
"P200_I_PFS",
"P200_PFS",
"LPF_PFS",
"KP_PFS",
"NPL_PFS",
"RMP_PFS",
"TSDF_PFS",
"TF_PFS",
"UF_PFS",
"WF_PFS",
"UST_PFS",
]
def test_percentiles(tiles_df):
for col in PERCENTILE_FIELDS:
assert tiles_df[col].min() >= 0, f"Negative percentile exists for {col}"
assert (
tiles_df[col].max() <= 1
), f"Percentile over 100th exists for {col}"
assert (tiles_df[col].median() >= 0.4) & (
tiles_df[col].median() <= 0.6
), f"Percentile distribution for {col} is decidedly not uniform"
return True
def test_count_of_fips_codes(tiles_df, final_score_df):
    """Tiles must cover every state seen in the final score, and every
    state must carry at least some percentile data."""
    # The state FIPS code is the first two characters of the tract GEOID.
    states_in_final_score = (
        final_score_df[field_names.GEOID_TRACT_FIELD].str[:2].nunique()
    )
    states_in_tiles = tiles_df["GTF"].str[:2].nunique()
    assert (
        states_in_tiles == states_in_final_score
    ), "Some states are missing from tiles"

    percentile_columns = tiles_df.filter(like="PFS").columns.to_list()
    states_with_percentiles = (
        tiles_df.dropna(how="all", subset=percentile_columns)["GTF"]
        .str[:2]
        .nunique()
    )
    assert (
        states_with_percentiles == 56
    ), "Some states do not have any percentile data"
def test_column_presence(tiles_df):
    """The tiles CSV must contain exactly the configured score columns plus
    the two UI-only fields — nothing extra and nothing missing."""
    expected_column_names = set(TILES_SCORE_COLUMNS.values()) | {
        THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
        USER_INTERFACE_EXPERIENCE_FIELD_NAME,
    }
    actual_column_names = set(tiles_df.columns)
    extra_columns = actual_column_names - expected_column_names
    # Fix: the original computed `expected - expected`, which is always the
    # empty set and made the "missing columns" assertion vacuous.
    missing_columns = expected_column_names - actual_column_names
    assert not (
        extra_columns
    ), f"tiles/usa.csv has columns not specified in TILE_SCORE_COLUMNS: {extra_columns}"
    assert not (
        missing_columns
    ), f"tiles/usa.csv is missing columns from TILE_SCORE_COLUMNS: {missing_columns}"
def test_tract_equality(tiles_df, final_score_df):
assert tiles_df.shape[0] == final_score_df.shape[0]
@dataclass
class ColumnValueComparison:
    """Compares one final-score column against its tiles-CSV counterpart.

    An instance is truthy when the two columns agree in both dtype and
    values. Tiles encode boolean columns as floats (1.0 / 0.0 / NaN), so a
    float64 tiles column whose distinct values fit that encoding is cast to
    bool before comparison. Float columns are compared with a tolerance
    matching the tiles' rounding.
    """

    final_score_column: pd.Series
    tiles_column: pd.Series
    col_name: str

    @property
    def _is_tiles_column_fake_bool(self) -> bool:
        if self.tiles_column.dtype == np.dtype("float64"):
            fake_bool = {1.0, 0.0, None}
            # Replace the nans in the column values with None
            # so we can just use issubset below.
            # Fix: the old `not np.isnan(val) and val or None` idiom also
            # collapsed 0.0 to None (0.0 is falsy); the conditional
            # expression below keeps 0.0 intact. The final subset/size test
            # yields the same result either way, but this form is correct
            # and idiomatic.
            col_values = set(
                None if np.isnan(val) else val
                for val in self.tiles_column.value_counts(dropna=False).index
            )
            return len(col_values) <= 3 and col_values.issubset(fake_bool)
        return False

    @property
    def _is_dtype_ok(self) -> bool:
        # Exact dtype match, or object-vs-float where the float column is a
        # disguised boolean.
        if self.final_score_column.dtype == self.tiles_column.dtype:
            return True
        if (
            self.final_score_column.dtype == np.dtype("O")
            and self.tiles_column.dtype == np.dtype("float64")
            and self._is_tiles_column_fake_bool
        ):
            return True
        return False

    def __post_init__(self):
        self._is_value_ok = False
        if self._is_dtype_ok:
            if self._is_tiles_column_fake_bool:
                # Cast to actual bool for useful comparison
                self.tiles_column = self.tiles_column.apply(
                    lambda val: bool(val) if not np.isnan(val) else np.nan
                )
            if self.tiles_column.dtype == np.dtype("float64"):
                # Tiles round their floats, so compare with the matching
                # absolute tolerance rather than exact equality.
                self._is_value_ok = np.allclose(
                    self.final_score_column,
                    self.tiles_column,
                    atol=float(f"1e-{constants.TILES_ROUND_NUM_DECIMALS}"),
                    equal_nan=True,
                )
            else:
                self._is_value_ok = self.final_score_column.equals(
                    self.tiles_column
                )

    def __bool__(self) -> bool:
        return self._is_dtype_ok and bool(self._is_value_ok)

    @property
    def error_message(self) -> Optional[str]:
        """Human-readable mismatch description, or None when the columns match."""
        if not self._is_dtype_ok:
            return (
                f"Column {self.col_name} dtype mismatch: "
                f"score_df: {self.final_score_column.dtype}, "
                f"tile_df: {self.tiles_column.dtype}"
            )
        if not self._is_value_ok:
            return f"Column {self.col_name} value mismatch"
        return None
def test_for_column_fidelitiy_from_score(tiles_df, final_score_df):
    # Verify the following:
    # * Shape and tracts match between score csv and tile csv
    # * If you rename score CSV columns, you are able to make the tile csv
    # * The dtypes and values of every renamed score column is "equal" to
    #   every tile column
    # * Because tiles use rounded floats, we use close with a tolerance
    assert (
        set(TILES_SCORE_COLUMNS.values()) - set(tiles_df.columns) == set()
    ), "Some TILES_SCORE_COLUMNS are missing from the tiles dataframe"

    # Keep only the tiles score columns in the final score data
    final_score_df = final_score_df.rename(columns=TILES_SCORE_COLUMNS).drop(
        final_score_df.columns.difference(TILES_SCORE_COLUMNS.values()),
        axis=1,
        errors="ignore",
    )

    # Drop the UI-specific fields from the tiles dataframe
    tiles_df = tiles_df.drop(
        columns=[
            "SF",  # State field, added at geoscore
            "CF",  # County field, added at geoscore
            constants.THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
            constants.USER_INTERFACE_EXPERIENCE_FIELD_NAME,
        ]
    )

    # Are the dataframes the same shape truly
    assert tiles_df.shape == final_score_df.shape
    assert tiles_df["GTF"].equals(final_score_df["GTF"])
    assert sorted(tiles_df.columns) == sorted(final_score_df.columns)

    # Are all the dtypes and values the same?
    # (Fix: removed a dead `errors = []` that was immediately overwritten,
    # and replaced the append loop with a comprehension.)
    comparisons = [
        ColumnValueComparison(
            final_score_df[col_name], tiles_df[col_name], col_name
        )
        for col_name in final_score_df.columns
    ]
    errors = [comp for comp in comparisons if not comp]
    error_message = "\n".join(error.error_message for error in errors)
    assert not errors, error_message
def test_for_state_names(tiles_df):
states = tiles_df["SF"].value_counts(dropna=False).index
assert np.nan not in states
assert states.all()

File diff suppressed because it is too large Load diff