diff --git a/data/data-pipeline/README.md b/data/data-pipeline/README.md index d4e2914b..dfa2aa0b 100644 --- a/data/data-pipeline/README.md +++ b/data/data-pipeline/README.md @@ -94,7 +94,7 @@ TODO add mermaid diagram 3. Each ETL script will extract the data from its original source, then format the data into `.csv` files that get stored in the relevant folder in `data_pipeline/data/dataset/`. For example, HUD Housing data is stored in `data_pipeline/data/dataset/hud_housing/usa.csv` _**NOTE:** You have the option to pass the name of a specific data source to the `etl-run` command using the `-d` flag, which will limit the execution of the ETL process to that specific data source._ -_For example: `poetry run etl -- -d ejscreen` would only run the ETL process for EJSCREEN data._ +_For example: `poetry run etl -d ejscreen` would only run the ETL process for EJSCREEN data._ #### Step 3: Calculate the Justice40 score experiments diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py index 450ce946..c15f045b 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py @@ -257,18 +257,8 @@ class ScoreETL(ExtractTransformLoad): low_memory=False, ) - def transform(self) -> None: - ## IMPORTANT: THIS METHOD IS CLOSE TO THE LIMIT OF STATEMENTS - - logger.info("Transforming Score Data") - - # Join all the data sources that use census block groups - census_block_group_dfs = [ - self.ejscreen_df, - self.census_df, - self.housing_and_transportation_df, - ] - + def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame: + logger.info("Joining Census Block Group dataframes") census_block_group_df = functools.reduce( lambda left, right: pd.merge( left=left, right=right, on=self.GEOID_FIELD_NAME, how="outer" @@ -284,12 +274,10 @@ class ScoreETL(ExtractTransformLoad): raise ValueError( f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length." ) - - # Join all the data sources that use census tracts - census_tract_dfs = [ - self.hud_housing_df, - self.cdc_places_df, - ] + return census_block_group_df + + def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame: + logger.info("Joining Census Tract dataframes") census_tract_df = functools.reduce( lambda left, right: pd.merge( left=left, @@ -308,87 +296,21 @@ class ScoreETL(ExtractTransformLoad): raise ValueError( f"One of the input CSVs uses {self.GEOID_TRACT_FIELD_NAME} with a different length." ) + return census_tract_df - # Calculate the tract for the CBG data. - census_block_group_df[ - self.GEOID_TRACT_FIELD_NAME - ] = census_block_group_df[self.GEOID_FIELD_NAME].str[0:11] - - self.df = census_block_group_df.merge( - census_tract_df, on=self.GEOID_TRACT_FIELD_NAME - ) - - if len(census_block_group_df) > 220333: - raise ValueError("Too many rows in the join.") - - # get data sets list - data_sets = self.data_sets() - - # Rename columns: - renaming_dict = { - data_set.input_field: data_set.renamed_field - for data_set in data_sets - } - - self.df.rename( - columns=renaming_dict, - inplace=True, - errors="raise", - ) - - columns_to_keep = [data_set.renamed_field for data_set in data_sets] - self.df = self.df[columns_to_keep] - - # Convert all columns to numeric. - for data_set in data_sets: - # Skip GEOID_FIELD_NAME, because it's a string. 
- if data_set.renamed_field == self.GEOID_FIELD_NAME: - continue - self.df[f"{data_set.renamed_field}"] = pd.to_numeric( - self.df[data_set.renamed_field] - ) - - # calculate percentiles - for data_set in data_sets: - self.df[ - f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}" - ] = self.df[data_set.renamed_field].rank(pct=True) - - # Math: - # ( - # Observed value - # - minimum of all values - # ) - # divided by - # ( - # Maximum of all values - # - minimum of all values - # ) - for data_set in data_sets: - # Skip GEOID_FIELD_NAME, because it's a string. - if data_set.renamed_field == self.GEOID_FIELD_NAME: - continue - - min_value = self.df[data_set.renamed_field].min(skipna=True) - - max_value = self.df[data_set.renamed_field].max(skipna=True) - - logger.info( - f"For data set {data_set.renamed_field}, the min value is {min_value} and the max value is {max_value}." - ) - - self.df[f"{data_set.renamed_field}{self.MIN_MAX_FIELD_SUFFIX}"] = ( - self.df[data_set.renamed_field] - min_value - ) / (max_value - min_value) - - # Calculate score "A" and score "B" - self.df["Score A"] = self.df[ + def _add_score_a(self, df: pd.DataFrame) -> pd.DataFrame: + logger.info("Adding Score A") + df["Score A"] = df[ [ "Poverty (Less than 200% of federal poverty line) (percentile)", "Percent individuals age 25 or over with less than high school degree (percentile)", ] ].mean(axis=1) - self.df["Score B"] = ( + return df + + def _add_score_b(self, df: pd.DataFrame) -> pd.DataFrame: + logger.info("Adding Score B") + df["Score B"] = ( self.df[ "Poverty (Less than 200% of federal poverty line) (percentile)" ] @@ -396,8 +318,10 @@ class ScoreETL(ExtractTransformLoad): "Percent individuals age 25 or over with less than high school degree (percentile)" ] ) + return df - # Calculate "CalEnviroScreen for the US" score + def _add_score_c(self, df: pd.DataFrame, data_sets: list) -> pd.DataFrame: + logger.info("Adding Score C") # Average all the percentile values in each bucket into a single score for each of the four buckets. for bucket in self.BUCKETS: fields_in_bucket = [ @@ -405,34 +329,34 @@ class ScoreETL(ExtractTransformLoad): for data_set in data_sets if data_set.bucket == bucket ] - self.df[f"{bucket}"] = self.df[fields_in_bucket].mean(axis=1) + df[f"{bucket}"] = df[fields_in_bucket].mean(axis=1) # Combine the score from the two Exposures and Environmental Effects buckets # into a single score called "Pollution Burden". # The math for this score is: # (1.0 * Exposures Score + 0.5 * Environment Effects score) / 1.5. - self.df[self.AGGREGATION_POLLUTION] = ( - 1.0 * self.df[f"{self.BUCKET_EXPOSURES}"] - + 0.5 * self.df[f"{self.BUCKET_ENVIRONMENTAL}"] + df[self.AGGREGATION_POLLUTION] = ( + 1.0 * df[f"{self.BUCKET_EXPOSURES}"] + + 0.5 * df[f"{self.BUCKET_ENVIRONMENTAL}"] ) / 1.5 # Average the score from the two Sensitive populations and # Socioeconomic factors buckets into a single score called # "Population Characteristics". - self.df[self.AGGREGATION_POPULATION] = self.df[ + df[self.AGGREGATION_POPULATION] = df[ [f"{self.BUCKET_SENSITIVE}", f"{self.BUCKET_SOCIOECONOMIC}"] ].mean(axis=1) # Multiply the "Pollution Burden" score and the "Population Characteristics" # together to produce the cumulative impact score. 
-        self.df["Score C"] = (
-            self.df[self.AGGREGATION_POLLUTION]
-            * self.df[self.AGGREGATION_POPULATION]
+        df["Score C"] = (
+            df[self.AGGREGATION_POLLUTION]
+            * df[self.AGGREGATION_POPULATION]
         )
-
-        if len(census_block_group_df) > 220333:
-            raise ValueError("Too many rows in the join.")
-
+        return df
+
+    def _add_scores_d_and_e(self, df: pd.DataFrame) -> pd.DataFrame:
+        logger.info("Adding Scores D and E")
         fields_to_use_in_score = [
             self.UNEMPLOYED_FIELD_NAME,
             self.LINGUISTIC_ISOLATION_FIELD_NAME,
@@ -452,10 +376,12 @@ class ScoreETL(ExtractTransformLoad):
 
         # Calculate "Score D", which uses min-max normalization
         # and calculate "Score E", which uses percentile normalization for the same fields
-        self.df["Score D"] = self.df[fields_min_max].mean(axis=1)
-        self.df["Score E"] = self.df[fields_percentile].mean(axis=1)
+        df["Score D"] = df[fields_min_max].mean(axis=1)
+        df["Score E"] = df[fields_percentile].mean(axis=1)
+        return df
 
-        # Create percentiles for the scores
+    def _add_score_percentiles(self, df: pd.DataFrame) -> pd.DataFrame:
+        logger.info("Adding Score Percentiles")
         for score_field in [
             "Score A",
             "Score B",
@@ -464,62 +390,63 @@ class ScoreETL(ExtractTransformLoad):
             "Score E",
             "Poverty (Less than 200% of federal poverty line)",
         ]:
-            self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = self.df[
+            df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = df[
                 score_field
             ].rank(pct=True)
 
             for threshold in [0.25, 0.3, 0.35, 0.4]:
                 fraction_converted_to_percent = int(100 * threshold)
-                self.df[
+                df[
                     f"{score_field} (top {fraction_converted_to_percent}th percentile)"
                 ] = (
-                    self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"]
+                    df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"]
                     >= 1 - threshold
                 )
+        return df
 
-        # Now for binary (non index) scores.
-
-        # Calculate "Score F", which uses "either/or" thresholds.
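+    # Score F is a binary (non-index) score: a census block group is flagged
+    # when it meets at least one socioeconomic criterion and at least one
+    # environmental or health burden criterion, per the thresholds below.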
+ # TODO Make variables and constants clearer (meaning and type) + def _add_score_f(self, df: pd.DataFrame) -> pd.DataFrame: + logger.info("Adding Score F") ami_and_high_school_field_name = "Low AMI, Low HS graduation" meets_socio_field_name = "Meets socioeconomic criteria" meets_burden_field_name = "Meets burden criteria" - self.df[ami_and_high_school_field_name] = ( - self.df[self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME] < 0.80 - ) & (self.df[self.HIGH_SCHOOL_FIELD_NAME] > 0.2) + df[ami_and_high_school_field_name] = ( + df[self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME] < 0.80 + ) & (df[self.HIGH_SCHOOL_FIELD_NAME] > 0.2) - self.df[meets_socio_field_name] = ( - self.df[ami_and_high_school_field_name] - | (self.df[self.POVERTY_FIELD_NAME] > 0.40) - | (self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME] > 0.10) - | (self.df[self.HIGH_SCHOOL_FIELD_NAME] > 0.4) + df[meets_socio_field_name] = ( + df[ami_and_high_school_field_name] + | (df[self.POVERTY_FIELD_NAME] > 0.40) + | (df[self.LINGUISTIC_ISOLATION_FIELD_NAME] > 0.10) + | (df[self.HIGH_SCHOOL_FIELD_NAME] > 0.4) ) - self.df[meets_burden_field_name] = ( - (self.df["Particulate matter (PM2.5) (percentile)"] > 0.9) - | (self.df["Respiratory hazard index (percentile)"] > 0.9) - | (self.df["Traffic proximity and volume (percentile)"] > 0.9) + df[meets_burden_field_name] = ( + (df["Particulate matter (PM2.5) (percentile)"] > 0.9) + | (df["Respiratory hazard index (percentile)"] > 0.9) + | (df["Traffic proximity and volume (percentile)"] > 0.9) | ( - self.df[ + df[ "Percent pre-1960s housing (lead paint indicator) (percentile)" ] > 0.9 ) - | (self.df["Proximity to RMP sites (percentile)"] > 0.9) + | (df["Proximity to RMP sites (percentile)"] > 0.9) | ( - self.df[ + df[ "Current asthma among adults aged >=18 years (percentile)" ] > 0.9 ) | ( - self.df[ + df[ "Coronary heart disease among adults aged >=18 years (percentile)" ] > 0.9 ) | ( - self.df[ + df[ "Cancer (excluding skin cancer) among adults aged >=18 years (percentile)" ] > 0.9 @@ -531,7 +458,7 @@ class ScoreETL(ExtractTransformLoad): # > 0.9 # ) | ( - self.df[ + df[ "Diagnosed diabetes among adults aged >=18 years (percentile)" ] > 0.9 @@ -544,9 +471,134 @@ class ScoreETL(ExtractTransformLoad): # ) ) - self.df["Score F (communities)"] = ( - self.df[meets_socio_field_name] & self.df[meets_burden_field_name] + df["Score F (communities)"] = ( + df[meets_socio_field_name] & df[meets_burden_field_name] ) + return df + + # TODO Move a lot of this to the ETL part of the pipeline + def _prepare_initial_df(self, data_sets: list) -> pd.DataFrame: + logger.info("Preparing initial dataframe") + + # Join all the data sources that use census block groups + census_block_group_dfs = [ + self.ejscreen_df, + self.census_df, + self.housing_and_transportation_df, + ] + census_block_group_df = self._join_cbg_dfs(census_block_group_dfs) + + # Join all the data sources that use census tracts + census_tract_dfs = [ + self.hud_housing_df, + self.cdc_places_df, + ] + census_tract_df = self._join_tract_dfs(census_tract_dfs) + + # Calculate the tract for the CBG data. + census_block_group_df[ + self.GEOID_TRACT_FIELD_NAME + ] = census_block_group_df[self.GEOID_FIELD_NAME].str[0:11] + + df = census_block_group_df.merge( + census_tract_df, on=self.GEOID_TRACT_FIELD_NAME + ) + + # If GEOID10s are read as numbers instead of strings, the initial 0 is dropped, + # and then we get too many CBG rows (one for 012345 and one for 12345). 
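+        # (This guard catches that case: more CBG rows than expected after the
+        # join means some block groups were duplicated.)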
+        if len(census_block_group_df) > 220333:
+            raise ValueError("Too many rows in the join.")
+
+        # TODO Refactor to no longer use the data_sets list and do all renaming in ETL step
+        # Rename columns:
+        renaming_dict = {
+            data_set.input_field: data_set.renamed_field
+            for data_set in data_sets
+        }
+
+        df.rename(
+            columns=renaming_dict,
+            inplace=True,
+            errors="raise",
+        )
+
+        columns_to_keep = [data_set.renamed_field for data_set in data_sets]
+        df = df[columns_to_keep]
+
+        # Convert all columns to numeric.
+        # TODO do this at the same time as calculating percentiles in future refactor
+        for data_set in data_sets:
+            # Skip GEOID_FIELD_NAME, because it's a string.
+            if data_set.renamed_field == self.GEOID_FIELD_NAME:
+                continue
+            df[data_set.renamed_field] = pd.to_numeric(
+                df[data_set.renamed_field]
+            )
+
+        # Calculate percentiles
+        for data_set in data_sets:
+            df[
+                f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}"
+            ] = df[data_set.renamed_field].rank(pct=True)
+
+        # Min-max normalization:
+        # (
+        #     Observed value
+        #     - minimum of all values
+        # )
+        # divided by
+        # (
+        #     Maximum of all values
+        #     - minimum of all values
+        # )
+        for data_set in data_sets:
+            # Skip GEOID_FIELD_NAME, because it's a string.
+            if data_set.renamed_field == self.GEOID_FIELD_NAME:
+                continue
+
+            min_value = df[data_set.renamed_field].min(skipna=True)
+
+            max_value = df[data_set.renamed_field].max(skipna=True)
+
+            logger.info(
+                f"For data set {data_set.renamed_field}, the min value is {min_value} and the max value is {max_value}."
+            )
+
+            df[f"{data_set.renamed_field}{self.MIN_MAX_FIELD_SUFFIX}"] = (
+                df[data_set.renamed_field] - min_value
+            ) / (max_value - min_value)
+
+        return df
+
+    def transform(self) -> None:
+        # NOTE: The score steps are split into the helper methods above to keep
+        # this method well under the statement limit.
+        logger.info("Transforming Score Data")
+
+        # Get the data sets list
+        data_sets = self.data_sets()
+
+        # Prepare the df with the right CBG/tract IDs, column names/types, and percentiles
+        self.df = self._prepare_initial_df(data_sets)
+
+        # Calculate score "A"
+        self.df = self._add_score_a(self.df)
+
+        # Calculate score "B"
+        self.df = self._add_score_b(self.df)
+
+        # Calculate score "C" - "CalEnviroScreen for the US" score
+        self.df = self._add_score_c(self.df, data_sets)
+
+        # Calculate scores "D" and "E"
+        self.df = self._add_scores_d_and_e(self.df)
+
+        # Create percentiles for the scores
+        self.df = self._add_score_percentiles(self.df)
+
+        # Now for binary (non index) scores.
+        # Calculate "Score F", which uses "either/or" thresholds.
+        self.df = self._add_score_f(self.df)
 
     def load(self) -> None:
         logger.info("Saving Score CSV")
diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py
index c8dfc93c..26b6f834 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py
@@ -21,11 +21,11 @@ class CensusACSETL(ExtractTransformLoad):
             "Linguistic isolation (total)"
         )
         self.LINGUISTIC_ISOLATION_FIELDS = [
-            "C16002_001E",
-            "C16002_004E",
-            "C16002_007E",
-            "C16002_010E",
-            "C16002_013E",
+            "C16002_001E",  # Estimate!!Total
+            "C16002_004E",  # Estimate!!Total!!Spanish!!Limited English speaking household
+            "C16002_007E",  # Estimate!!Total!!Other Indo-European languages!!Limited English speaking household
+            "C16002_010E",  # Estimate!!Total!!Asian and Pacific Island languages!!Limited English speaking household
+            "C16002_013E",  # Estimate!!Total!!Other languages!!Limited English speaking household
         ]
         self.MEDIAN_INCOME_FIELD = "B19013_001E"
         self.MEDIAN_INCOME_FIELD_NAME = (
diff --git a/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py b/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
index 0dbfaee0..4411261c 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
@@ -272,6 +272,12 @@ class HudHousingETL(ExtractTransformLoad):
             - self.df[RENTER_OCCUPIED_NOT_COMPUTED_FIELDS].sum(axis=1)
         )
 
+        # Denominator that also counts units where burden could not be computed:
+        self.df["DENOM INCL NOT COMPUTED"] = (
+            self.df[OWNER_OCCUPIED_POPULATION_FIELD]
+            + self.df[RENTER_OCCUPIED_POPULATION_FIELD]
+        )
+
         # TODO: add small sample size checks
         self.df[self.HOUSING_BURDEN_FIELD_NAME] = self.df[
             self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME
@@ -293,5 +299,6 @@ class HudHousingETL(ExtractTransformLoad):
             self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME,
             self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME,
             self.HOUSING_BURDEN_FIELD_NAME,
+            "DENOM INCL NOT COMPUTED",
         ]
     ].to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)
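For reviewers who want to sanity-check the per-field normalization math in `_prepare_initial_df`, here is a minimal standalone sketch. It assumes only pandas; the indicator name and the `(percentile)` / `(min-max normalized)` suffixes are hypothetical stand-ins for `data_set.renamed_field`, `PERCENTILE_FIELD_SUFFIX`, and `MIN_MAX_FIELD_SUFFIX`:

```python
# Minimal sketch of the two normalizations applied to each data set field.
import pandas as pd

df = pd.DataFrame({"Some indicator": [2.0, 10.0, 25.0, 5.0]})
field = "Some indicator"

# Percentile normalization: each value's rank as a fraction of all values,
# so the largest observation gets 1.0.
df[f"{field} (percentile)"] = df[field].rank(pct=True)

# Min-max normalization: (observed - min) / (max - min), mapping values to [0, 1].
min_value = df[field].min(skipna=True)
max_value = df[field].max(skipna=True)
df[f"{field} (min-max normalized)"] = (df[field] - min_value) / (
    max_value - min_value
)

print(df)
```

With the sample values, `25.0` gets both percentile `1.0` and min-max value `1.0`; these are the two column families that Score E (percentile) and Score D (min-max) average over.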