Adding HOLC indicator (#1579)

Added the HOLC indicator (Historic Redlining Score) from the NCRC's work; a tract qualifies under the housing burden category when its redlining score meets or exceeds the 3.25 cutoff and the tract is also low income.
Emma Nechamkin 2022-05-12 12:07:08 -04:00 committed by Emma Nechamkin
parent f680d867c7
commit 3a960018f9
10 changed files with 202 additions and 40 deletions

View file

@@ -114,6 +114,11 @@ DATASET_LIST = [
"module_dir": "maryland_ejscreen",
"class_name": "MarylandEJScreenETL",
},
{
"name": "historic_redlining",
"module_dir": "historic_redlining",
"class_name": "HistoricRedliningETL",
},
# This has to come after us.json exists
{
"name": "census_acs",

View file

@@ -205,7 +205,8 @@ TILES_SCORE_COLUMNS = {
field_names.M_HEALTH: "M_HLTH",
# temporarily update this so that it's the Narwhal score that gets visualized on the map
field_names.SCORE_N_COMMUNITIES: "SM_C",
-    field_names.SCORE_M + field_names.PERCENTILE_FIELD_SUFFIX: "SM_PFS",
+    field_names.SCORE_N_COMMUNITIES
+    + field_names.PERCENTILE_FIELD_SUFFIX: "SM_PFS",
field_names.EXPECTED_POPULATION_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD: "EPLRLI",
field_names.EXPECTED_AGRICULTURE_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD: "EALRLI",
field_names.EXPECTED_BUILDING_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD: "EBLRLI",
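
TILES_SCORE_COLUMNS maps long internal field names to the short codes that fit map-tile metadata limits. A small sketch of how such a mapping is typically applied before tiling (the DataFrame and the mapping entry are invented for illustration):

import pandas as pd

tiles_score_columns = {
    "Definition N (communities) (percentile)": "SM_PFS",
}
df = pd.DataFrame({"Definition N (communities) (percentile)": [0.92]})
# Keep only the columns destined for tiles, under their short names.
tile_df = df[list(tiles_score_columns)].rename(columns=tiles_score_columns)
print(tile_df.columns.tolist())  # ['SM_PFS']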

View file

@@ -1,5 +1,5 @@
import functools
from collections import namedtuple

import numpy as np
import pandas as pd
@@ -36,6 +37,7 @@ class ScoreETL(ExtractTransformLoad):
self.census_decennial_df: pd.DataFrame
self.census_2010_df: pd.DataFrame
self.child_opportunity_index_df: pd.DataFrame
        self.hrs_df: pd.DataFrame

def extract(self) -> None:
logger.info("Loading data sets from disk.")
@@ -172,6 +174,17 @@
low_memory=False,
)
# Load HRS data
hrs_csv = (
constants.DATA_PATH / "dataset" / "historic_redlining" / "usa.csv"
)
self.hrs_df = pd.read_csv(
hrs_csv,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
        )

def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
logger.info("Joining Census Tract dataframes")
@@ -376,6 +389,7 @@
self.census_decennial_df,
self.census_2010_df,
self.child_opportunity_index_df,
self.hrs_df,
]
# Sanity check each data frame before merging.
@@ -405,7 +419,6 @@
df[field_names.MEDIAN_INCOME_FIELD] / df[field_names.AMI_FIELD]
)
-        # QQ: why don't we just filter to the numeric columns by type?
numeric_columns = [
field_names.HOUSING_BURDEN_FIELD,
field_names.TOTAL_POP_FIELD,
@@ -465,6 +478,7 @@
non_numeric_columns = [
self.GEOID_TRACT_FIELD_NAME,
field_names.PERSISTENT_POVERTY_FIELD,
field_names.HISTORIC_REDLINING_SCORE_EXCEEDED,
]
# For some columns, high values are "good", so we want to reverse the percentile
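
One detail worth calling out in the HRS load above: passing dtype={self.GEOID_TRACT_FIELD_NAME: "string"} keeps the 11-digit tract GEOIDs intact. A quick illustration of what goes wrong without it (the file contents here are invented):

import io
import pandas as pd

csv = io.StringIO("GEOID10_TRACT,HRS2010\n01073001100,3.5\n")
bad = pd.read_csv(csv)  # the GEOID column is inferred as an integer
csv.seek(0)
good = pd.read_csv(csv, dtype={"GEOID10_TRACT": "string"})
print(bad["GEOID10_TRACT"].iloc[0])   # 1073001100 -- leading zero lost
print(good["GEOID10_TRACT"].iloc[0])  # 01073001100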

View file

@@ -46,10 +46,11 @@ class GeoScoreETL(ExtractTransformLoad):
self.DATA_PATH / "census" / "geojson" / "us.json"
)
-        # Import the shortened name for Score M percentile ("SM_PFS") that's used on the
+        # Import the shortened name for Score N percentile ("SM_PFS") that's used on the
        # tiles.
+        ## TEMPORARY update
        self.TARGET_SCORE_SHORT_FIELD = constants.TILES_SCORE_COLUMNS[
-            field_names.SCORE_M + field_names.PERCENTILE_FIELD_SUFFIX
+            field_names.SCORE_N + field_names.PERCENTILE_FIELD_SUFFIX
        ]
]
self.TARGET_SCORE_RENAME_TO = "M_SCORE"
@@ -284,21 +285,28 @@
def create_esri_codebook(codebook):
"""temporary: helper to make a codebook for esri shapefile only"""
logger.info("Creating a codebook that uses the csv names")
codebook = (
pd.Series(codebook)
.reset_index()
.rename(
# kept as strings because no downstream impacts
                columns={0: "column_name", "index": "shapefile_column"}
)
)
@@ -374,7 +382,7 @@ class GeoScoreETL(ExtractTransformLoad):
for task in [
write_high_to_file,
write_low_to_file,
-            write_esri_shapefile,
+            # write_esri_shapefile,
]
}
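
For reference, a toy run of create_esri_codebook's core transformation, using an invented mapping of shapefile short names to csv column names:

import pandas as pd

codebook = {"SM_PFS": "Definition N (communities) (percentile)"}
out = (
    pd.Series(codebook)
    .reset_index()
    .rename(columns={0: "column_name", "index": "shapefile_column"})
)
print(out)
#   shapefile_column                               column_name
# 0           SM_PFS  Definition N (communities) (percentile)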

View file

@@ -0,0 +1,72 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
from data_pipeline.config import settings

logger = get_module_logger(__name__)

class HistoricRedliningETL(ExtractTransformLoad):
def __init__(self):
self.CSV_PATH = self.DATA_PATH / "dataset" / "historic_redlining"
self.HISTORIC_REDLINING_URL = (
settings.AWS_JUSTICE40_DATASOURCES_URL + "/HRS_2010.zip"
)
self.HISTORIC_REDLINING_FILE_PATH = (
self.get_tmp_path() / "HRS_2010.xlsx"
)
self.REDLINING_SCALAR = "Tract-level redlining score"
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.REDLINING_SCALAR,
]
        self.df: pd.DataFrame

def extract(self) -> None:
logger.info("Downloading Historic Redlining Data")
super().extract(
self.HISTORIC_REDLINING_URL,
self.get_tmp_path(),
        )

def transform(self) -> None:
logger.info("Transforming Historic Redlining Data")
# this is obviously temporary
historic_redlining_data = pd.read_excel(
self.HISTORIC_REDLINING_FILE_PATH
)
historic_redlining_data[self.GEOID_TRACT_FIELD_NAME] = (
historic_redlining_data["GEOID10"].astype(str).str.zfill(11)
)
historic_redlining_data = historic_redlining_data.rename(
columns={"HRS2010": self.REDLINING_SCALAR}
)
logger.info(f"{historic_redlining_data.columns}")
# Calculate lots of different score thresholds for convenience
for threshold in [3.25, 3.5, 3.75]:
historic_redlining_data[
f"{self.REDLINING_SCALAR} meets or exceeds {round(threshold, 2)}"
] = (historic_redlining_data[self.REDLINING_SCALAR] >= threshold)
            ## NOTE: we append each new threshold column to COLUMNS_TO_KEEP here
self.COLUMNS_TO_KEEP.append(
f"{self.REDLINING_SCALAR} meets or exceeds {round(threshold, 2)}"
)
        self.df = historic_redlining_data

def load(self) -> None:
logger.info("Saving Historic Redlining CSV")
# write selected states csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df[self.COLUMNS_TO_KEEP].to_csv(
self.CSV_PATH / "usa.csv", index=False
        )

    def validate(self) -> None:
        logger.info("Validating Historic Redlining Data")
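
A quick sketch of the threshold flags this transform creates, run on invented scores (the column names mirror the ETL above):

import pandas as pd

redlining_scalar = "Tract-level redlining score"
df = pd.DataFrame({redlining_scalar: [2.9, 3.25, 3.6, 3.9]})
for threshold in [3.25, 3.5, 3.75]:
    df[f"{redlining_scalar} meets or exceeds {round(threshold, 2)}"] = (
        df[redlining_scalar] >= threshold
    )
# A score of 3.25 passes only the lowest cutoff; 3.9 passes all three.
print(df.iloc[1].tolist())  # [3.25, True, False, False]
print(df.iloc[3].tolist())  # [3.9, True, True, True]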

View file

@@ -47,16 +47,21 @@ class MappingInequalityETL(ExtractTransformLoad):
self.HOLC_GRADE_AND_ID_FIELD: str = "holc_id"
self.CITY_INPUT_FIELD: str = "city"
-        self.HOLC_GRADE_D_FIELD: str = "HOLC Grade D"
+        self.HOLC_GRADE_D_FIELD: str = "HOLC Grade D (hazardous)"
self.HOLC_GRADE_C_FIELD: str = "HOLC Grade C (declining)"
self.HOLC_GRADE_MANUAL_FIELD: str = "HOLC Grade (manually mapped)"
self.HOLC_GRADE_DERIVED_FIELD: str = "HOLC Grade (derived)"
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
field_names.HOLC_GRADE_C_TRACT_PERCENT_FIELD,
field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD,
field_names.HOLC_GRADE_C_OR_D_TRACT_50_PERCENT_FIELD,
field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD,
field_names.HOLC_GRADE_D_TRACT_20_PERCENT_FIELD,
field_names.HOLC_GRADE_D_TRACT_50_PERCENT_FIELD,
field_names.HOLC_GRADE_D_TRACT_75_PERCENT_FIELD,
field_names.REDLINED_SHARE,
]
self.df: pd.DataFrame
@@ -113,34 +118,58 @@ class MappingInequalityETL(ExtractTransformLoad):
how="left",
)
-        # Create a single field that combines the 'derived' grade D field with the
-        # manually mapped grade D field into a single grade D field.
-        merged_df[self.HOLC_GRADE_D_FIELD] = np.where(
-            (merged_df[self.HOLC_GRADE_DERIVED_FIELD] == "D")
-            | (merged_df[self.HOLC_GRADE_MANUAL_FIELD] == "D"),
+        # Create single fields that combine the 'derived' grade C and D fields
+        # with the manually mapped grade C and D fields.
+        ## Note: there are no manually mapped C tracts at the moment.
+        for grade, field_name in [
+            ("C", self.HOLC_GRADE_C_FIELD),
+            ("D", self.HOLC_GRADE_D_FIELD),
+        ]:
+            merged_df[field_name] = np.where(
+                (merged_df[self.HOLC_GRADE_DERIVED_FIELD] == grade)
+                | (merged_df[self.HOLC_GRADE_MANUAL_FIELD] == grade),
                True,
                None,
            )
-        # Start grouping by, to sum all of the grade D parts of each tract.
-        grouped_df = (
-            merged_df.groupby(
-                by=[
-                    self.GEOID_TRACT_FIELD_NAME,
-                    self.HOLC_GRADE_D_FIELD,
-                ],
-                # Keep the nulls, so we know the non-D proportion.
-                dropna=False,
-            )[self.TRACT_PROPORTION_FIELD]
+        redlined_dataframes_list = [
+            merged_df[merged_df[field].fillna(False)]
+            .groupby(self.GEOID_TRACT_FIELD_NAME)[self.TRACT_PROPORTION_FIELD]
+            .sum()
+            .rename(new_name)
+            for field, new_name in [
+                (
+                    self.HOLC_GRADE_D_FIELD,
+                    field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD,
+                ),
+                (
+                    self.HOLC_GRADE_C_FIELD,
+                    field_names.HOLC_GRADE_C_TRACT_PERCENT_FIELD,
+                ),
+            ]
+        ]
+        # Group by tract ID to get tract proportions of just C or just D.
+        # This produces a single row per tract.
+        grouped_df = (
+            pd.concat(
+                redlined_dataframes_list,
+                axis=1,
+            )
+            .fillna(0)
+            .reset_index()
+        )
-        # Create a field that is only the percent that is grade D.
-        grouped_df[field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD] = np.where(
-            grouped_df[self.HOLC_GRADE_D_FIELD],
-            grouped_df[self.TRACT_PROPORTION_FIELD],
-            0,
-        )
+        grouped_df[
+            field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD
+        ] = grouped_df[
+            [
+                field_names.HOLC_GRADE_C_TRACT_PERCENT_FIELD,
+                field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD,
+            ]
+        ].sum(axis=1)
# Calculate some specific threshold cutoffs, for convenience.
@@ -154,15 +183,14 @@ class MappingInequalityETL(ExtractTransformLoad):
grouped_df[field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD] > 0.75
)
-        # Drop the non-True values of `self.HOLC_GRADE_D_FIELD` -- we only
-        # want one row per tract for future joins.
-        # Note this means not all tracts will be in this data.
-        # Note: this singleton comparison warning may be a pylint bug:
-        # https://stackoverflow.com/questions/51657715/pylint-pandas-comparison-to-true-should-be-just-expr-or-expr-is-true-sin#comment90876517_51657715
-        # pylint: disable=singleton-comparison
-        grouped_df = grouped_df[
-            grouped_df[self.HOLC_GRADE_D_FIELD] == True  # noqa: E712
-        ]
+        grouped_df[field_names.HOLC_GRADE_C_OR_D_TRACT_50_PERCENT_FIELD] = (
+            grouped_df[field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD] > 0.5
+        )
+        # Create the indicator we will use.
+        grouped_df[field_names.REDLINED_SHARE] = (
+            grouped_df[field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD] > 0.5
+        ) & (grouped_df[field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD] > 0)
# Sort for convenience.
grouped_df.sort_values(by=self.GEOID_TRACT_FIELD_NAME, inplace=True)
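
To make the net effect of this rewrite concrete, here is the C/D proportion logic on invented tract shares: a tract gets REDLINED_SHARE only when grades C and D together cover more than half of it and at least some of it is grade D.

import pandas as pd

df = pd.DataFrame(
    {
        "GEOID10_TRACT": ["tract_a", "tract_b", "tract_c"],
        "Percent of tract that is HOLC Grade C": [0.6, 0.3, 0.0],
        "Percent of tract that is HOLC Grade D": [0.0, 0.3, 0.4],
    }
)
c_or_d = (
    df["Percent of tract that is HOLC Grade C"]
    + df["Percent of tract that is HOLC Grade D"]
)
df["Redlined share"] = (c_or_d > 0.5) & (
    df["Percent of tract that is HOLC Grade D"] > 0
)
print(df["Redlined share"].tolist())  # [False, True, False]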

View file

@@ -57,7 +57,7 @@ M_WORKFORCE = "Workforce Factor (Definition M)"
M_NON_WORKFORCE = "Any Non-Workforce Factor (Definition M)"
# Definition Narwhal fields
-SCORE_N = "Definition N"
+SCORE_N = "Definition N (communities)"
SCORE_N_COMMUNITIES = "Definition N (communities)"
N_CLIMATE = "Climate Factor (Definition N)"
N_ENERGY = "Energy Factor (Definition N)"
@@ -303,7 +303,17 @@ EJSCREEN_AREAS_OF_CONCERN_STATE_95TH_PERCENTILE_COMMUNITIES_FIELD = (
"EJSCREEN Areas of Concern, State, 95th percentile (communities)"
)
# Mapping inequality data.
REDLINED_SHARE: str = (
"Redlined share: tract had redlining and was more than 50% Grade C or D"
)
HOLC_GRADE_D_TRACT_PERCENT_FIELD: str = "Percent of tract that is HOLC Grade D"
HOLC_GRADE_C_TRACT_PERCENT_FIELD: str = "Percent of tract that is HOLC Grade C"
HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD: str = (
"Percent of tract that is HOLC Grade C or HOLC Grade D"
)
HOLC_GRADE_C_OR_D_TRACT_50_PERCENT_FIELD: str = (
"Tract is more than 50% Grade C or D"
)
HOLC_GRADE_D_TRACT_20_PERCENT_FIELD: str = "Tract is >20% HOLC Grade D"
HOLC_GRADE_D_TRACT_50_PERCENT_FIELD: str = "Tract is >50% HOLC Grade D"
HOLC_GRADE_D_TRACT_75_PERCENT_FIELD: str = "Tract is >75% HOLC Grade D"
@@ -316,7 +326,7 @@ MICHIGAN_EJSCREEN_PRIORITY_COMMUNITY_FIELD: str = (
)
# CDC SVI INDEX percentile fields
-CDC_SVI_INDEX_SE_THEME_FIELD: str = "SVI - Socioeconomic Index"
+CDC_SVI_INDEX_SE_THEME_FIELD: str = "SVI - Social Vulnerability Index"
CDC_SVI_INDEX_HOUSEHOLD_THEME_COMPOSITION_FIELD: str = (
"SVI - Household Composition Index"
)
@@ -691,5 +701,14 @@ MAPPING_FOR_EJ_PRIORITY_COMMUNITY_FIELD = (
"Mapping for Environmental Justice Priority Community"
)
# Historic Redlining Score
HISTORIC_REDLINING_SCORE_EXCEEDED = (
"Tract-level redlining score meets or exceeds 3.25"
)
HISTORIC_REDLINING_SCORE_EXCEEDED_LOW_INCOME_FIELD = (
"Tract-level redlining score meets or exceeds 3.25 and is low income"
)
# End of names for individual factors being exceeded
####

View file

@@ -1,4 +1,4 @@
from typing import Tuple

import numpy as np
import pandas as pd
@@ -308,11 +309,22 @@ class ScoreNarwhal(Score):
# poverty level and has a low percent of higher ed students.
# Source: Census's American Community Survey
## Additionally, we look to see if HISTORIC_REDLINING_SCORE_EXCEEDED is True and the tract is also low income
housing_eligibility_columns = [
field_names.LEAD_PAINT_MEDIAN_HOUSE_VALUE_LOW_INCOME_FIELD,
field_names.HOUSING_BURDEN_LOW_INCOME_FIELD,
field_names.HISTORIC_REDLINING_SCORE_EXCEEDED_LOW_INCOME_FIELD,
]
        # design question -- should we read in the scalar with the threshold here instead?
self.df[
field_names.HISTORIC_REDLINING_SCORE_EXCEEDED_LOW_INCOME_FIELD
] = (
self.df[field_names.HISTORIC_REDLINING_SCORE_EXCEEDED]
& self.df[field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED]
)
self.df[field_names.LEAD_PAINT_PROXY_PCTILE_THRESHOLD] = (
self.df[
field_names.LEAD_PAINT_FIELD
@@ -804,5 +816,8 @@ class ScoreNarwhal(Score):
]
self.df[field_names.CATEGORY_COUNT] = self.df[factors].sum(axis=1)
self.df[field_names.SCORE_N_COMMUNITIES] = self.df[factors].any(axis=1)
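        # TEMPORARY: cast the boolean to 0/1 as a stand-in "percentile" so the
        # tiles pipeline can visualize Score N through the SM_PFS column.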
self.df[
field_names.SCORE_N_COMMUNITIES + field_names.PERCENTILE_FIELD_SUFFIX
] = self.df[field_names.SCORE_N_COMMUNITIES].astype(int)
return self.df
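
For clarity, a minimal sketch of the new housing criterion's boolean logic (field names shortened and data invented): the redlining route into the housing factor requires both the HRS flag and the low-income flag, and the factor fires if any of the housing columns is true.

import pandas as pd

df = pd.DataFrame(
    {
        "hrs_exceeded": [True, True, False],
        "low_income": [True, False, True],
        "lead_paint_low_income": [False, False, False],
        "housing_burden_low_income": [False, True, False],
    }
)
df["hrs_exceeded_low_income"] = df["hrs_exceeded"] & df["low_income"]
housing_columns = [
    "lead_paint_low_income",
    "housing_burden_low_income",
    "hrs_exceeded_low_income",
]
df["housing_factor"] = df[housing_columns].any(axis=1)
print(df["housing_factor"].tolist())  # [True, True, False]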