diff --git a/data/data-pipeline/data_pipeline/etl/constants.py b/data/data-pipeline/data_pipeline/etl/constants.py index bc04e672..80faae9d 100644 --- a/data/data-pipeline/data_pipeline/etl/constants.py +++ b/data/data-pipeline/data_pipeline/etl/constants.py @@ -114,6 +114,11 @@ DATASET_LIST = [ "module_dir": "maryland_ejscreen", "class_name": "MarylandEJScreenETL", }, + { + "name": "historic_redlining", + "module_dir": "historic_redlining", + "class_name": "HistoricRedliningETL", + }, # This has to come after us.json exists { "name": "census_acs", diff --git a/data/data-pipeline/data_pipeline/etl/score/constants.py b/data/data-pipeline/data_pipeline/etl/score/constants.py index 184dbeb9..5bb4374b 100644 --- a/data/data-pipeline/data_pipeline/etl/score/constants.py +++ b/data/data-pipeline/data_pipeline/etl/score/constants.py @@ -205,7 +205,8 @@ TILES_SCORE_COLUMNS = { field_names.M_HEALTH: "M_HLTH", # temporarily update this so that it's the Narwhal score that gets visualized on the map field_names.SCORE_N_COMMUNITIES: "SM_C", - field_names.SCORE_M + field_names.PERCENTILE_FIELD_SUFFIX: "SM_PFS", + field_names.SCORE_N_COMMUNITIES + + field_names.PERCENTILE_FIELD_SUFFIX: "SM_PFS", field_names.EXPECTED_POPULATION_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD: "EPLRLI", field_names.EXPECTED_AGRICULTURE_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD: "EALRLI", field_names.EXPECTED_BUILDING_LOSS_RATE_LOW_INCOME_LOW_HIGHER_ED_FIELD: "EBLRLI", diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py index cdb78522..e5674518 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py @@ -1,5 +1,6 @@ import functools from collections import namedtuple +from attr import field import numpy as np import pandas as pd @@ -36,6 +37,7 @@ class ScoreETL(ExtractTransformLoad): self.census_decennial_df: pd.DataFrame self.census_2010_df: pd.DataFrame 
self.child_opportunity_index_df: pd.DataFrame + self.hrs_df: pd.DataFrame def extract(self) -> None: logger.info("Loading data sets from disk.") @@ -172,6 +174,17 @@ class ScoreETL(ExtractTransformLoad): low_memory=False, ) + # Load HRS data + hrs_csv = ( + constants.DATA_PATH / "dataset" / "historic_redlining" / "usa.csv" + ) + + self.hrs_df = pd.read_csv( + hrs_csv, + dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, + low_memory=False, + ) + def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame: logger.info("Joining Census Tract dataframes") @@ -376,6 +389,7 @@ class ScoreETL(ExtractTransformLoad): self.census_decennial_df, self.census_2010_df, self.child_opportunity_index_df, + self.hrs_df, ] # Sanity check each data frame before merging. @@ -405,7 +419,6 @@ class ScoreETL(ExtractTransformLoad): df[field_names.MEDIAN_INCOME_FIELD] / df[field_names.AMI_FIELD] ) - # QQ: why don't we just filter to the numeric columns by type? numeric_columns = [ field_names.HOUSING_BURDEN_FIELD, field_names.TOTAL_POP_FIELD, @@ -465,6 +478,7 @@ class ScoreETL(ExtractTransformLoad): non_numeric_columns = [ self.GEOID_TRACT_FIELD_NAME, field_names.PERSISTENT_POVERTY_FIELD, + field_names.HISTORIC_REDLINING_SCORE_EXCEEDED, ] # For some columns, high values are "good", so we want to reverse the percentile diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py index a4d3bef2..14f5f88b 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py @@ -46,10 +46,11 @@ class GeoScoreETL(ExtractTransformLoad): self.DATA_PATH / "census" / "geojson" / "us.json" ) - # Import the shortened name for Score M percentile ("SM_PFS") that's used on the + # Import the shortened name for Score N percentile ("SM_PFS") that's used on the # tiles. 
+ ## TEMPORARY update self.TARGET_SCORE_SHORT_FIELD = constants.TILES_SCORE_COLUMNS[ - field_names.SCORE_M + field_names.PERCENTILE_FIELD_SUFFIX + field_names.SCORE_N + field_names.PERCENTILE_FIELD_SUFFIX ] self.TARGET_SCORE_RENAME_TO = "M_SCORE" @@ -284,21 +285,21 @@ class GeoScoreETL(ExtractTransformLoad): def create_esri_codebook(codebook): """temporary: helper to make a codebook for esri shapefile only""" shapefile_column_field = "shapefile_column" internal_column_name_field = "column_name" column_description_field = "column_description" logger.info("Creating a codebook that uses the csv names") codebook = ( pd.Series(codebook) .reset_index() .rename( # kept as strings because no downstream impacts columns={ 0: internal_column_name_field, "index": shapefile_column_field, } ) ) @@ -374,7 +375,7 @@ class GeoScoreETL(ExtractTransformLoad): for task in [ write_high_to_file, write_low_to_file, - write_esri_shapefile, + # write_esri_shapefile, ] } diff --git a/data/data-pipeline/data_pipeline/etl/sources/historic_redlining/README.md b/data/data-pipeline/data_pipeline/etl/sources/historic_redlining/README.md new file mode 100644 index 00000000..e69de29b diff --git a/data/data-pipeline/data_pipeline/etl/sources/historic_redlining/__init__.py b/data/data-pipeline/data_pipeline/etl/sources/historic_redlining/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/data/data-pipeline/data_pipeline/etl/sources/historic_redlining/etl.py b/data/data-pipeline/data_pipeline/etl/sources/historic_redlining/etl.py new file mode 100644 index 00000000..1099bf83 --- /dev/null +++ b/data/data-pipeline/data_pipeline/etl/sources/historic_redlining/etl.py @@ -0,0 +1,72 @@ +import pandas as pd + +from data_pipeline.etl.base import ExtractTransformLoad +from
data_pipeline.utils import get_module_logger +from data_pipeline.config import settings + +logger = get_module_logger(__name__) + + +class HistoricRedliningETL(ExtractTransformLoad): + def __init__(self): + self.CSV_PATH = self.DATA_PATH / "dataset" / "historic_redlining" + self.HISTORIC_REDLINING_URL = ( + settings.AWS_JUSTICE40_DATASOURCES_URL + "/HRS_2010.zip" + ) + self.HISTORIC_REDLINING_FILE_PATH = ( + self.get_tmp_path() / "HRS_2010.xlsx" + ) + + self.REDLINING_SCALAR = "Tract-level redlining score" + + self.COLUMNS_TO_KEEP = [ + self.GEOID_TRACT_FIELD_NAME, + self.REDLINING_SCALAR, + ] + self.df: pd.DataFrame + + def extract(self) -> None: + logger.info("Downloading Historic Redlining Data") + super().extract( + self.HISTORIC_REDLINING_URL, + self.get_tmp_path(), + ) + + def transform(self) -> None: + logger.info("Transforming Historic Redlining Data") + # this is obviously temporary + historic_redlining_data = pd.read_excel( + self.HISTORIC_REDLINING_FILE_PATH + ) + historic_redlining_data[self.GEOID_TRACT_FIELD_NAME] = ( + historic_redlining_data["GEOID10"].astype(str).str.zfill(11) + ) + historic_redlining_data = historic_redlining_data.rename( + columns={"HRS2010": self.REDLINING_SCALAR} + ) + + logger.info(f"{historic_redlining_data.columns}") + + # Calculate lots of different score thresholds for convenience + for threshold in [3.25, 3.5, 3.75]: + historic_redlining_data[ + f"{self.REDLINING_SCALAR} meets or exceeds {round(threshold, 2)}" + ] = (historic_redlining_data[self.REDLINING_SCALAR] >= threshold) + ## NOTE We add to columns to keep here + self.COLUMNS_TO_KEEP.append( + f"{self.REDLINING_SCALAR} meets or exceeds {round(threshold, 2)}" + ) + + self.df = historic_redlining_data + + def load(self) -> None: + logger.info("Saving Historic Redlining CSV") + # write selected states csv + self.CSV_PATH.mkdir(parents=True, exist_ok=True) + self.df[self.COLUMNS_TO_KEEP].to_csv( + self.CSV_PATH / "usa.csv", index=False + ) + + def validate(self) -> None: 
+ logger.info("Validating Historic Redlining Data") + pass diff --git a/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py b/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py index ea2f8152..7aa7afa2 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py +++ b/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py @@ -47,16 +47,21 @@ class MappingInequalityETL(ExtractTransformLoad): self.HOLC_GRADE_AND_ID_FIELD: str = "holc_id" self.CITY_INPUT_FIELD: str = "city" - self.HOLC_GRADE_D_FIELD: str = "HOLC Grade D" + self.HOLC_GRADE_D_FIELD: str = "HOLC Grade D (hazardous)" + self.HOLC_GRADE_C_FIELD: str = "HOLC Grade C (declining)" self.HOLC_GRADE_MANUAL_FIELD: str = "HOLC Grade (manually mapped)" self.HOLC_GRADE_DERIVED_FIELD: str = "HOLC Grade (derived)" self.COLUMNS_TO_KEEP = [ self.GEOID_TRACT_FIELD_NAME, + field_names.HOLC_GRADE_C_TRACT_PERCENT_FIELD, + field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD, + field_names.HOLC_GRADE_C_OR_D_TRACT_50_PERCENT_FIELD, field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD, field_names.HOLC_GRADE_D_TRACT_20_PERCENT_FIELD, field_names.HOLC_GRADE_D_TRACT_50_PERCENT_FIELD, field_names.HOLC_GRADE_D_TRACT_75_PERCENT_FIELD, + field_names.REDLINED_SHARE, ] self.df: pd.DataFrame @@ -113,34 +118,58 @@ class MappingInequalityETL(ExtractTransformLoad): how="left", ) - # Create a single field that combines the 'derived' grade D field with the - # manually mapped grade D field into a single grade D field. - merged_df[self.HOLC_GRADE_D_FIELD] = np.where( - (merged_df[self.HOLC_GRADE_DERIVED_FIELD] == "D") - | (merged_df[self.HOLC_GRADE_MANUAL_FIELD] == "D"), - True, - None, - ) + # Create a single field that combines the 'derived' grade C and D fields with the + # manually mapped grade C and D field into a single grade C and D field. 
+ ## Note: there are no manually derived C tracts at the moment - # Start grouping by, to sum all of the grade D parts of each tract. - grouped_df = ( - merged_df.groupby( - by=[ - self.GEOID_TRACT_FIELD_NAME, - self.HOLC_GRADE_D_FIELD, - ], - # Keep the nulls, so we know the non-D proportion. - dropna=False, - )[self.TRACT_PROPORTION_FIELD] + for grade, field_name in [ + ("C", self.HOLC_GRADE_C_FIELD), + ("D", self.HOLC_GRADE_D_FIELD), + ]: + merged_df[field_name] = np.where( + (merged_df[self.HOLC_GRADE_DERIVED_FIELD] == grade) + | (merged_df[self.HOLC_GRADE_MANUAL_FIELD] == grade), + True, + None, + ) + + redlined_dataframes_list = [ + merged_df[merged_df[field].fillna(False)] + .groupby(self.GEOID_TRACT_FIELD_NAME)[self.TRACT_PROPORTION_FIELD] .sum() + .rename(new_name) + for field, new_name in [ + ( + self.HOLC_GRADE_D_FIELD, + field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD, + ), + ( + self.HOLC_GRADE_C_FIELD, + field_names.HOLC_GRADE_C_TRACT_PERCENT_FIELD, + ), + ] + ] + + # Group by tract ID to get tract proportions of just C or just D + # This produces a single row per tract + grouped_df = ( + pd.concat( + redlined_dataframes_list, + axis=1, + ) + .fillna(0) .reset_index() ) - # Create a field that is only the percent that is grade D. - grouped_df[field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD] = np.where( - grouped_df[self.HOLC_GRADE_D_FIELD], - grouped_df[self.TRACT_PROPORTION_FIELD], - 0, + grouped_df[ + field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD + ] = grouped_df[ + [ + field_names.HOLC_GRADE_C_TRACT_PERCENT_FIELD, + field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD, + ] + ].sum( + axis=1 ) # Calculate some specific threshold cutoffs, for convenience. @@ -154,15 +183,14 @@ class MappingInequalityETL(ExtractTransformLoad): grouped_df[field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD] > 0.75 ) - # Drop the non-True values of `self.HOLC_GRADE_D_FIELD` -- we only - # want one row per tract for future joins. - # Note this means not all tracts will be in this data. 
- # Note: this singleton comparison warning may be a pylint bug: - # https://stackoverflow.com/questions/51657715/pylint-pandas-comparison-to-true-should-be-just-expr-or-expr-is-true-sin#comment90876517_51657715 - # pylint: disable=singleton-comparison - grouped_df = grouped_df[ - grouped_df[self.HOLC_GRADE_D_FIELD] == True # noqa: E712 - ] + grouped_df[field_names.HOLC_GRADE_C_OR_D_TRACT_50_PERCENT_FIELD] = ( + grouped_df[field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD] > 0.5 + ) + + # Create the indicator we will use + grouped_df[field_names.REDLINED_SHARE] = ( + grouped_df[field_names.HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD] > 0.5 + ) & (grouped_df[field_names.HOLC_GRADE_D_TRACT_PERCENT_FIELD] > 0) # Sort for convenience. grouped_df.sort_values(by=self.GEOID_TRACT_FIELD_NAME, inplace=True) diff --git a/data/data-pipeline/data_pipeline/score/field_names.py b/data/data-pipeline/data_pipeline/score/field_names.py index 39153e02..3254f08e 100644 --- a/data/data-pipeline/data_pipeline/score/field_names.py +++ b/data/data-pipeline/data_pipeline/score/field_names.py @@ -57,7 +57,7 @@ M_WORKFORCE = "Workforce Factor (Definition M)" M_NON_WORKFORCE = "Any Non-Workforce Factor (Definition M)" # Definition Narwhal fields -SCORE_N = "Definition N" +SCORE_N = "Definition N (communities)" SCORE_N_COMMUNITIES = "Definition N (communities)" N_CLIMATE = "Climate Factor (Definition N)" N_ENERGY = "Energy Factor (Definition N)" @@ -303,7 +303,17 @@ EJSCREEN_AREAS_OF_CONCERN_STATE_95TH_PERCENTILE_COMMUNITIES_FIELD = ( "EJSCREEN Areas of Concern, State, 95th percentile (communities)" ) # Mapping inequality data. 
+REDLINED_SHARE: str = ( + "Redlined share: tract had redlining and was more than 50% Grade C or D" +) HOLC_GRADE_D_TRACT_PERCENT_FIELD: str = "Percent of tract that is HOLC Grade D" +HOLC_GRADE_C_TRACT_PERCENT_FIELD: str = "Percent of tract that is HOLC Grade C" +HOLC_GRADE_C_OR_D_TRACT_PERCENT_FIELD: str = ( + "Percent of tract that is HOLC Grade C or HOLC Grade D" +) +HOLC_GRADE_C_OR_D_TRACT_50_PERCENT_FIELD: str = ( + "Tract is more than 50% Grade C or D" +) HOLC_GRADE_D_TRACT_20_PERCENT_FIELD: str = "Tract is >20% HOLC Grade D" HOLC_GRADE_D_TRACT_50_PERCENT_FIELD: str = "Tract is >50% HOLC Grade D" HOLC_GRADE_D_TRACT_75_PERCENT_FIELD: str = "Tract is >75% HOLC Grade D" @@ -316,7 +326,7 @@ MICHIGAN_EJSCREEN_PRIORITY_COMMUNITY_FIELD: str = ( ) # CDC SVI INDEX percentile fields -CDC_SVI_INDEX_SE_THEME_FIELD: str = "SVI - Socioeconomic Index" +CDC_SVI_INDEX_SE_THEME_FIELD: str = "SVI - Social Vulnerability Index" CDC_SVI_INDEX_HOUSEHOLD_THEME_COMPOSITION_FIELD: str = ( "SVI - Household Composition Index" ) @@ -691,5 +701,14 @@ MAPPING_FOR_EJ_PRIORITY_COMMUNITY_FIELD = ( "Mapping for Environmental Justice Priority Community" ) +# Historic Redlining Score +HISTORIC_REDLINING_SCORE_EXCEEDED = ( + "Tract-level redlining score meets or exceeds 3.25" +) + +HISTORIC_REDLINING_SCORE_EXCEEDED_LOW_INCOME_FIELD = ( + "Tract-level redlining score meets or exceeds 3.25 and is low income" +) + # End of names for individual factors being exceeded #### diff --git a/data/data-pipeline/data_pipeline/score/score_narwhal.py b/data/data-pipeline/data_pipeline/score/score_narwhal.py index 0bde80bb..2958f788 100644 --- a/data/data-pipeline/data_pipeline/score/score_narwhal.py +++ b/data/data-pipeline/data_pipeline/score/score_narwhal.py @@ -1,4 +1,5 @@ from typing import Tuple +from attr import field import numpy as np import pandas as pd @@ -308,11 +309,22 @@ class ScoreNarwhal(Score): # poverty level and has a low percent of higher ed students. 
# Source: Census's American Community Survey + ## Additionally, we look to see if HISTORIC_REDLINING_SCORE_EXCEEDED is True and the tract is also low income + housing_eligibility_columns = [ field_names.LEAD_PAINT_MEDIAN_HOUSE_VALUE_LOW_INCOME_FIELD, field_names.HOUSING_BURDEN_LOW_INCOME_FIELD, + field_names.HISTORIC_REDLINING_SCORE_EXCEEDED_LOW_INCOME_FIELD, ] + # design question -- should read in scalar with threshold here instead? + self.df[ + field_names.HISTORIC_REDLINING_SCORE_EXCEEDED_LOW_INCOME_FIELD + ] = ( + self.df[field_names.HISTORIC_REDLINING_SCORE_EXCEEDED] + & self.df[field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED] + ) + self.df[field_names.LEAD_PAINT_PROXY_PCTILE_THRESHOLD] = ( self.df[ field_names.LEAD_PAINT_FIELD @@ -804,5 +816,8 @@ class ScoreNarwhal(Score): ] self.df[field_names.CATEGORY_COUNT] = self.df[factors].sum(axis=1) self.df[field_names.SCORE_N_COMMUNITIES] = self.df[factors].any(axis=1) + self.df[ + field_names.SCORE_N_COMMUNITIES + field_names.PERCENTILE_FIELD_SUFFIX + ] = self.df[field_names.SCORE_N_COMMUNITIES].astype(int) return self.df