In [None]:
# Before running this notebook, you must run the following notebooks (in any order):
# 1. `ejscreen_etl.ipynb`
# 2. `census_etl.ipynb`
# 3. `housing_and_transportation_etl.ipynb`
# 4. `hud_housing_etl.ipynb`

import collections
import functools
from pathlib import Path
import pandas as pd
import csv
import os
import sys

module_path = os.path.abspath(os.path.join(".."))
if module_path not in sys.path:
 sys.path.append(module_path)

from etl.sources.census.etl_utils import get_state_fips_codes

# Define some global parameters
GEOID_FIELD_NAME = "GEOID10"
GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
BUCKET_SOCIOECONOMIC = "Socioeconomic Factors"
BUCKET_SENSITIVE = "Sensitive populations"
BUCKET_ENVIRONMENTAL = "Environmental effects"
BUCKET_EXPOSURES = "Exposures"
BUCKETS = [
 BUCKET_SOCIOECONOMIC,
 BUCKET_SENSITIVE,
 BUCKET_ENVIRONMENTAL,
 BUCKET_EXPOSURES,
]

# A few specific field names
# TODO: clean this up, I name some fields but not others.
UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
HOUSING_BURDEN_FIELD_NAME = "Housing burden (percent)"
POVERTY_FIELD_NAME = "Poverty (Less than 200% of federal poverty line)"
HIGH_SCHOOL_FIELD_NAME = (
 "Percent individuals age 25 or over with less than high school degree"
)

# There's another aggregation level (a second level of "buckets").
AGGREGATION_POLLUTION = "Pollution Burden"
AGGREGATION_POPULATION = "Population Characteristics"

PERCENTILE_FIELD_SUFFIX = " (percentile)"
MIN_MAX_FIELD_SUFFIX = " (min-max normalized)"

DATA_PATH = Path.cwd().parent / "data"
SCORE_CSV_PATH = DATA_PATH / "score" / "csv"

# Tell pandas to display all columns
pd.set_option("display.max_columns", None)

In [None]:
# EJSCreen csv Load
ejscreen_csv = DATA_PATH / "dataset" / "ejscreen_2020" / "usa.csv"
ejscreen_df = pd.read_csv(ejscreen_csv, dtype={"ID": "string"}, low_memory=False)
ejscreen_df.rename(columns={"ID": GEOID_FIELD_NAME}, inplace=True)
ejscreen_df.head()

In [None]:
# Load census data
census_csv = DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
census_df = pd.read_csv(
 census_csv, dtype={GEOID_FIELD_NAME: "string"}, low_memory=False
)
census_df.head()

In [None]:
# Load housing and transportation data
housing_and_transportation_index_csv = (
 DATA_PATH / "dataset" / "housing_and_transportation_index" / "usa.csv"
)
housing_and_transportation_df = pd.read_csv(
 housing_and_transportation_index_csv,
 dtype={GEOID_FIELD_NAME: "string"},
 low_memory=False,
)
housing_and_transportation_df.head()

In [None]:
# Load HUD housing data
hud_housing_csv = DATA_PATH / "dataset" / "hud_housing" / "usa.csv"
hud_housing_df = pd.read_csv(
 hud_housing_csv,
 dtype={GEOID_TRACT_FIELD_NAME: "string"},
 low_memory=False,
)
hud_housing_df.head()

In [None]:
# Join all the data sources that use census block groups
census_block_group_dfs = [ejscreen_df, census_df, housing_and_transportation_df]

census_block_group_df = functools.reduce(
 lambda left, right: pd.merge(
 left=left, right=right, on=GEOID_FIELD_NAME, how="outer"
 ),
 census_block_group_dfs,
)


if len(census_block_group_df) > 220333:
 raise ValueError("Too many rows in the join.")

census_block_group_df.head()

In [None]:
# Sanity check the join.
if len(census_block_group_df[GEOID_FIELD_NAME].str.len().unique()) != 1:
 raise ValueError(
 f"One of the input CSVs uses {GEOID_FIELD_NAME} with a different length."
 )

In [None]:
# Join all the data sources that use census tracts
# TODO: when there's more than one data source using census tract, reduce/merge them here.
census_tract_df = hud_housing_df

# Calculate the tract for the CBG data.
census_block_group_df[GEOID_TRACT_FIELD_NAME] = census_block_group_df[
 GEOID_FIELD_NAME
].str[0:11]

df = census_block_group_df.merge(census_tract_df, on=GEOID_TRACT_FIELD_NAME)

if len(census_block_group_df) > 220333:
 raise ValueError("Too many rows in the join.")

df.head()

In [None]:
# Define a named tuple that will be used for each data set input.
DataSet = collections.namedtuple(
 typename="DataSet", field_names=["input_field", "renamed_field", "bucket"]
)

data_sets = [
 # The following data sets have `bucket=None`, because it's not used in the bucket based score ("Score C").
 DataSet(
 input_field=GEOID_FIELD_NAME,
 # Use the name `GEOID10` to enable geoplatform.gov's workflow.
 renamed_field=GEOID_FIELD_NAME,
 bucket=None,
 ),
 DataSet(
 input_field=HOUSING_BURDEN_FIELD_NAME,
 renamed_field=HOUSING_BURDEN_FIELD_NAME,
 bucket=None,
 ),
 DataSet(input_field="ACSTOTPOP", renamed_field="Total population", bucket=None),
 # The following data sets have buckets, because they're used in the score
 DataSet(
 input_field="CANCER",
 renamed_field="Air toxics cancer risk",
 bucket=BUCKET_EXPOSURES,
 ),
 DataSet(
 input_field="RESP",
 renamed_field="Respiratory hazard index",
 bucket=BUCKET_EXPOSURES,
 ),
 DataSet(
 input_field="DSLPM",
 renamed_field="Diesel particulate matter",
 bucket=BUCKET_EXPOSURES,
 ),
 DataSet(
 input_field="PM25",
 renamed_field="Particulate matter (PM2.5)",
 bucket=BUCKET_EXPOSURES,
 ),
 DataSet(input_field="OZONE", renamed_field="Ozone", bucket=BUCKET_EXPOSURES),
 DataSet(
 input_field="PTRAF",
 renamed_field="Traffic proximity and volume",
 bucket=BUCKET_EXPOSURES,
 ),
 DataSet(
 input_field="PRMP",
 renamed_field="Proximity to RMP sites",
 bucket=BUCKET_ENVIRONMENTAL,
 ),
 DataSet(
 input_field="PTSDF",
 renamed_field="Proximity to TSDF sites",
 bucket=BUCKET_ENVIRONMENTAL,
 ),
 DataSet(
 input_field="PNPL",
 renamed_field="Proximity to NPL sites",
 bucket=BUCKET_ENVIRONMENTAL,
 ),
 DataSet(
 input_field="PWDIS",
 renamed_field="Wastewater discharge",
 bucket=BUCKET_ENVIRONMENTAL,
 ),
 DataSet(
 input_field="PRE1960PCT",
 renamed_field="Percent pre-1960s housing (lead paint indicator)",
 bucket=BUCKET_ENVIRONMENTAL,
 ),
 DataSet(
 input_field="UNDER5PCT",
 renamed_field="Individuals under 5 years old",
 bucket=BUCKET_SENSITIVE,
 ),
 DataSet(
 input_field="OVER64PCT",
 renamed_field="Individuals over 64 years old",
 bucket=BUCKET_SENSITIVE,
 ),
 DataSet(
 input_field=LINGUISTIC_ISOLATION_FIELD_NAME,
 renamed_field=LINGUISTIC_ISOLATION_FIELD_NAME,
 bucket=BUCKET_SENSITIVE,
 ),
 DataSet(
 input_field="LINGISOPCT",
 renamed_field="Percent of households in linguistic isolation",
 bucket=BUCKET_SOCIOECONOMIC,
 ),
 DataSet(
 input_field="LOWINCPCT",
 renamed_field=POVERTY_FIELD_NAME,
 bucket=BUCKET_SOCIOECONOMIC,
 ),
 DataSet(
 input_field="LESSHSPCT",
 renamed_field=HIGH_SCHOOL_FIELD_NAME,
 bucket=BUCKET_SOCIOECONOMIC,
 ),
 DataSet(
 input_field=UNEMPLOYED_FIELD_NAME,
 renamed_field=UNEMPLOYED_FIELD_NAME,
 bucket=BUCKET_SOCIOECONOMIC,
 ),
 DataSet(
 input_field="ht_ami",
 renamed_field="Housing + Transportation Costs % Income for the Regional Typical Household",
 bucket=BUCKET_SOCIOECONOMIC,
 ),
]

In [None]:
# Rename columns:
renaming_dict = {data_set.input_field: data_set.renamed_field for data_set in data_sets}

df.rename(
 columns=renaming_dict,
 inplace=True,
 errors="raise",
)

columns_to_keep = [data_set.renamed_field for data_set in data_sets]
df = df[columns_to_keep]

df.head()

In [None]:
# Convert all columns to numeric.
for data_set in data_sets:
 # Skip GEOID_FIELD_NAME, because it's a string.
 if data_set.renamed_field == GEOID_FIELD_NAME:
 continue
 df[f"{data_set.renamed_field}"] = pd.to_numeric(df[data_set.renamed_field])

In [None]:
# calculate percentiles
for data_set in data_sets:
 df[f"{data_set.renamed_field}{PERCENTILE_FIELD_SUFFIX}"] = df[
 data_set.renamed_field
 ].rank(pct=True)

df.head()

In [None]:
# calculate min max
# Math:
# (
# Observed value
# - minimum of all values
# )
# divided by
# (
# Maximum of all values
# - minimum of all values
# )
for data_set in data_sets:
 # Skip GEOID_FIELD_NAME, because it's a string.
 if data_set.renamed_field == GEOID_FIELD_NAME:
 continue

 min_value = df[data_set.renamed_field].min(skipna=True)

 max_value = df[data_set.renamed_field].max(skipna=True)

 print(
 f"For data set {data_set.renamed_field}, the min value is {min_value} and the max value is {max_value}."
 )

 df[f"{data_set.renamed_field}{MIN_MAX_FIELD_SUFFIX}"] = (
 df[data_set.renamed_field] - min_value
 ) / (max_value - min_value)

df.head()

In [None]:
# Calculate score "A" and score "B"
df["Score A"] = df[
 [
 "Poverty (Less than 200% of federal poverty line) (percentile)",
 "Percent individuals age 25 or over with less than high school degree (percentile)",
 ]
].mean(axis=1)
df["Score B"] = (
 df["Poverty (Less than 200% of federal poverty line) (percentile)"]
 * df[
 "Percent individuals age 25 or over with less than high school degree (percentile)"
 ]
)

In [None]:
# Calculate "CalEnviroScreen for the US" score
# Average all the percentile values in each bucket into a single score for each of the four buckets.
for bucket in BUCKETS:
 fields_in_bucket = [
 f"{data_set.renamed_field}{PERCENTILE_FIELD_SUFFIX}"
 for data_set in data_sets
 if data_set.bucket == bucket
 ]
 df[f"{bucket}"] = df[fields_in_bucket].mean(axis=1)

# Combine the score from the two Exposures and Environmental Effects buckets into a single score called "Pollution Burden". The math for this score is: (1.0 * Exposures Score + 0.5 * Environment Effects score) / 1.5.
df[AGGREGATION_POLLUTION] = (
 1.0 * df[f"{BUCKET_EXPOSURES}"] + 0.5 * df[f"{BUCKET_ENVIRONMENTAL}"]
) / 1.5

# Average the score from the two Sensitive populations and Socioeconomic factors buckets into a single score called "Population Characteristics".
df[AGGREGATION_POPULATION] = df[
 [f"{BUCKET_SENSITIVE}", f"{BUCKET_SOCIOECONOMIC}"]
].mean(axis=1)

# Multiply the "Pollution Burden" score and the "Population Characteristics" together to produce the cumulative impact score.
df["Score C"] = df[AGGREGATION_POLLUTION] * df[AGGREGATION_POPULATION]

df.head()

In [None]:
fields_to_use_in_score = [
 UNEMPLOYED_FIELD_NAME,
 LINGUISTIC_ISOLATION_FIELD_NAME,
 HOUSING_BURDEN_FIELD_NAME,
 POVERTY_FIELD_NAME,
 HIGH_SCHOOL_FIELD_NAME,
]

fields_min_max = [f"{field}{MIN_MAX_FIELD_SUFFIX}" for field in fields_to_use_in_score]
fields_percentile = [
 f"{field}{PERCENTILE_FIELD_SUFFIX}" for field in fields_to_use_in_score
]

# Calculate "Score D", which uses min-max normalization
# and calculate "Score E", which uses percentile normalization for the same fields
df["Score D"] = df[fields_min_max].mean(axis=1)
df["Score E"] = df[fields_percentile].mean(axis=1)

print(df["Score D"].describe())
print(df["Score E"].describe())

In [None]:
# Create percentiles for the scores
for score_field in ["Score A", "Score B", "Score C", "Score D", "Score E"]:
 df[f"{score_field}{PERCENTILE_FIELD_SUFFIX}"] = df[score_field].rank(pct=True)
 df[f"{score_field} (top 25th percentile)"] = (
 df[f"{score_field}{PERCENTILE_FIELD_SUFFIX}"] >= 0.75
 )
df.head()

In [None]:
# write nationwide csv
df.to_csv(SCORE_CSV_PATH / f"usa.csv", index=False)

In [None]:
# write per state csvs
for states_fips in get_state_fips_codes(DATA_PATH):
 print(f"Generating data{states_fips} csv")
 df1 = df[df["GEOID10"].str[:2] == states_fips]
 # we need to name the file data01.csv for ogr2ogr csv merge to work
 df1.to_csv(SCORE_CSV_PATH / f"data{states_fips}.csv", index=False)