diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py index 88a5a763..f19f0976 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py @@ -1,11 +1,13 @@ -import collections import functools -from pathlib import Path import pandas as pd from data_pipeline.etl.base import ExtractTransformLoad +from data_pipeline.score.score_runner import ScoreRunner +from data_pipeline.score import field_names +from data_pipeline.etl.score import constants + from data_pipeline.utils import get_module_logger -from data_pipeline.etl.score.score_calculator import ScoreCalculator + logger = get_module_logger(__name__) @@ -13,79 +15,6 @@ logger = get_module_logger(__name__) class ScoreETL(ExtractTransformLoad): def __init__(self): # Define some global parameters - self.BUCKET_SOCIOECONOMIC: str = "Socioeconomic Factors" - self.BUCKET_SENSITIVE: str = "Sensitive populations" - self.BUCKET_ENVIRONMENTAL: str = "Environmental effects" - self.BUCKET_EXPOSURES: str = "Exposures" - self.BUCKETS: str = [ - self.BUCKET_SOCIOECONOMIC, - self.BUCKET_SENSITIVE, - self.BUCKET_ENVIRONMENTAL, - self.BUCKET_EXPOSURES, - ] - - # A few specific field names - # TODO: clean this up, I name some fields but not others. 
- self.UNEMPLOYED_FIELD_NAME: str = "Unemployed civilians (percent)" - self.LINGUISTIC_ISOLATION_FIELD_NAME: str = ( - "Linguistic isolation (percent)" - ) - self.HOUSING_BURDEN_FIELD_NAME: str = "Housing burden (percent)" - self.POVERTY_FIELD_NAME: str = ( - "Poverty (Less than 200% of federal poverty line)" - ) - self.HIGH_SCHOOL_FIELD_NAME: str = "Percent individuals age 25 or over with less than high school degree" - self.STATE_MEDIAN_INCOME_FIELD_NAME: str = ( - "Median household income (State; 2019 inflation-adjusted dollars)" - ) - self.MEDIAN_INCOME_FIELD_NAME: str = ( - "Median household income in the past 12 months" - ) - self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME: str = ( - "Median household income (% of state median household income)" - ) - self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME: str = ( - "Median household income (% of AMI)" - ) - self.AMI_FIELD_NAME: str = "Area Median Income (State or metropolitan)" - - # Note: these variable names are slightly different (missing the word `PERCENT`) than those in the source ETL to avoid pylint's duplicate - # code error. - LMB - self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME: str = ( - "Percent of individuals < 100% Federal Poverty Line" - ) - self.POVERTY_LESS_THAN_150_FPL_FIELD_NAME: str = ( - "Percent of individuals < 150% Federal Poverty Line" - ) - self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME: str = ( - "Percent of individuals < 200% Federal Poverty Line" - ) - - # CDC life expectancy - self.LIFE_EXPECTANCY_FIELD_NAME = "Life expectancy (years)" - - # DOE energy burden - self.ENERGY_BURDEN_FIELD_NAME = "Energy burden" - - # FEMA Risk Index - self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME = ( - "FEMA Risk Index Expected Annual Loss Score" - ) - - # There's another aggregation level (a second level of "buckets"). 
- self.AGGREGATION_POLLUTION: str = "Pollution Burden" - self.AGGREGATION_POPULATION: str = "Population Characteristics" - - self.PERCENTILE_FIELD_SUFFIX: str = " (percentile)" - self.MIN_MAX_FIELD_SUFFIX: str = " (min-max normalized)" - - self.SCORE_CSV_PATH: Path = self.DATA_PATH / "score" / "csv" / "full" - - # Urban Rural Map - self.URBAN_HERUISTIC_FIELD_NAME = "Urban Heuristic Flag" - - # Persistent poverty - self.PERSISTENT_POVERTY_FIELD = "Persistent Poverty Census Tract" # dataframes self.df: pd.DataFrame @@ -101,233 +30,45 @@ class ScoreETL(ExtractTransformLoad): self.geocorr_urban_rural_df: pd.DataFrame self.persistent_poverty_df: pd.DataFrame - def data_sets(self) -> list: - # Define a named tuple that will be used for each data set input. - DataSet = collections.namedtuple( - typename="DataSet", - field_names=["input_field", "renamed_field", "bucket"], - ) - - return [ - # The following data sets have `bucket=None`, because it's not used in the bucket based score ("Score C"). - DataSet( - input_field=self.GEOID_FIELD_NAME, - # Use the name `GEOID10` to enable geoplatform.gov's workflow. 
- renamed_field=self.GEOID_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.HOUSING_BURDEN_FIELD_NAME, - renamed_field=self.HOUSING_BURDEN_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field="ACSTOTPOP", - renamed_field="Total population", - bucket=None, - ), - DataSet( - input_field=self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME, - renamed_field=self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field="Current asthma among adults aged >=18 years", - renamed_field="Current asthma among adults aged >=18 years", - bucket=None, - ), - DataSet( - input_field="Coronary heart disease among adults aged >=18 years", - renamed_field="Coronary heart disease among adults aged >=18 years", - bucket=None, - ), - DataSet( - input_field="Cancer (excluding skin cancer) among adults aged >=18 years", - renamed_field="Cancer (excluding skin cancer) among adults aged >=18 years", - bucket=None, - ), - DataSet( - input_field="Current lack of health insurance among adults aged 18-64 years", - renamed_field="Current lack of health insurance among adults aged 18-64 years", - bucket=None, - ), - DataSet( - input_field="Diagnosed diabetes among adults aged >=18 years", - renamed_field="Diagnosed diabetes among adults aged >=18 years", - bucket=None, - ), - DataSet( - input_field="Physical health not good for >=14 days among adults aged >=18 years", - renamed_field="Physical health not good for >=14 days among adults aged >=18 years", - bucket=None, - ), - DataSet( - input_field=self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME, - renamed_field=self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.POVERTY_LESS_THAN_150_FPL_FIELD_NAME, - renamed_field=self.POVERTY_LESS_THAN_150_FPL_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME, - renamed_field=self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.AMI_FIELD_NAME, 
- renamed_field=self.AMI_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME, - renamed_field=self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.MEDIAN_INCOME_FIELD_NAME, - renamed_field=self.MEDIAN_INCOME_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.LIFE_EXPECTANCY_FIELD_NAME, - renamed_field=self.LIFE_EXPECTANCY_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.ENERGY_BURDEN_FIELD_NAME, - renamed_field=self.ENERGY_BURDEN_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME, - renamed_field=self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.URBAN_HERUISTIC_FIELD_NAME, - renamed_field=self.URBAN_HERUISTIC_FIELD_NAME, - bucket=None, - ), - DataSet( - input_field=self.PERSISTENT_POVERTY_FIELD, - renamed_field=self.PERSISTENT_POVERTY_FIELD, - bucket=None, - ), - # The following data sets have buckets, because they're used in Score C - DataSet( - input_field="CANCER", - renamed_field="Air toxics cancer risk", - bucket=self.BUCKET_EXPOSURES, - ), - DataSet( - input_field="RESP", - renamed_field="Respiratory hazard index", - bucket=self.BUCKET_EXPOSURES, - ), - DataSet( - input_field="DSLPM", - renamed_field="Diesel particulate matter", - bucket=self.BUCKET_EXPOSURES, - ), - DataSet( - input_field="PM25", - renamed_field="Particulate matter (PM2.5)", - bucket=self.BUCKET_EXPOSURES, - ), - DataSet( - input_field="OZONE", - renamed_field="Ozone", - bucket=self.BUCKET_EXPOSURES, - ), - DataSet( - input_field="PTRAF", - renamed_field="Traffic proximity and volume", - bucket=self.BUCKET_EXPOSURES, - ), - DataSet( - input_field="PRMP", - renamed_field="Proximity to RMP sites", - bucket=self.BUCKET_ENVIRONMENTAL, - ), - DataSet( - input_field="PTSDF", - renamed_field="Proximity to TSDF sites", - bucket=self.BUCKET_ENVIRONMENTAL, - ), - 
DataSet( - input_field="PNPL", - renamed_field="Proximity to NPL sites", - bucket=self.BUCKET_ENVIRONMENTAL, - ), - DataSet( - input_field="PWDIS", - renamed_field="Wastewater discharge", - bucket=self.BUCKET_ENVIRONMENTAL, - ), - DataSet( - input_field="PRE1960PCT", - renamed_field="Percent pre-1960s housing (lead paint indicator)", - bucket=self.BUCKET_ENVIRONMENTAL, - ), - DataSet( - input_field="UNDER5PCT", - renamed_field="Individuals under 5 years old", - bucket=self.BUCKET_SENSITIVE, - ), - DataSet( - input_field="OVER64PCT", - renamed_field="Individuals over 64 years old", - bucket=self.BUCKET_SENSITIVE, - ), - DataSet( - input_field=self.LINGUISTIC_ISOLATION_FIELD_NAME, - renamed_field=self.LINGUISTIC_ISOLATION_FIELD_NAME, - bucket=self.BUCKET_SENSITIVE, - ), - DataSet( - input_field="LINGISOPCT", - renamed_field="Percent of households in linguistic isolation", - bucket=self.BUCKET_SOCIOECONOMIC, - ), - DataSet( - input_field="LOWINCPCT", - renamed_field=self.POVERTY_FIELD_NAME, - bucket=self.BUCKET_SOCIOECONOMIC, - ), - DataSet( - input_field="LESSHSPCT", - renamed_field=self.HIGH_SCHOOL_FIELD_NAME, - bucket=self.BUCKET_SOCIOECONOMIC, - ), - DataSet( - input_field=self.UNEMPLOYED_FIELD_NAME, - renamed_field=self.UNEMPLOYED_FIELD_NAME, - bucket=self.BUCKET_SOCIOECONOMIC, - ), - DataSet( - input_field="ht_ami", - renamed_field="Housing + Transportation Costs % Income for the Regional Typical Household", - bucket=self.BUCKET_SOCIOECONOMIC, - ), - ] - def extract(self) -> None: logger.info("Loading data sets from disk.") # EJSCreen csv Load - ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv" + ejscreen_csv = ( + constants.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv" + ) self.ejscreen_df = pd.read_csv( ejscreen_csv, dtype={"ID": "string"}, low_memory=False ) + # TODO move to EJScreen ETL self.ejscreen_df.rename( - columns={"ID": self.GEOID_FIELD_NAME}, inplace=True + columns={ + "ID": self.GEOID_FIELD_NAME, + "ACSTOTPOP": 
field_names.TOTAL_POP_FIELD, + "CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD, + "RESP": field_names.RESPITORY_HAZARD_FIELD, + "DSLPM": field_names.DIESEL_FIELD, + "PM25": field_names.PM25_FIELD, + "OZONE": field_names.OZONE_FIELD, + "PTRAF": field_names.TRAFFIC_FIELD, + "PRMP": field_names.RMP_FIELD, + "PTSDF": field_names.TSDF_FIELD, + "PNPL": field_names.NPL_FIELD, + "PWDIS": field_names.WASTEWATER_FIELD, + "LINGISOPCT": field_names.HOUSEHOLDS_LINGUISTIC_ISO_FIELD, + "LOWINCPCT": field_names.POVERTY_FIELD, + "LESSHSPCT": field_names.HIGH_SCHOOL_ED_FIELD, + "OVER64PCT": field_names.OVER_64_FIELD, + "UNDER5PCT": field_names.UNDER_5_FIELD, + "PRE1960PCT": field_names.LEAD_PAINT_FIELD, + }, + inplace=True, ) # Load census data - census_csv = self.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv" + census_csv = ( + constants.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv" + ) self.census_df = pd.read_csv( census_csv, dtype={self.GEOID_FIELD_NAME: "string"}, @@ -336,7 +77,7 @@ class ScoreETL(ExtractTransformLoad): # Load housing and transportation data housing_and_transportation_index_csv = ( - self.DATA_PATH + constants.DATA_PATH / "dataset" / "housing_and_transportation_index" / "usa.csv" @@ -346,9 +87,15 @@ class ScoreETL(ExtractTransformLoad): dtype={self.GEOID_FIELD_NAME: "string"}, low_memory=False, ) + # TODO move to HT Index ETL + self.housing_and_transportation_df.rename( + columns={"ht_ami": field_names.HT_INDEX_FIELD}, inplace=True + ) # Load HUD housing data - hud_housing_csv = self.DATA_PATH / "dataset" / "hud_housing" / "usa.csv" + hud_housing_csv = ( + constants.DATA_PATH / "dataset" / "hud_housing" / "usa.csv" + ) self.hud_housing_df = pd.read_csv( hud_housing_csv, dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, @@ -356,7 +103,9 @@ class ScoreETL(ExtractTransformLoad): ) # Load CDC Places data - cdc_places_csv = self.DATA_PATH / "dataset" / "cdc_places" / "usa.csv" + cdc_places_csv = ( + constants.DATA_PATH / "dataset" / "cdc_places" / 
"usa.csv" + ) self.cdc_places_df = pd.read_csv( cdc_places_csv, dtype={self.GEOID_TRACT_FIELD_NAME: "string"}, @@ -365,7 +114,7 @@ class ScoreETL(ExtractTransformLoad): # Load census AMI data census_acs_median_incomes_csv = ( - self.DATA_PATH + constants.DATA_PATH / "dataset" / "census_acs_median_income_2019" / "usa.csv" @@ -378,7 +127,7 @@ class ScoreETL(ExtractTransformLoad): # Load CDC life expectancy data cdc_life_expectancy_csv = ( - self.DATA_PATH / "dataset" / "cdc_life_expectancy" / "usa.csv" + constants.DATA_PATH / "dataset" / "cdc_life_expectancy" / "usa.csv" ) self.cdc_life_expectancy_df = pd.read_csv( cdc_life_expectancy_csv, @@ -388,7 +137,7 @@ class ScoreETL(ExtractTransformLoad): # Load DOE energy burden data doe_energy_burden_csv = ( - self.DATA_PATH / "dataset" / "doe_energy_burden" / "usa.csv" + constants.DATA_PATH / "dataset" / "doe_energy_burden" / "usa.csv" ) self.doe_energy_burden_df = pd.read_csv( doe_energy_burden_csv, @@ -398,7 +147,10 @@ class ScoreETL(ExtractTransformLoad): # Load FEMA national risk index data national_risk_index_csv = ( - self.DATA_PATH / "dataset" / "national_risk_index_2020" / "usa.csv" + constants.DATA_PATH + / "dataset" + / "national_risk_index_2020" + / "usa.csv" ) self.national_risk_index_df = pd.read_csv( national_risk_index_csv, @@ -408,7 +160,7 @@ class ScoreETL(ExtractTransformLoad): # Load GeoCorr Urban Rural Map geocorr_urban_rural_csv = ( - self.DATA_PATH / "dataset" / "geocorr" / "usa.csv" + constants.DATA_PATH / "dataset" / "geocorr" / "usa.csv" ) self.geocorr_urban_rural_df = pd.read_csv( geocorr_urban_rural_csv, @@ -418,7 +170,7 @@ class ScoreETL(ExtractTransformLoad): # Load persistent poverty persistent_poverty_csv = ( - self.DATA_PATH / "dataset" / "persistent_poverty" / "usa.csv" + constants.DATA_PATH / "dataset" / "persistent_poverty" / "usa.csv" ) self.persistent_poverty_df = pd.read_csv( persistent_poverty_csv, @@ -467,239 +219,8 @@ class ScoreETL(ExtractTransformLoad): ) return census_tract_df - 
def _add_score_a(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Adding Score A") - df["Score A"] = df[ - [ - "Poverty (Less than 200% of federal poverty line) (percentile)", - "Percent individuals age 25 or over with less than high school degree (percentile)", - ] - ].mean(axis=1) - return df - - def _add_score_b(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Adding Score B") - df["Score B"] = ( - self.df[ - "Poverty (Less than 200% of federal poverty line) (percentile)" - ] - * self.df[ - "Percent individuals age 25 or over with less than high school degree (percentile)" - ] - ) - return df - - def _add_score_c(self, df: pd.DataFrame, data_sets: list) -> pd.DataFrame: - logger.info("Adding Score C") - # Average all the percentile values in each bucket into a single score for each of the four buckets. - for bucket in self.BUCKETS: - fields_in_bucket = [ - f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}" - for data_set in data_sets - if data_set.bucket == bucket - ] - df[f"{bucket}"] = df[fields_in_bucket].mean(axis=1) - - # Combine the score from the two Exposures and Environmental Effects buckets - # into a single score called "Pollution Burden". - # The math for this score is: - # (1.0 * Exposures Score + 0.5 * Environment Effects score) / 1.5. - df[self.AGGREGATION_POLLUTION] = ( - 1.0 * df[f"{self.BUCKET_EXPOSURES}"] - + 0.5 * df[f"{self.BUCKET_ENVIRONMENTAL}"] - ) / 1.5 - - # Average the score from the two Sensitive populations and - # Socioeconomic factors buckets into a single score called - # "Population Characteristics". - df[self.AGGREGATION_POPULATION] = df[ - [f"{self.BUCKET_SENSITIVE}", f"{self.BUCKET_SOCIOECONOMIC}"] - ].mean(axis=1) - - # Multiply the "Pollution Burden" score and the "Population Characteristics" - # together to produce the cumulative impact score. 
- df["Score C"] = ( - df[self.AGGREGATION_POLLUTION] * df[self.AGGREGATION_POPULATION] - ) - return df - - def _add_scores_d_e(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Adding Scores D and E") - fields_to_use_in_score = [ - self.UNEMPLOYED_FIELD_NAME, - self.LINGUISTIC_ISOLATION_FIELD_NAME, - self.HOUSING_BURDEN_FIELD_NAME, - self.POVERTY_FIELD_NAME, - self.HIGH_SCHOOL_FIELD_NAME, - ] - - fields_min_max = [ - f"{field}{self.MIN_MAX_FIELD_SUFFIX}" - for field in fields_to_use_in_score - ] - fields_percentile = [ - f"{field}{self.PERCENTILE_FIELD_SUFFIX}" - for field in fields_to_use_in_score - ] - - # Calculate "Score D", which uses min-max normalization - # and calculate "Score E", which uses percentile normalization for the same fields - df["Score D"] = self.df[fields_min_max].mean(axis=1) - df["Score E"] = self.df[fields_percentile].mean(axis=1) - return df - - def _add_score_percentiles(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Adding Score Percentiles") - for score_field in [ - "Score A", - "Score B", - "Score C", - "Score D", - "Score E", - "Poverty (Less than 200% of federal poverty line)", - ]: - df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = df[ - score_field - ].rank(pct=True) - - for threshold in [0.25, 0.3, 0.35, 0.4]: - fraction_converted_to_percent = int(100 * threshold) - df[ - f"{score_field} (top {fraction_converted_to_percent}th percentile)" - ] = ( - df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] - >= 1 - threshold - ) - return df - - # TODO Make variables and constants clearer (meaning and type) - def _add_score_f(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Adding Score F") - ami_and_high_school_field_name = "Low AMI, Low HS graduation" - meets_socio_field_name = "Meets socioeconomic criteria" - meets_burden_field_name = "Meets burden criteria" - - df[ami_and_high_school_field_name] = ( - df[self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME] < 0.80 - ) & (df[self.HIGH_SCHOOL_FIELD_NAME] > 0.2) - - 
df[meets_socio_field_name] = ( - df[ami_and_high_school_field_name] - | (df[self.POVERTY_FIELD_NAME] > 0.40) - | (df[self.LINGUISTIC_ISOLATION_FIELD_NAME] > 0.10) - | (df[self.HIGH_SCHOOL_FIELD_NAME] > 0.4) - ) - - df[meets_burden_field_name] = ( - (df["Particulate matter (PM2.5) (percentile)"] > 0.9) - | (df["Respiratory hazard index (percentile)"] > 0.9) - | (df["Traffic proximity and volume (percentile)"] > 0.9) - | ( - df[ - "Percent pre-1960s housing (lead paint indicator) (percentile)" - ] - > 0.9 - ) - | (df["Proximity to RMP sites (percentile)"] > 0.9) - | ( - df["Current asthma among adults aged >=18 years (percentile)"] - > 0.9 - ) - | ( - df[ - "Coronary heart disease among adults aged >=18 years (percentile)" - ] - > 0.9 - ) - | ( - df[ - "Cancer (excluding skin cancer) among adults aged >=18 years (percentile)" - ] - > 0.9 - ) - # | ( - # self.df[ - # "Current lack of health insurance among adults aged 18-64 years (percentile)" - # ] - # > 0.9 - # ) - | ( - df[ - "Diagnosed diabetes among adults aged >=18 years (percentile)" - ] - > 0.9 - ) - # | ( - # self.df[ - # "Physical health not good for >=14 days among adults aged >=18 years (percentile)" - # ] - # > 0.9 - # ) - ) - - df["Score F (communities)"] = ( - df[meets_socio_field_name] & df[meets_burden_field_name] - ) - return df - - def _add_score_g_k(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Adding Score G through K") - - high_school_cutoff_threshold = 0.05 - high_school_cutoff_threshold_2 = 0.06 - - # Score G is now modified NMTC - df["Score G (communities)"] = ( - (df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.8) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold) - ) | ( - (df[self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME] > 0.20) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold) - ) - df["Score G"] = df["Score G (communities)"].astype(int) - df["Score G (percentile)"] = df["Score G"] - - df["Score H (communities)"] = ( - 
(df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.8) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold_2) - ) | ( - (df[self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME] > 0.40) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold_2) - ) - df["Score H"] = df["Score H (communities)"].astype(int) - - df["Score I (communities)"] = ( - (df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.7) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold) - ) | ( - (df[self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME] > 0.50) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold) - ) - df["Score I"] = df["Score I (communities)"].astype(int) - df["Score I (percentile)"] = df["Score I"] - - df["NMTC (communities)"] = ( - (df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.8) - ) | (df[self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME] > 0.20) - - df["Score K (communities)"] = ( - (df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.8) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold_2) - ) | ( - (df[self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME] > 0.20) - & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold_2) - ) - - return df - - def _add_definition_l_factors(self, df: pd.DataFrame) -> pd.DataFrame: - logger.info("Adding Definition L and factors") - calc = ScoreCalculator(df=df) - df = calc.add_definition_l_factors() - return df - # TODO Move a lot of this to the ETL part of the pipeline - def _prepare_initial_df(self, data_sets: list) -> pd.DataFrame: + def _prepare_initial_df(self) -> pd.DataFrame: logger.info("Preparing initial dataframe") # Join all the data sources that use census block groups @@ -741,120 +262,106 @@ class ScoreETL(ExtractTransformLoad): # Calculate median income variables. # First, calculate the income of the block group as a fraction of the state income. 
- df[self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME] = ( - df[self.MEDIAN_INCOME_FIELD_NAME] - / df[self.STATE_MEDIAN_INCOME_FIELD_NAME] + df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD] = ( + df[field_names.MEDIAN_INCOME_FIELD] + / df[field_names.STATE_MEDIAN_INCOME_FIELD] ) # Calculate the income of the block group as a fraction of the AMI (either state or metropolitan, depending on reference). - df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] = ( - df[self.MEDIAN_INCOME_FIELD_NAME] / df[self.AMI_FIELD_NAME] + df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD] = ( + df[field_names.MEDIAN_INCOME_FIELD] / df[field_names.AMI_FIELD] ) - # TODO Refactor to no longer use the data_sets list and do all renaming in ETL step - # Rename columns: - renaming_dict = { - data_set.input_field: data_set.renamed_field - for data_set in data_sets - } - - df.rename( - columns=renaming_dict, - inplace=True, - errors="raise", - ) - - columns_to_keep = [data_set.renamed_field for data_set in data_sets] + numeric_columns = [ + field_names.HOUSING_BURDEN_FIELD, + field_names.TOTAL_POP_FIELD, + field_names.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD, + field_names.ASTHMA_FIELD, + field_names.HEART_DISEASE_FIELD, + field_names.CANCER_FIELD, + field_names.HEALTH_INSURANCE_FIELD, + field_names.DIABETES_FIELD, + field_names.PHYS_HEALTH_NOT_GOOD_FIELD, + field_names.POVERTY_LESS_THAN_100_FPL_FIELD, + field_names.POVERTY_LESS_THAN_150_FPL_FIELD, + field_names.POVERTY_LESS_THAN_200_FPL_FIELD, + field_names.AMI_FIELD, + field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD, + field_names.MEDIAN_INCOME_FIELD, + field_names.LIFE_EXPECTANCY_FIELD, + field_names.ENERGY_BURDEN_FIELD, + field_names.FEMA_RISK_FIELD, + field_names.URBAN_HERUISTIC_FIELD, + field_names.AIR_TOXICS_CANCER_RISK_FIELD, + field_names.RESPITORY_HAZARD_FIELD, + field_names.DIESEL_FIELD, + field_names.PM25_FIELD, + field_names.OZONE_FIELD, + field_names.TRAFFIC_FIELD, + field_names.RMP_FIELD, + field_names.TSDF_FIELD, + 
field_names.NPL_FIELD, + field_names.WASTEWATER_FIELD, + field_names.LEAD_PAINT_FIELD, + field_names.UNDER_5_FIELD, + field_names.OVER_64_FIELD, + field_names.LINGUISTIC_ISO_FIELD, + field_names.HOUSEHOLDS_LINGUISTIC_ISO_FIELD, + field_names.POVERTY_FIELD, + field_names.HIGH_SCHOOL_ED_FIELD, + field_names.UNEMPLOYMENT_FIELD, + field_names.HT_INDEX_FIELD, + ] + non_numeric_columns = [ + self.GEOID_FIELD_NAME, + field_names.PERSISTENT_POVERTY_FIELD, + ] + columns_to_keep = non_numeric_columns + numeric_columns df = df[columns_to_keep] - # Convert all columns to numeric. - # TODO do this at the same time as calculating percentiles in future refactor - for data_set in data_sets: - # Skip GEOID_FIELD_NAME, because it's a string. - # Skip `PERSISTENT_POVERTY_FIELD` because it's a straight pass-through. - if data_set.renamed_field in ( - self.GEOID_FIELD_NAME, - self.PERSISTENT_POVERTY_FIELD, - ): - continue - - df[data_set.renamed_field] = pd.to_numeric( - df[data_set.renamed_field] + # Convert all columns to numeric and do math + for col in numeric_columns: + df[col] = pd.to_numeric(df[col]) + # Calculate percentiles + df[f"{col}{field_names.PERCENTILE_FIELD_SUFFIX}"] = df[col].rank( + pct=True ) - # calculate percentiles - for data_set in data_sets: - df[f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}"] = df[ - data_set.renamed_field - ].rank(pct=True) + # Min-max normalization: + # ( + # Observed value + # - minimum of all values + # ) + # divided by + # ( + # Maximum of all values + # - minimum of all values + # ) + min_value = df[col].min(skipna=True) - # Do some math: - # ( - # Observed value - # - minimum of all values - # ) - # divided by - # ( - # Maximum of all values - # - minimum of all values - # ) - for data_set in data_sets: - # Skip GEOID_FIELD_NAME, because it's a string. 
- if data_set.renamed_field == self.GEOID_FIELD_NAME: - continue - - min_value = df[data_set.renamed_field].min(skipna=True) - - max_value = df[data_set.renamed_field].max(skipna=True) + max_value = df[col].max(skipna=True) logger.info( - f"For data set {data_set.renamed_field}, the min value is {min_value} and the max value is {max_value}." + f"For data set {col}, the min value is {min_value} and the max value is {max_value}." ) - df[f"{data_set.renamed_field}{self.MIN_MAX_FIELD_SUFFIX}"] = ( - df[data_set.renamed_field] - min_value + df[f"{col}{field_names.MIN_MAX_FIELD_SUFFIX}"] = ( + df[col] - min_value ) / (max_value - min_value) return df def transform(self) -> None: - ## IMPORTANT: THIS METHOD IS CLOSE TO THE LIMIT OF STATEMENTS - logger.info("Transforming Score Data") - # get data sets list - data_sets = self.data_sets() - # prepare the df with the right CBG/tract IDs, column names/types, and percentiles - self.df = self._prepare_initial_df(data_sets) + self.df = self._prepare_initial_df() - # Calculate score "A" - self.df = self._add_score_a(self.df) - - # Calculate score "B" - self.df = self._add_score_b(self.df) - - # Calculate score "C" - "CalEnviroScreen for the US" score - self.df = self._add_score_c(self.df, data_sets) - - # Calculate scores "D" and "E" - self.df = self._add_scores_d_e(self.df) - - # Create percentiles for the scores - self.df = self._add_score_percentiles(self.df) - - # Now for binary (non index) scores. - # Calculate "Score F", which uses "either/or" thresholds. - self.df = self._add_score_f(self.df) - - # Calculate "Score G through K", which uses AMI and poverty. 
- self.df = self._add_score_g_k(self.df) - - # Calculate Definition L and its factors - self.df = self._add_definition_l_factors(self.df) + # calculate scores + self.df = ScoreRunner(df=self.df).calculate_scores() def load(self) -> None: logger.info("Saving Score CSV") - self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True) + constants.DATA_SCORE_CSV_FULL_DIR.mkdir(parents=True, exist_ok=True) - self.df.to_csv(self.SCORE_CSV_PATH / "usa.csv", index=False) + self.df.to_csv(constants.DATA_SCORE_CSV_FULL_FILE_PATH, index=False) diff --git a/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb b/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb index 69b8e166..3a97dc2e 100644 --- a/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb +++ b/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb @@ -279,7 +279,6 @@ "\n", "# Define the indices used for CEJST scoring (`census_block_group_indices`) as well as comparison\n", "# (`census_tract_indices`).\n", - "\n", "definition_l_factors = [\n", " \"Climate Factor (Definition L)\",\n", " \"Energy Factor (Definition L)\",\n", @@ -1496,7 +1495,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -1510,7 +1509,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6" + "version": "3.9.5" } }, "nbformat": 4, diff --git a/data/data-pipeline/data_pipeline/score/field_names.py b/data/data-pipeline/data_pipeline/score/field_names.py new file mode 100644 index 00000000..c2ceb0c3 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/field_names.py @@ -0,0 +1,155 @@ +# Suffixes +PERCENTILE_FIELD_SUFFIX = " (percentile)" +MIN_MAX_FIELD_SUFFIX = " (min-max normalized)" + +# Score file field names +SCORE_A = "Score A" +SCORE_B = "Score B" +SCORE_C = "Score C" +C_SOCIOECONOMIC = "Socioeconomic Factors" +C_SENSITIVE = "Sensitive 
populations" +C_ENVIRONMENTAL = "Environmental effects" +C_EXPOSURES = "Exposures" +SCORE_D = "Score D" +SCORE_E = "Score E" +SCORE_F_COMMUNITIES = "Score F (communities)" +SCORE_G = "Score G" +SCORE_G_COMMUNITIES = "Score G (communities)" +SCORE_H = "Score H" +SCORE_H_COMMUNITIES = "Score H (communities)" +SCORE_I = "Score I" +SCORE_I_COMMUNITIES = "Score I (communities)" +SCORE_K = "NMTC (communities)" +SCORE_K_COMMUNITIES = "Score K (communities)" +SCORE_L_COMMUNITIES = "Definition L (communities)" +L_CLIMATE = "Climate Factor (Definition L)" +L_ENERGY = "Energy Factor (Definition L)" +L_TRANSPORTATION = "Transportation Factor (Definition L)" +L_HOUSING = "Housing Factor (Definition L)" +L_POLLUTION = "Pollution Factor (Definition L)" +L_WATER = "Water Factor (Definition L)" +L_HEALTH = "Health Factor (Definition L)" +L_WORKFORCE = "Workforce Factor (Definition L)" +L_NON_WORKFORCE = "Any Non-Workforce Factor (Definition L)" + +# Poverty / Income +POVERTY_FIELD = "Poverty (Less than 200% of federal poverty line)" +POVERTY_PERCENTILE_FIELD = ( + "Poverty (Less than 200% of federal poverty line) (percentile)" +) +POVERTY_LESS_THAN_200_FPL_FIELD = ( + "Percent of individuals < 200% Federal Poverty Line" +) +POVERTY_LESS_THAN_200_FPL_PERCENTILE_FIELD = ( + "Percent of individuals < 200% Federal Poverty Line (percentile)" +) +POVERTY_LESS_THAN_150_FPL_FIELD = ( + "Percent of individuals < 150% Federal Poverty Line" +) +POVERTY_LESS_THAN_150_FPL_PERCENTILE_FIELD = ( + "Percent of individuals < 150% Federal Poverty Line (percentile)" +) +POVERTY_LESS_THAN_100_FPL_FIELD = ( + "Percent of individuals < 100% Federal Poverty Line" +) +POVERTY_LESS_THAN_100_FPL_PERCENTILE_FIELD = ( + "Percent of individuals < 100% Federal Poverty Line (percentile)" +) +MEDIAN_INCOME_PERCENT_AMI_FIELD = "Median household income (% of AMI)" +MEDIAN_INCOME_PERCENT_AMI_PERCENTILE_FIELD = "Median household income (% of AMI) (percentile)" +STATE_MEDIAN_INCOME_FIELD = ( + "Median household income 
(State; 2019 inflation-adjusted dollars)" +) +MEDIAN_INCOME_FIELD = "Median household income in the past 12 months" +MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD = ( + "Median household income (% of state median household income)" +) +MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD = "Median household income (% of AMI)" +PERSISTENT_POVERTY_FIELD = "Persistent Poverty Census Tract" +AMI_FIELD = "Area Median Income (State or metropolitan)" + +# Climate +FEMA_RISK_FIELD = "FEMA Risk Index Expected Annual Loss Score" +FEMA_RISK_PERCENTILE_FIELD = ( + "FEMA Risk Index Expected Annual Loss Score (percentile)" +) + +# Environment +DIESEL_FIELD = "Diesel particulate matter" +DIESEL_PERCENTILE_FIELD = "Diesel particulate matter (percentile)" +PM25_FIELD = "Particulate matter (PM2.5)" +PM25_PERCENTILE_FIELD = "Particulate matter (PM2.5) (percentile)" +OZONE_FIELD = "Ozone" +TRAFFIC_FIELD = "Traffic proximity and volume" +TRAFFIC_PERCENTILE_FIELD = "Traffic proximity and volume (percentile)" +LEAD_PAINT_FIELD = "Percent pre-1960s housing (lead paint indicator)" +LEAD_PAINT_PERCENTILE_FIELD = ( + "Percent pre-1960s housing (lead paint indicator) (percentile)" +) +WASTEWATER_FIELD = "Wastewater discharge" +WASTEWATER_PERCENTILE_FIELD = "Wastewater discharge (percentile)" +AGGREGATION_POLLUTION_FIELD = "Pollution Burden" +RMP_FIELD = "Proximity to RMP sites" +RMP_PERCENTILE_FIELD = "Proximity to RMP sites (percentile)" +TSDF_FIELD = "Proximity to TSDF sites" +NPL_FIELD = "Proximity to NPL sites" +AIR_TOXICS_CANCER_RISK_FIELD = "Air toxics cancer risk" + +# Housing +HOUSING_BURDEN_FIELD = "Housing burden (percent)" +HOUSING_BURDEN_PERCENTILE_FIELD = "Housing burden (percent) (percentile)" +HT_INDEX_FIELD = ( + "Housing + Transportation Costs % Income for the Regional Typical Household" +) + +# Energy +ENERGY_BURDEN_FIELD = "Energy burden" +ENERGY_BURDEN_PERCENTILE_FIELD = "Energy burden (percentile)" + +# Health +DIABETES_FIELD = "Diagnosed diabetes among adults aged >=18 years" 
+DIABETES_PERCENTILE_FIELD = ( + "Diagnosed diabetes among adults aged >=18 years (percentile)" +) +ASTHMA_FIELD = "Current asthma among adults aged >=18 years" +ASTHMA_PERCENTILE_FIELD = ( + "Current asthma among adults aged >=18 years (percentile)" +) +HEART_DISEASE_FIELD = "Coronary heart disease among adults aged >=18 years" +HEART_DISEASE_PERCENTILE_FIELD = ( + "Coronary heart disease among adults aged >=18 years (percentile)" +) +LIFE_EXPECTANCY_FIELD = "Life expectancy (years)" +LIFE_EXPECTANCY_PERCENTILE_FIELD = "Life expectancy (years) (percentile)" +RESPITORY_HAZARD_FIELD = "Respiratory hazard index" +RESPITORY_HAZARD_PERCENTILE_FIELD = "Respiratory hazard index (percentile)" +CANCER_FIELD = "Cancer (excluding skin cancer) among adults aged >=18 years" +CANCER_PERCENTILE_FIELD = ( + "Cancer (excluding skin cancer) among adults aged >=18 years (percentile)" +) +HEALTH_INSURANCE_FIELD = ( + "Current lack of health insurance among adults aged 18-64 years" +) +PHYS_HEALTH_NOT_GOOD_FIELD = ( + "Physical health not good for >=14 days among adults aged >=18 years" +) + +# Other Demographics +TOTAL_POP_FIELD = "Total population" +UNEMPLOYMENT_FIELD = "Unemployed civilians (percent)" +UNEMPLOYMENT_PERCENTILE_FIELD = "Unemployed civilians (percent) (percentile)" +LINGUISTIC_ISO_FIELD = "Linguistic isolation (percent)" +LINGUISTIC_ISO_PERCENTILE_FIELD = "Linguistic isolation (percent) (percentile)" +HOUSEHOLDS_LINGUISTIC_ISO_FIELD = ( + "Percent of households in linguistic isolation" +) +HIGH_SCHOOL_ED_FIELD = ( + "Percent individuals age 25 or over with less than high school degree" +) +HIGH_SCHOOL_ED_PERCENTILE_FIELD = "Percent individuals age 25 or over with less than high school degree (percentile)" +AGGREGATION_POPULATION_FIELD = "Population Characteristics" +UNDER_5_FIELD = "Individuals under 5 years old" +OVER_64_FIELD = "Individuals over 64 years old" + +# Urban Rural Map +URBAN_HERUISTIC_FIELD = "Urban Heuristic Flag" diff --git 
a/data/data-pipeline/data_pipeline/score/score.py b/data/data-pipeline/data_pipeline/score/score.py new file mode 100644 index 00000000..2871b0e4 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score.py @@ -0,0 +1,9 @@ +import pandas as pd + + +class Score: + def __init__(self, df: pd.DataFrame) -> None: + self.df = df + + def add_columns(self) -> pd.DataFrame: + raise NotImplementedError diff --git a/data/data-pipeline/data_pipeline/score/score_a.py b/data/data-pipeline/data_pipeline/score/score_a.py new file mode 100644 index 00000000..3cb64bc1 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_a.py @@ -0,0 +1,19 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreA(Score): + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score A") + self.df[field_names.SCORE_A] = self.df[ + [ + field_names.POVERTY_PERCENTILE_FIELD, + field_names.HIGH_SCHOOL_ED_PERCENTILE_FIELD, + ] + ].mean(axis=1) + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_b.py b/data/data-pipeline/data_pipeline/score/score_b.py new file mode 100644 index 00000000..74f28311 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_b.py @@ -0,0 +1,17 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreB(Score): + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score B") + self.df[field_names.SCORE_B] = ( + self.df[field_names.POVERTY_PERCENTILE_FIELD] + * self.df[field_names.HIGH_SCHOOL_ED_PERCENTILE_FIELD] + ) + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_c.py b/data/data-pipeline/data_pipeline/score/score_c.py new file mode 100644 index 
00000000..121718c8 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_c.py @@ -0,0 +1,99 @@ +from collections import namedtuple +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreC(Score): + def __init__(self, df: pd.DataFrame) -> None: + Bucket = namedtuple('Bucket', ['name', 'fields']) + + self.BUCKET_SOCIOECONOMIC = Bucket( + field_names.C_SOCIOECONOMIC, + [ + field_names.HOUSEHOLDS_LINGUISTIC_ISO_FIELD, + field_names.POVERTY_FIELD, + field_names.HIGH_SCHOOL_ED_FIELD, + field_names.UNEMPLOYMENT_FIELD, + field_names.HT_INDEX_FIELD, + ] + ) + self.BUCKET_SENSITIVE = Bucket( + field_names.C_SENSITIVE, + [ + field_names.UNDER_5_FIELD, + field_names.OVER_64_FIELD, + field_names.LINGUISTIC_ISO_FIELD, + ] + ) + self.BUCKET_ENVIRONMENTAL = Bucket( + field_names.C_ENVIRONMENTAL, + [ + field_names.RMP_FIELD, + field_names.TSDF_FIELD, + field_names.NPL_FIELD, + field_names.WASTEWATER_FIELD, + field_names.LEAD_PAINT_FIELD, + ] + ) + self.BUCKET_EXPOSURES = Bucket( + field_names.C_EXPOSURES, + [ + field_names.AIR_TOXICS_CANCER_RISK_FIELD, + field_names.RESPITORY_HAZARD_FIELD, + field_names.DIESEL_FIELD, + field_names.PM25_FIELD, + field_names.OZONE_FIELD, + field_names.TRAFFIC_FIELD, + ], + ) + self.BUCKETS = [ + self.BUCKET_SOCIOECONOMIC, + self.BUCKET_SENSITIVE, + self.BUCKET_ENVIRONMENTAL, + self.BUCKET_EXPOSURES, + ] + super().__init__(df) + + # "CalEnviroScreen for the US" score + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score C") + # Average all the percentile values in each bucket into a single score for each of the four buckets. 
+ + # TODO just use the percentile fields in the list instead + for bucket in self.BUCKETS: + fields_to_average = [] + for field in bucket.fields: + fields_to_average.append( + f"{field}{field_names.PERCENTILE_FIELD_SUFFIX}" + ) + + self.df[f"{bucket.name}"] = self.df[fields_to_average].mean(axis=1) + + # Combine the score from the two Exposures and Environmental Effects buckets + # into a single score called "Pollution Burden". + # The math for this score is: + # (1.0 * Exposures Score + 0.5 * Environment Effects score) / 1.5. + self.df[field_names.AGGREGATION_POLLUTION_FIELD] = ( + 1.0 * self.df[self.BUCKET_EXPOSURES.name] + + 0.5 * self.df[self.BUCKET_ENVIRONMENTAL.name] + ) / 1.5 + + # Average the score from the two Sensitive populations and + # Socioeconomic factors buckets into a single score called + # "Population Characteristics". + self.df[field_names.AGGREGATION_POPULATION_FIELD] = self.df[ + [self.BUCKET_SENSITIVE.name, self.BUCKET_SOCIOECONOMIC.name] + ].mean(axis=1) + + # Multiply the "Pollution Burden" score and the "Population Characteristics" + # together to produce the cumulative impact score. 
+ self.df[field_names.SCORE_C] = ( + self.df[field_names.AGGREGATION_POLLUTION_FIELD] + * self.df[field_names.AGGREGATION_POPULATION_FIELD] + ) + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_d.py b/data/data-pipeline/data_pipeline/score/score_d.py new file mode 100644 index 00000000..22d7b4aa --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_d.py @@ -0,0 +1,35 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreD(Score): + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Scores D and E") + fields_to_use_in_score = [ + field_names.UNEMPLOYMENT_FIELD, + field_names.LINGUISTIC_ISO_FIELD, + field_names.HOUSING_BURDEN_FIELD, + field_names.POVERTY_FIELD, + field_names.HIGH_SCHOOL_ED_FIELD, + ] + + fields_min_max = [ + f"{field}{field_names.MIN_MAX_FIELD_SUFFIX}" + for field in fields_to_use_in_score + ] + fields_percentile = [ + f"{field}{field_names.PERCENTILE_FIELD_SUFFIX}" + for field in fields_to_use_in_score + ] + + # Calculate "Score D", which uses min-max normalization + # and calculate "Score E", which uses percentile normalization for the same fields + self.df[field_names.SCORE_D] = self.df[fields_min_max].mean(axis=1) + self.df[field_names.SCORE_E] = self.df[fields_percentile].mean(axis=1) + + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_f.py b/data/data-pipeline/data_pipeline/score/score_f.py new file mode 100644 index 00000000..44b42d68 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_f.py @@ -0,0 +1,46 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreF(Score): + # TODO Make variables and constants 
clearer (meaning and type) + + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score F") + ami_and_high_school_field = "Low AMI, Low HS graduation" + meets_socio_field = "Meets socioeconomic criteria" + meets_burden_field = "Meets burden criteria" + + self.df[ami_and_high_school_field] = ( + self.df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD] < 0.80 + ) & (self.df[field_names.HIGH_SCHOOL_ED_FIELD] > 0.2) + + self.df[meets_socio_field] = ( + self.df[ami_and_high_school_field] + | (self.df[field_names.POVERTY_FIELD] > 0.40) + | (self.df[field_names.LINGUISTIC_ISO_FIELD] > 0.10) + | (self.df[field_names.HIGH_SCHOOL_ED_FIELD] > 0.4) + ) + + self.df[meets_burden_field] = ( + (self.df[field_names.PM25_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.RESPITORY_HAZARD_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.TRAFFIC_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.LEAD_PAINT_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.RMP_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.ASTHMA_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.HEART_DISEASE_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.CANCER_PERCENTILE_FIELD] > 0.9) + | (self.df[field_names.DIABETES_PERCENTILE_FIELD] > 0.9) + ) + + self.df[field_names.SCORE_F_COMMUNITIES] = ( + self.df[meets_socio_field] & self.df[meets_burden_field] + ) + + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_g.py b/data/data-pipeline/data_pipeline/score/score_g.py new file mode 100644 index 00000000..f83b8fd4 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_g.py @@ -0,0 +1,35 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreG(Score): + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score G") + + high_school_cutoff_threshold = 0.05 + + # Score G is now modified NMTC 
+ self.df[field_names.SCORE_G_COMMUNITIES] = ( + (self.df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD] < 0.8) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) | ( + (self.df[field_names.POVERTY_LESS_THAN_100_FPL_FIELD] > 0.20) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) + self.df[field_names.SCORE_G] = self.df[ + field_names.SCORE_G_COMMUNITIES + ].astype(int) + self.df["Score G (percentile)"] = self.df[field_names.SCORE_G] + + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_h.py b/data/data-pipeline/data_pipeline/score/score_h.py new file mode 100644 index 00000000..67c58d22 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_h.py @@ -0,0 +1,33 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreH(Score): + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score H") + + high_school_cutoff_threshold = 0.06 + + self.df[field_names.SCORE_H_COMMUNITIES] = ( + (self.df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD] < 0.8) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) | ( + (self.df[field_names.POVERTY_LESS_THAN_200_FPL_FIELD] > 0.40) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) + self.df[field_names.SCORE_H] = self.df[ + field_names.SCORE_H_COMMUNITIES + ].astype(int) + + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_i.py b/data/data-pipeline/data_pipeline/score/score_i.py new file mode 100644 index 00000000..948669d8 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_i.py @@ -0,0 +1,34 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from 
data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreI(Score): + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score I") + + high_school_cutoff_threshold = 0.05 + + self.df[field_names.SCORE_I_COMMUNITIES] = ( + (self.df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD] < 0.7) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) | ( + (self.df[field_names.POVERTY_LESS_THAN_200_FPL_FIELD] > 0.50) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) + self.df[field_names.SCORE_I] = self.df[ + field_names.SCORE_I_COMMUNITIES + ].astype(int) + self.df["Score I (percentile)"] = self.df[field_names.SCORE_I] + + return self.df diff --git a/data/data-pipeline/data_pipeline/score/score_k.py b/data/data-pipeline/data_pipeline/score/score_k.py new file mode 100644 index 00000000..44452f17 --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_k.py @@ -0,0 +1,34 @@ +import pandas as pd + +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreK(Score): + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score K") + + high_school_cutoff_threshold = 0.06 + + self.df[field_names.SCORE_K] = ( + (self.df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD] < 0.8) + ) | (self.df[field_names.POVERTY_LESS_THAN_100_FPL_FIELD] > 0.20) + + self.df[field_names.SCORE_K_COMMUNITIES] = ( + (self.df[field_names.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD] < 0.8) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) | ( + (self.df[field_names.POVERTY_LESS_THAN_100_FPL_FIELD] > 0.20) + & ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] + > high_school_cutoff_threshold + ) + ) + + return self.df diff --git 
a/data/data-pipeline/data_pipeline/etl/score/score_calculator.py b/data/data-pipeline/data_pipeline/score/score_l.py similarity index 51% rename from data/data-pipeline/data_pipeline/etl/score/score_calculator.py rename to data/data-pipeline/data_pipeline/score/score_l.py index 9b8da89e..e2c53eac 100644 --- a/data/data-pipeline/data_pipeline/etl/score/score_calculator.py +++ b/data/data-pipeline/data_pipeline/score/score_l.py @@ -1,159 +1,87 @@ import pandas as pd +from data_pipeline.score.score import Score +import data_pipeline.score.field_names as field_names from data_pipeline.utils import get_module_logger logger = get_module_logger(__name__) -class ScoreCalculator: - def __init__(self, df: pd.DataFrame): - # Define some global parameters - self.df = df - self.POVERTY_LESS_THAN_200_FPL_FIELD: str = ( - "Percent of individuals < 200% Federal Poverty Line (percentile)" - ) - - self.POVERTY_LESS_THAN_100_FPL_FIELD: str = ( - "Percent of individuals < 100% Federal Poverty Line (percentile)" - ) - - # FEMA Risk Index - self.NATIONAL_RISK_FIELD: str = ( - "FEMA Risk Index Expected Annual Loss Score (percentile)" - ) - - # DOE energy burden - self.ENERGY_BURDEN_FIELD: str = "Energy burden (percentile)" - - # Diesel particulate matter - self.DIESEL_FIELD: str = "Diesel particulate matter (percentile)" - - # PM2.5 - self.PM25_FIELD: str = "Particulate matter (PM2.5) (percentile)" - - # Traffic proximity and volume - self.TRAFFIC_FIELD: str = "Traffic proximity and volume (percentile)" - - # Lead paint - self.LEAD_PAINT_FIELD: str = ( - "Percent pre-1960s housing (lead paint indicator) (percentile)" - ) - - # Housing cost burden - self.HOUSING_BURDEN_FIELD: str = "Housing burden (percent) (percentile)" - - # Wastewater discharge - self.WASTEWATER_FIELD: str = "Wastewater discharge (percentile)" - - # Diabetes - self.DIABETES_FIELD: str = ( - "Diagnosed diabetes among adults aged >=18 years (percentile)" - ) - - # Asthma - self.ASTHMA_FIELD: str = ( - "Current asthma 
among adults aged >=18 years (percentile)" - ) - - # Heart disease - self.HEART_DISEASE_FIELD: str = ( - "Coronary heart disease among adults aged >=18 years (percentile)" - ) - - # Life expectancy - self.LIFE_EXPECTANCY_FIELD: str = "Life expectancy (years) (percentile)" - - # Unemployment - self.UNEMPLOYMENT_FIELD: str = ( - "Unemployed civilians (percent) (percentile)" - ) - - # Median income as % of AMI - self.MEDIAN_INCOME_FIELD: str = ( - "Median household income (% of AMI) (percentile)" - ) - - # Linguistic isolation - self.LINGUISTIC_ISO_FIELD: str = ( - "Linguistic isolation (percent) (percentile)" - ) - - # Less than high school education - self.HIGH_SCHOOL_ED_FIELD: str = "Percent individuals age 25 or over with less than high school degree (percentile)" - - # Set thresholds for score L +class ScoreL(Score): + def __init__(self, df: pd.DataFrame) -> None: self.LOW_INCOME_THRESHOLD: float = 0.60 self.ENVIRONMENTAL_BURDEN_THRESHOLD: float = 0.90 + super().__init__(df) - def add_definition_l_factors(self): - self.df["Climate Factor (Definition L)"] = self.climate_factor() - self.df["Energy Factor (Definition L)"] = self.energy_factor() - self.df[ - "Transportation Factor (Definition L)" - ] = self.transportation_factor() - self.df["Housing Factor (Definition L)"] = self.housing_factor() - self.df["Pollution Factor (Definition L)"] = self.pollution_factor() - self.df["Water Factor (Definition L)"] = self.water_factor() - self.df["Health Factor (Definition L)"] = self.health_factor() - self.df["Workforce Factor (Definition L)"] = self.workforce_factor() + def add_columns(self) -> pd.DataFrame: + logger.info("Adding Score L") + + self.df[field_names.L_CLIMATE] = self._climate_factor() + self.df[field_names.L_ENERGY] = self._energy_factor() + self.df[field_names.L_TRANSPORTATION] = self._transportation_factor() + self.df[field_names.L_HOUSING] = self._housing_factor() + self.df[field_names.L_POLLUTION] = self._pollution_factor() + self.df[field_names.L_WATER] = 
self._water_factor() + self.df[field_names.L_HEALTH] = self._health_factor() + self.df[field_names.L_WORKFORCE] = self._workforce_factor() factors = [ - "Climate Factor (Definition L)", - "Energy Factor (Definition L)", - "Transportation Factor (Definition L)", - "Housing Factor (Definition L)", - "Pollution Factor (Definition L)", - "Water Factor (Definition L)", - "Health Factor (Definition L)", - "Workforce Factor (Definition L)", + field_names.L_CLIMATE, + field_names.L_ENERGY, + field_names.L_TRANSPORTATION, + field_names.L_HOUSING, + field_names.L_POLLUTION, + field_names.L_WATER, + field_names.L_HEALTH, + field_names.L_WORKFORCE, ] - self.df["Definition L (communities)"] = self.df[factors].any(axis=1) + self.df[field_names.SCORE_L_COMMUNITIES] = self.df[factors].any(axis=1) # Note: this is purely used for comparison tool analysis, and can be removed at a later date. - LMB. non_workforce_factors = [ - "Climate Factor (Definition L)", - "Energy Factor (Definition L)", - "Transportation Factor (Definition L)", - "Housing Factor (Definition L)", - "Pollution Factor (Definition L)", - "Water Factor (Definition L)", - "Health Factor (Definition L)", + field_names.L_CLIMATE, + field_names.L_ENERGY, + field_names.L_TRANSPORTATION, + field_names.L_HOUSING, + field_names.L_POLLUTION, + field_names.L_WATER, + field_names.L_HEALTH, ] - self.df["Any Non-Workforce Factor (Definition L)"] = self.df[ + self.df[field_names.L_NON_WORKFORCE] = self.df[ non_workforce_factors ].any(axis=1) return self.df - def climate_factor(self) -> bool: + def _climate_factor(self) -> bool: # In Xth percentile or above for FEMA’s Risk Index (Source: FEMA # AND # Low income: In 60th percentile or above for percent of block group population # of households where household income is less than or equal to twice the federal # poverty level. 
Source: Census's American Community Survey] return ( - self.df[self.POVERTY_LESS_THAN_200_FPL_FIELD] + self.df[field_names.POVERTY_LESS_THAN_200_FPL_PERCENTILE_FIELD] > self.LOW_INCOME_THRESHOLD ) & ( - self.df[self.NATIONAL_RISK_FIELD] + self.df[field_names.FEMA_RISK_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) - def energy_factor(self) -> bool: + def _energy_factor(self) -> bool: # In Xth percentile or above for DOE’s energy cost burden score (Source: LEAD Score) # AND # Low income: In 60th percentile or above for percent of block group population # of households where household income is less than or equal to twice the federal # poverty level. Source: Census's American Community Survey] return ( - self.df[self.POVERTY_LESS_THAN_200_FPL_FIELD] + self.df[field_names.POVERTY_LESS_THAN_200_FPL_PERCENTILE_FIELD] > self.LOW_INCOME_THRESHOLD ) & ( - self.df[self.ENERGY_BURDEN_FIELD] + self.df[field_names.ENERGY_BURDEN_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) - def transportation_factor(self) -> bool: + def _transportation_factor(self) -> bool: # In Xth percentile or above for diesel particulate matter (Source: EPA National Air Toxics Assessment (NATA) # or # In Xth percentile or above for PM 2.5 (Source: EPA, Office of Air and Radiation (OAR) fusion of model and monitor data)] @@ -164,20 +92,26 @@ class ScoreCalculator: # of households where household income is less than or equal to twice the federal # poverty level. 
Source: Census's American Community Survey] transportation_criteria = ( - (self.df[self.DIESEL_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD) - | (self.df[self.PM25_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD) + ( + self.df[field_names.DIESEL_PERCENTILE_FIELD] + > self.ENVIRONMENTAL_BURDEN_THRESHOLD + ) | ( - self.df[self.TRAFFIC_FIELD] + self.df[field_names.PM25_PERCENTILE_FIELD] + > self.ENVIRONMENTAL_BURDEN_THRESHOLD + ) + | ( + self.df[field_names.TRAFFIC_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) ) return ( - self.df[self.POVERTY_LESS_THAN_200_FPL_FIELD] + self.df[field_names.POVERTY_LESS_THAN_200_FPL_PERCENTILE_FIELD] > self.LOW_INCOME_THRESHOLD ) & transportation_criteria - def housing_factor(self) -> bool: + def _housing_factor(self) -> bool: # In Xth percentile or above for lead paint (Source: Census's American Community Survey’s # percent of housing units built pre-1960, used as an indicator of potential lead paint exposure in homes) # or @@ -187,17 +121,18 @@ class ScoreCalculator: # of households where household income is less than or equal to twice the federal # poverty level. Source: Census's American Community Survey] housing_criteria = ( - self.df[self.LEAD_PAINT_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD + self.df[field_names.LEAD_PAINT_PERCENTILE_FIELD] + > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) | ( - self.df[self.HOUSING_BURDEN_FIELD] + self.df[field_names.HOUSING_BURDEN_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) return ( - self.df[self.POVERTY_LESS_THAN_200_FPL_FIELD] + self.df[field_names.POVERTY_LESS_THAN_200_FPL_PERCENTILE_FIELD] > self.LOW_INCOME_THRESHOLD ) & housing_criteria - def pollution_factor(self) -> bool: + def _pollution_factor(self) -> bool: # TBD # AND # Low income: In 60th percentile or above for percent of block group population @@ -205,20 +140,21 @@ class ScoreCalculator: # poverty level. 
Source: Census's American Community Survey] return False - def water_factor(self) -> bool: + def _water_factor(self) -> bool: # In Xth percentile or above for wastewater discharge (Source: EPA Risk-Screening Environmental Indicators (RSEI) Model) # AND # Low income: In 60th percentile or above for percent of block group population # of households where household income is less than or equal to twice the federal # poverty level. Source: Census's American Community Survey] return ( - self.df[self.POVERTY_LESS_THAN_200_FPL_FIELD] + self.df[field_names.POVERTY_LESS_THAN_200_FPL_PERCENTILE_FIELD] > self.LOW_INCOME_THRESHOLD ) & ( - self.df[self.WASTEWATER_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD + self.df[field_names.WASTEWATER_PERCENTILE_FIELD] + > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) - def health_factor(self) -> bool: + def _health_factor(self) -> bool: # In Xth percentile or above for diabetes (Source: CDC Places) # or # In Xth percentile or above for asthma (Source: CDC Places) @@ -232,25 +168,31 @@ class ScoreCalculator: # poverty level. Source: Census's American Community Survey] health_criteria = ( - (self.df[self.DIABETES_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD) - | (self.df[self.ASTHMA_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD) - | ( - self.df[self.HEART_DISEASE_FIELD] + ( + self.df[field_names.DIABETES_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) | ( - self.df[self.LIFE_EXPECTANCY_FIELD] + self.df[field_names.ASTHMA_PERCENTILE_FIELD] + > self.ENVIRONMENTAL_BURDEN_THRESHOLD + ) + | ( + self.df[field_names.HEART_DISEASE_PERCENTILE_FIELD] + > self.ENVIRONMENTAL_BURDEN_THRESHOLD + ) + | ( + self.df[field_names.LIFE_EXPECTANCY_PERCENTILE_FIELD] # Note: a high life expectancy is good, so take 1 minus the threshold to invert it, # and then look for life expenctancies lower than that (not greater than). 
< 1 - self.ENVIRONMENTAL_BURDEN_THRESHOLD ) ) return ( - self.df[self.POVERTY_LESS_THAN_200_FPL_FIELD] + self.df[field_names.POVERTY_LESS_THAN_200_FPL_PERCENTILE_FIELD] > self.LOW_INCOME_THRESHOLD ) & health_criteria - def workforce_factor(self) -> bool: + def _workforce_factor(self) -> bool: # Where unemployment is above X% # or # Where median income is less than Y% of the area median income @@ -263,22 +205,24 @@ class ScoreCalculator: # (necessary to screen out university block groups) workforce_criteria = ( ( - self.df[self.UNEMPLOYMENT_FIELD] + self.df[field_names.UNEMPLOYMENT_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) | ( - self.df[self.MEDIAN_INCOME_FIELD] + self.df[field_names.MEDIAN_INCOME_PERCENT_AMI_PERCENTILE_FIELD] # Note: a high median income as a % of AMI is good, so take 1 minus the threshold to invert it. # and then look for median income lower than that (not greater than). < 1 - self.ENVIRONMENTAL_BURDEN_THRESHOLD ) | ( - self.df[self.POVERTY_LESS_THAN_100_FPL_FIELD] + self.df[field_names.POVERTY_LESS_THAN_100_FPL_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) | ( - self.df[self.LINGUISTIC_ISO_FIELD] + self.df[field_names.LINGUISTIC_ISO_PERCENTILE_FIELD] > self.ENVIRONMENTAL_BURDEN_THRESHOLD ) ) - return (self.df[self.HIGH_SCHOOL_ED_FIELD] > 0.05) & workforce_criteria + return ( + self.df[field_names.HIGH_SCHOOL_ED_FIELD] > 0.05 + ) & workforce_criteria diff --git a/data/data-pipeline/data_pipeline/score/score_runner.py b/data/data-pipeline/data_pipeline/score/score_runner.py new file mode 100644 index 00000000..2eac192b --- /dev/null +++ b/data/data-pipeline/data_pipeline/score/score_runner.py @@ -0,0 +1,66 @@ +import pandas as pd +from data_pipeline.score.score_a import ScoreA +from data_pipeline.score.score_b import ScoreB +from data_pipeline.score.score_c import ScoreC +from data_pipeline.score.score_d import ScoreD +from data_pipeline.score.score_f import ScoreF +from data_pipeline.score.score_g import ScoreG +from 
data_pipeline.score.score_h import ScoreH +from data_pipeline.score.score_i import ScoreI +from data_pipeline.score.score_k import ScoreK +from data_pipeline.score.score_l import ScoreL +from data_pipeline.score import field_names + +from data_pipeline.utils import get_module_logger + +logger = get_module_logger(__name__) + + +class ScoreRunner: + def __init__(self, df: pd.DataFrame): + # Define some global parameters + self.df = df + + def calculate_scores(self) -> pd.DataFrame: + # Index scores + self.df = ScoreA(df=self.df).add_columns() + self.df = ScoreB(df=self.df).add_columns() + self.df = ScoreC(df=self.df).add_columns() + self.df = ScoreD(df=self.df).add_columns() + self.df = ScoreF(df=self.df).add_columns() + self.df = ScoreG(df=self.df).add_columns() + self.df = ScoreH(df=self.df).add_columns() + self.df = ScoreI(df=self.df).add_columns() + self.df = ScoreK(df=self.df).add_columns() + self.df = ScoreL(df=self.df).add_columns() + + # TODO do this with each score instead of in a bundle + # Create percentiles for these index scores + self.df = self._add_score_percentiles() + + return self.df + + def _add_score_percentiles(self) -> pd.DataFrame: + logger.info("Adding Score Percentiles") + for score_field in [ + field_names.SCORE_A, + field_names.SCORE_B, + field_names.SCORE_C, + field_names.SCORE_D, + field_names.SCORE_E, + ]: + self.df[ + f"{score_field}{field_names.PERCENTILE_FIELD_SUFFIX}" + ] = self.df[score_field].rank(pct=True) + + for threshold in [0.25, 0.3, 0.35, 0.4]: + fraction_converted_to_percent = int(100 * threshold) + self.df[ + f"{score_field} (top {fraction_converted_to_percent}th percentile)" + ] = ( + self.df[ + f"{score_field}{field_names.PERCENTILE_FIELD_SUFFIX}" + ] + >= 1 - threshold + ) + return self.df