Data folder restructuring in preparation for 361 (#376)

* initial checkin

* gitignore and docker-compose update

* readme update and error on hud

* encoding issue

* one more small README change

* data roadmap re-structure

* pyproject sort

* small update to score output folders

* checkpoint

* couple of last fixes
Jorge Escobar 2021-07-20 14:55:39 -04:00 committed by GitHub
commit 543d147e61
66 changed files with 130 additions and 108 deletions

@@ -0,0 +1,63 @@
from pathlib import Path
import pathlib
from config import settings
from utils import unzip_file_from_url, remove_all_from_dir
class ExtractTransformLoad(object):
"""
A class used to instantiate an ETL object to retrieve and process data from
datasets.
Attributes:
DATA_PATH (pathlib.Path): Local path where all data will be stored
TMP_PATH (pathlib.Path): Local path where temporary data will be stored
GEOID_FIELD_NAME (str): The common column name for a Census Block Group identifier
GEOID_TRACT_FIELD_NAME (str): The common column name for a Census Tract identifier
"""
DATA_PATH: Path = settings.APP_ROOT / "data"
TMP_PATH: Path = DATA_PATH / "tmp"
GEOID_FIELD_NAME: str = "GEOID10"
GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
def get_yaml_config(self) -> None:
"""Reads the YAML configuration file for the dataset and stores
the properties in the instance (upcoming feature)"""
pass
def check_ttl(self) -> None:
"""Checks if the ETL process can be run based on a the TLL value on the
YAML config (upcoming feature)"""
pass
def extract(
self, source_url: str = None, extract_path: Path = None
) -> None:
"""Extract the data from
a remote source. By default it provides code to get the file from a source url,
unzips it and stores it on an extract_path."""
# this can be accessed via super().extract()
if source_url and extract_path:
unzip_file_from_url(source_url, self.TMP_PATH, extract_path)
def transform(self) -> None:
"""Transform the data extracted into a format that can be consumed by the
score generator"""
raise NotImplementedError
def load(self) -> None:
"""Saves the transformed data in the specified local data folder or remote AWS S3
bucket"""
raise NotImplementedError
def cleanup(self) -> None:
"""Clears out any files stored in the TMP folder"""
remove_all_from_dir(self.TMP_PATH)
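
For reference, a dataset ETL under this layout subclasses the base class above and overrides extract/transform/load, as the sources further down in this commit do. A minimal sketch follows; the class name, source URL, and file names are hypothetical and not part of this commit:

import pandas as pd

from etl.base import ExtractTransformLoad


class ExampleETL(ExtractTransformLoad):
    """Hypothetical dataset ETL, shown only to illustrate the base class contract."""

    def __init__(self):
        self.EXAMPLE_URL = "https://example.com/example_data.zip"  # placeholder URL
        self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "example"
        self.df: pd.DataFrame

    def extract(self) -> None:
        # Reuse the base class helper that downloads and unzips into TMP_PATH.
        super().extract(self.EXAMPLE_URL, self.TMP_PATH)

    def transform(self) -> None:
        self.df = pd.read_csv(
            self.TMP_PATH / "example_data.csv",  # placeholder file name
            dtype={self.GEOID_FIELD_NAME: "string"},
        )

    def load(self) -> None:
        self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
        self.df.to_csv(self.OUTPUT_PATH / "usa.csv", index=False)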


@@ -0,0 +1,114 @@
import importlib
from etl.score.etl_score import ScoreETL
from etl.score.etl_score_post import PostScoreETL
def etl_runner(dataset_to_run: str = None) -> None:
"""Runs all etl processes or a specific one
Args:
dataset_to_run (str): Run a specific ETL process. If missing, runs all processes (optional)
Returns:
None
"""
# this list comes from YAMLs
dataset_list = [
{
"name": "census_acs",
"module_dir": "census_acs",
"class_name": "CensusACSETL",
},
{
"name": "ejscreen",
"module_dir": "ejscreen",
"class_name": "EJScreenETL",
},
{
"name": "housing_and_transportation",
"module_dir": "housing_and_transportation",
"class_name": "HousingTransportationETL",
},
{
"name": "hud_housing",
"module_dir": "hud_housing",
"class_name": "HudHousingETL",
},
{
"name": "calenviroscreen",
"module_dir": "calenviroscreen",
"class_name": "CalEnviroScreenETL",
},
{
"name": "hud_recap",
"module_dir": "hud_recap",
"class_name": "HudRecapETL",
},
]
if dataset_to_run:
dataset_element = next(
(item for item in dataset_list if item["name"] == dataset_to_run),
None,
)
if not dataset_element:
raise ValueError("Invalid dataset name")
else:
# reset the list to just the dataset
dataset_list = [dataset_element]
# Run the ETLs for the dataset_list
for dataset in dataset_list:
etl_module = importlib.import_module(
f"etl.sources.{dataset['module_dir']}.etl"
)
etl_class = getattr(etl_module, dataset["class_name"])
etl_instance = etl_class()
# run extract
etl_instance.extract()
# run transform
etl_instance.transform()
# run load
etl_instance.load()
# cleanup
etl_instance.cleanup()
# update the front end JSON/CSV of list of data sources
pass
def score_generate() -> None:
"""Generates the score and saves it on the local data directory
Args:
None
Returns:
None
"""
# Score Gen
score_gen = ScoreETL()
score_gen.extract()
score_gen.transform()
score_gen.load()
# Post Score Processing
score_post = PostScoreETL()
score_post.extract()
score_post.transform()
score_post.load()
score_post.cleanup()
def _find_dataset_index(dataset_list, key, value):
for i, element in enumerate(dataset_list):
if element[key] == value:
return i
return -1
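
The command-line wiring that calls these helpers is not part of this diff; a minimal, hypothetical driver might look like the following (the import path is assumed, not confirmed by this commit):

# Hypothetical driver script; the real entry point is not shown in this commit.
from etl_runner import etl_runner, score_generate  # import path assumed

if __name__ == "__main__":
    etl_runner("ejscreen")  # run a single ETL by its "name" key
    # etl_runner()          # or run every dataset in dataset_list
    score_generate()        # then build the score CSVs and post-score outputs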


@@ -0,0 +1,410 @@
import collections
import functools
import pandas as pd
from etl.base import ExtractTransformLoad
from utils import get_module_logger
from etl.sources.census.etl_utils import get_state_fips_codes
logger = get_module_logger(__name__)
class ScoreETL(ExtractTransformLoad):
def __init__(self):
# Define some global parameters
self.BUCKET_SOCIOECONOMIC = "Socioeconomic Factors"
self.BUCKET_SENSITIVE = "Sensitive populations"
self.BUCKET_ENVIRONMENTAL = "Environmental effects"
self.BUCKET_EXPOSURES = "Exposures"
self.BUCKETS = [
self.BUCKET_SOCIOECONOMIC,
self.BUCKET_SENSITIVE,
self.BUCKET_ENVIRONMENTAL,
self.BUCKET_EXPOSURES,
]
# A few specific field names
# TODO: clean this up, I name some fields but not others.
self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
self.HOUSING_BURDEN_FIELD_NAME = "Housing burden (percent)"
self.POVERTY_FIELD_NAME = (
"Poverty (Less than 200% of federal poverty line)"
)
self.HIGH_SCHOOL_FIELD_NAME = "Percent individuals age 25 or over with less than high school degree"
# There's another aggregation level (a second level of "buckets").
self.AGGREGATION_POLLUTION = "Pollution Burden"
self.AGGREGATION_POPULATION = "Population Characteristics"
self.PERCENTILE_FIELD_SUFFIX = " (percentile)"
self.MIN_MAX_FIELD_SUFFIX = " (min-max normalized)"
self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv" / "full"
# dataframes
self.df: pd.DataFrame
self.ejscreen_df: pd.DataFrame
self.census_df: pd.DataFrame
self.housing_and_transportation_df: pd.DataFrame
self.hud_housing_df: pd.DataFrame
def extract(self) -> None:
# Load the EJScreen CSV
ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
self.ejscreen_df = pd.read_csv(
ejscreen_csv, dtype={"ID": "string"}, low_memory=False
)
self.ejscreen_df.rename(
columns={"ID": self.GEOID_FIELD_NAME}, inplace=True
)
# Load census data
census_csv = self.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
self.census_df = pd.read_csv(
census_csv,
dtype={self.GEOID_FIELD_NAME: "string"},
low_memory=False,
)
# Load housing and transportation data
housing_and_transportation_index_csv = (
self.DATA_PATH
/ "dataset"
/ "housing_and_transportation_index"
/ "usa.csv"
)
self.housing_and_transportation_df = pd.read_csv(
housing_and_transportation_index_csv,
dtype={self.GEOID_FIELD_NAME: "string"},
low_memory=False,
)
# Load HUD housing data
hud_housing_csv = self.DATA_PATH / "dataset" / "hud_housing" / "usa.csv"
self.hud_housing_df = pd.read_csv(
hud_housing_csv,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
def transform(self) -> None:
logger.info(f"Transforming Score Data")
# Join all the data sources that use census block groups
census_block_group_dfs = [
self.ejscreen_df,
self.census_df,
self.housing_and_transportation_df,
]
census_block_group_df = functools.reduce(
lambda left, right: pd.merge(
left=left, right=right, on=self.GEOID_FIELD_NAME, how="outer"
),
census_block_group_dfs,
)
# Sanity check the join.
if (
len(census_block_group_df[self.GEOID_FIELD_NAME].str.len().unique())
!= 1
):
raise ValueError(
f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length."
)
# Join all the data sources that use census tracts
# TODO: when there's more than one data source using census tract, reduce/merge them here.
census_tract_df = self.hud_housing_df
# Calculate the tract for the CBG data.
census_block_group_df[
self.GEOID_TRACT_FIELD_NAME
] = census_block_group_df[self.GEOID_FIELD_NAME].str[0:11]
self.df = census_block_group_df.merge(
census_tract_df, on=self.GEOID_TRACT_FIELD_NAME
)
if len(census_block_group_df) > 220333:
raise ValueError("Too many rows in the join.")
# Define a named tuple that will be used for each data set input.
DataSet = collections.namedtuple(
typename="DataSet",
field_names=["input_field", "renamed_field", "bucket"],
)
data_sets = [
# The following data sets have `bucket=None`, because it's not used in the bucket based score ("Score C").
DataSet(
input_field=self.GEOID_FIELD_NAME,
# Use the name `GEOID10` to enable geoplatform.gov's workflow.
renamed_field=self.GEOID_FIELD_NAME,
bucket=None,
),
DataSet(
input_field=self.HOUSING_BURDEN_FIELD_NAME,
renamed_field=self.HOUSING_BURDEN_FIELD_NAME,
bucket=None,
),
DataSet(
input_field="ACSTOTPOP",
renamed_field="Total population",
bucket=None,
),
# The following data sets have buckets, because they're used in the score
DataSet(
input_field="CANCER",
renamed_field="Air toxics cancer risk",
bucket=self.BUCKET_EXPOSURES,
),
DataSet(
input_field="RESP",
renamed_field="Respiratory hazard index",
bucket=self.BUCKET_EXPOSURES,
),
DataSet(
input_field="DSLPM",
renamed_field="Diesel particulate matter",
bucket=self.BUCKET_EXPOSURES,
),
DataSet(
input_field="PM25",
renamed_field="Particulate matter (PM2.5)",
bucket=self.BUCKET_EXPOSURES,
),
DataSet(
input_field="OZONE",
renamed_field="Ozone",
bucket=self.BUCKET_EXPOSURES,
),
DataSet(
input_field="PTRAF",
renamed_field="Traffic proximity and volume",
bucket=self.BUCKET_EXPOSURES,
),
DataSet(
input_field="PRMP",
renamed_field="Proximity to RMP sites",
bucket=self.BUCKET_ENVIRONMENTAL,
),
DataSet(
input_field="PTSDF",
renamed_field="Proximity to TSDF sites",
bucket=self.BUCKET_ENVIRONMENTAL,
),
DataSet(
input_field="PNPL",
renamed_field="Proximity to NPL sites",
bucket=self.BUCKET_ENVIRONMENTAL,
),
DataSet(
input_field="PWDIS",
renamed_field="Wastewater discharge",
bucket=self.BUCKET_ENVIRONMENTAL,
),
DataSet(
input_field="PRE1960PCT",
renamed_field="Percent pre-1960s housing (lead paint indicator)",
bucket=self.BUCKET_ENVIRONMENTAL,
),
DataSet(
input_field="UNDER5PCT",
renamed_field="Individuals under 5 years old",
bucket=self.BUCKET_SENSITIVE,
),
DataSet(
input_field="OVER64PCT",
renamed_field="Individuals over 64 years old",
bucket=self.BUCKET_SENSITIVE,
),
DataSet(
input_field=self.LINGUISTIC_ISOLATION_FIELD_NAME,
renamed_field=self.LINGUISTIC_ISOLATION_FIELD_NAME,
bucket=self.BUCKET_SENSITIVE,
),
DataSet(
input_field="LINGISOPCT",
renamed_field="Percent of households in linguistic isolation",
bucket=self.BUCKET_SOCIOECONOMIC,
),
DataSet(
input_field="LOWINCPCT",
renamed_field=self.POVERTY_FIELD_NAME,
bucket=self.BUCKET_SOCIOECONOMIC,
),
DataSet(
input_field="LESSHSPCT",
renamed_field=self.HIGH_SCHOOL_FIELD_NAME,
bucket=self.BUCKET_SOCIOECONOMIC,
),
DataSet(
input_field=self.UNEMPLOYED_FIELD_NAME,
renamed_field=self.UNEMPLOYED_FIELD_NAME,
bucket=self.BUCKET_SOCIOECONOMIC,
),
DataSet(
input_field="ht_ami",
renamed_field="Housing + Transportation Costs % Income for the Regional Typical Household",
bucket=self.BUCKET_SOCIOECONOMIC,
),
]
# Rename columns:
renaming_dict = {
data_set.input_field: data_set.renamed_field
for data_set in data_sets
}
self.df.rename(
columns=renaming_dict,
inplace=True,
errors="raise",
)
columns_to_keep = [data_set.renamed_field for data_set in data_sets]
self.df = self.df[columns_to_keep]
# Convert all columns to numeric.
for data_set in data_sets:
# Skip GEOID_FIELD_NAME, because it's a string.
if data_set.renamed_field == self.GEOID_FIELD_NAME:
continue
self.df[f"{data_set.renamed_field}"] = pd.to_numeric(
self.df[data_set.renamed_field]
)
# calculate percentiles
for data_set in data_sets:
self.df[
f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}"
] = self.df[data_set.renamed_field].rank(pct=True)
# Math:
# (
# Observed value
# - minimum of all values
# )
# divided by
# (
# Maximum of all values
# - minimum of all values
# )
for data_set in data_sets:
# Skip GEOID_FIELD_NAME, because it's a string.
if data_set.renamed_field == self.GEOID_FIELD_NAME:
continue
min_value = self.df[data_set.renamed_field].min(skipna=True)
max_value = self.df[data_set.renamed_field].max(skipna=True)
logger.info(
f"For data set {data_set.renamed_field}, the min value is {min_value} and the max value is {max_value}."
)
self.df[f"{data_set.renamed_field}{self.MIN_MAX_FIELD_SUFFIX}"] = (
self.df[data_set.renamed_field] - min_value
) / (max_value - min_value)
# Graph distributions and correlations.
min_max_fields = [
f"{data_set.renamed_field}{self.MIN_MAX_FIELD_SUFFIX}"
for data_set in data_sets
if data_set.renamed_field != self.GEOID_FIELD_NAME
]
# Calculate score "A" and score "B"
self.df["Score A"] = self.df[
[
"Poverty (Less than 200% of federal poverty line) (percentile)",
"Percent individuals age 25 or over with less than high school degree (percentile)",
]
].mean(axis=1)
self.df["Score B"] = (
self.df[
"Poverty (Less than 200% of federal poverty line) (percentile)"
]
* self.df[
"Percent individuals age 25 or over with less than high school degree (percentile)"
]
)
# Calculate "CalEnviroScreen for the US" score
# Average all the percentile values in each bucket into a single score for each of the four buckets.
for bucket in self.BUCKETS:
fields_in_bucket = [
f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}"
for data_set in data_sets
if data_set.bucket == bucket
]
self.df[f"{bucket}"] = self.df[fields_in_bucket].mean(axis=1)
# Combine the score from the two Exposures and Environmental Effects buckets into a single score called "Pollution Burden". The math for this score is: (1.0 * Exposures Score + 0.5 * Environment Effects score) / 1.5.
self.df[self.AGGREGATION_POLLUTION] = (
1.0 * self.df[f"{self.BUCKET_EXPOSURES}"]
+ 0.5 * self.df[f"{self.BUCKET_ENVIRONMENTAL}"]
) / 1.5
# Average the score from the two Sensitive populations and Socioeconomic factors buckets into a single score called "Population Characteristics".
self.df[self.AGGREGATION_POPULATION] = self.df[
[f"{self.BUCKET_SENSITIVE}", f"{self.BUCKET_SOCIOECONOMIC}"]
].mean(axis=1)
# Multiply the "Pollution Burden" score and the "Population Characteristics" together to produce the cumulative impact score.
self.df["Score C"] = (
self.df[self.AGGREGATION_POLLUTION]
* self.df[self.AGGREGATION_POPULATION]
)
if len(census_block_group_df) > 220333:
raise ValueError("Too many rows in the join.")
fields_to_use_in_score = [
self.UNEMPLOYED_FIELD_NAME,
self.LINGUISTIC_ISOLATION_FIELD_NAME,
self.HOUSING_BURDEN_FIELD_NAME,
self.POVERTY_FIELD_NAME,
self.HIGH_SCHOOL_FIELD_NAME,
]
fields_min_max = [
f"{field}{self.MIN_MAX_FIELD_SUFFIX}"
for field in fields_to_use_in_score
]
fields_percentile = [
f"{field}{self.PERCENTILE_FIELD_SUFFIX}"
for field in fields_to_use_in_score
]
# Calculate "Score D", which uses min-max normalization
# and calculate "Score E", which uses percentile normalization for the same fields
self.df["Score D"] = self.df[fields_min_max].mean(axis=1)
self.df["Score E"] = self.df[fields_percentile].mean(axis=1)
# Calculate correlations
self.df[fields_min_max].corr()
# Create percentiles for the scores
for score_field in [
"Score A",
"Score B",
"Score C",
"Score D",
"Score E",
]:
self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = self.df[
score_field
].rank(pct=True)
self.df[f"{score_field} (top 25th percentile)"] = (
self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] >= 0.75
)
def load(self) -> None:
logger.info(f"Saving Score CSV")
# write nationwide csv
self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.SCORE_CSV_PATH / f"usa.csv", index=False)
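
As a quick sanity check on the two normalizations used above, percentile rank via rank(pct=True) and the min-max formula from the comment, here is a toy example with made-up values:

import pandas as pd

values = pd.Series([2.0, 4.0, 6.0, 8.0])  # toy data, not real indicators
print(values.rank(pct=True).tolist())  # [0.25, 0.5, 0.75, 1.0]
print(((values - values.min()) / (values.max() - values.min())).tolist())  # approximately [0.0, 0.33, 0.67, 1.0]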


@@ -0,0 +1,112 @@
import pandas as pd
from etl.base import ExtractTransformLoad
from utils import get_module_logger
logger = get_module_logger(__name__)
class PostScoreETL(ExtractTransformLoad):
"""
A class used to instantiate an ETL object to retrieve and process data from
datasets.
"""
def __init__(self):
self.CENSUS_COUNTIES_ZIP_URL = "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/Gaz_counties_national.zip"
self.CENSUS_COUNTIES_TXT = self.TMP_PATH / "Gaz_counties_national.txt"
self.CENSUS_COUNTIES_COLS = ["USPS", "GEOID", "NAME"]
self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
self.STATE_CSV = (
self.DATA_PATH / "census" / "csv" / "fips_states_2010.csv"
)
self.FULL_SCORE_CSV = self.SCORE_CSV_PATH / "full" / "usa.csv"
self.TILE_SCORE_CSV = self.SCORE_CSV_PATH / "tile" / "usa.csv"
self.TILES_SCORE_COLUMNS = [
"GEOID10",
"Score E (percentile)",
"Score E (top 25th percentile)",
"GEOID",
"State Abbreviation",
"County Name",
]
self.TILES_SCORE_CSV_PATH = self.SCORE_CSV_PATH / "tiles"
self.TILES_SCORE_CSV = self.TILES_SCORE_CSV_PATH / "usa.csv"
self.counties_df: pd.DataFrame
self.states_df: pd.DataFrame
self.score_df: pd.DataFrame
self.score_county_state_merged: pd.DataFrame
self.score_for_tiles: pd.DataFrame
def extract(self) -> None:
super().extract(
self.CENSUS_COUNTIES_ZIP_URL,
self.TMP_PATH,
)
logger.info(f"Reading Counties CSV")
self.counties_df = pd.read_csv(
self.CENSUS_COUNTIES_TXT,
sep="\t",
dtype={"GEOID": "string", "USPS": "string"},
low_memory=False,
encoding="latin-1",
)
logger.info(f"Reading States CSV")
self.states_df = pd.read_csv(
self.STATE_CSV, dtype={"fips": "string", "state_code": "string"}
)
self.score_df = pd.read_csv(
self.FULL_SCORE_CSV, dtype={"GEOID10": "string"}
)
def transform(self) -> None:
logger.info(f"Transforming data sources for Score + County CSV")
# rename some of the columns to prepare for merge
self.counties_df = self.counties_df[["USPS", "GEOID", "NAME"]]
self.counties_df.rename(
columns={"USPS": "State Abbreviation", "NAME": "County Name"},
inplace=True,
)
# rename state columns and remove unnecessary ones
self.states_df.rename(
columns={
"fips": "State Code",
"state_name": "State Name",
"state_abbreviation": "State Abbreviation",
},
inplace=True,
)
self.states_df.drop(["region", "division"], axis=1, inplace=True)
# add the tract level column
self.score_df["GEOID"] = self.score_df.GEOID10.str[:5]
# merge state and counties
county_state_merged = self.counties_df.join(
self.states_df, rsuffix=" Other"
)
del county_state_merged["State Abbreviation Other"]
# merge county and score
self.score_county_state_merged = self.score_df.join(
county_state_merged, rsuffix="_OTHER"
)
del self.score_county_state_merged["GEOID_OTHER"]
def load(self) -> None:
logger.info(f"Saving Full Score CSV with County Information")
self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
self.score_county_state_merged.to_csv(self.FULL_SCORE_CSV, index=False)
logger.info(f"Saving Tile Score CSV")
# TODO: check which are the columns we'll use
# Related to: https://github.com/usds/justice40-tool/issues/302
score_tiles = self.score_county_state_merged[self.TILES_SCORE_COLUMNS]
self.TILES_SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
score_tiles.to_csv(self.TILES_SCORE_CSV, index=False)


@@ -0,0 +1,69 @@
import pandas as pd
from etl.base import ExtractTransformLoad
from utils import get_module_logger
logger = get_module_logger(__name__)
class CalEnviroScreenETL(ExtractTransformLoad):
def __init__(self):
self.CALENVIROSCREEN_FTP_URL = "https://justice40-data.s3.amazonaws.com/CalEnviroScreen/CalEnviroScreen_4.0_2021.zip"
self.CALENVIROSCREEN_CSV = self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4"
# Defining some variable names
self.CALENVIROSCREEN_SCORE_FIELD_NAME = "calenviroscreen_score"
self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME = "calenviroscreen_percentile"
self.CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD_NAME = (
"calenviroscreen_priority_community"
)
# Choosing constants.
# None of these numbers are final, but just for the purposes of comparison.
self.CALENVIROSCREEN_PRIORITY_COMMUNITY_THRESHOLD = 75
self.df: pd.DataFrame
def extract(self) -> None:
logger.info(f"Downloading CalEnviroScreen Data")
super().extract(
self.CALENVIROSCREEN_FTP_URL,
self.TMP_PATH,
)
def transform(self) -> None:
logger.info(f"Transforming CalEnviroScreen Data")
# Data from https://calenviroscreen-oehha.hub.arcgis.com/#Data, specifically:
# https://oehha.ca.gov/media/downloads/calenviroscreen/document/calenviroscreen40resultsdatadictionaryd12021.zip
# Load comparison index (CalEnviroScreen 4)
self.df = pd.read_csv(
self.CALENVIROSCREEN_CSV, dtype={"Census Tract": "string"}
)
self.df.rename(
columns={
"Census Tract": self.GEOID_TRACT_FIELD_NAME,
"DRAFT CES 4.0 Score": self.CALENVIROSCREEN_SCORE_FIELD_NAME,
"DRAFT CES 4.0 Percentile": self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME,
},
inplace=True,
)
# Add a leading "0" to the Census Tract to match our format in other data frames.
self.df[self.GEOID_TRACT_FIELD_NAME] = (
"0" + self.df[self.GEOID_TRACT_FIELD_NAME]
)
# Mark tracts at or above the priority threshold percentile as priority communities
self.df[self.CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD_NAME] = (
self.df[self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME]
>= self.CALENVIROSCREEN_PRIORITY_COMMUNITY_THRESHOLD
)
def load(self) -> None:
logger.info(f"Saving CalEnviroScreen CSV")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.CSV_PATH / f"data06.csv", index=False)


@@ -0,0 +1,111 @@
import csv
import os
import json
from pathlib import Path
from .etl_utils import get_state_fips_codes
from utils import unzip_file_from_url, get_module_logger
logger = get_module_logger(__name__)
def download_census_csvs(data_path: Path) -> None:
"""Download all census shape files from the Census FTP and extract the geojson
to generate national and by state Census Block Group CSVs
Args:
data_path (pathlib.Path): Name of the directory where the files and directories will
be created
Returns:
None
"""
# the fips_states_2010.csv is generated from data here
# https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html
state_fips_codes = get_state_fips_codes(data_path)
geojson_dir_path = data_path / "census" / "geojson"
for fips in state_fips_codes:
# check if file exists
shp_file_path = (
data_path / "census" / "shp" / fips / f"tl_2010_{fips}_bg10.shp"
)
logger.info(f"Checking if {fips} file exists")
if not os.path.isfile(shp_file_path):
logger.info(f"Downloading and extracting {fips} shape file")
# 2020 tiger data is here: https://www2.census.gov/geo/tiger/TIGER2020/BG/
# But using 2010 for now
cbg_state_url = f"https://www2.census.gov/geo/tiger/TIGER2010/BG/2010/tl_2010_{fips}_bg10.zip"
unzip_file_from_url(
cbg_state_url,
data_path / "tmp",
data_path / "census" / "shp" / fips,
)
cmd = (
"ogr2ogr -f GeoJSON data/census/geojson/"
+ fips
+ ".json data/census/shp/"
+ fips
+ "/tl_2010_"
+ fips
+ "_bg10.shp"
)
os.system(cmd)
# generate CBG CSV table for pandas
## load in memory
cbg_national = [] # in-memory global list
cbg_per_state: dict = {} # in-memory dict per state
for file in os.listdir(geojson_dir_path):
if file.endswith(".json"):
logger.info(f"Ingesting geoid10 for file {file}")
with open(geojson_dir_path / file) as f:
geojson = json.load(f)
for feature in geojson["features"]:
geoid10 = feature["properties"]["GEOID10"]
cbg_national.append(str(geoid10))
geoid10_state_id = geoid10[:2]
if not cbg_per_state.get(geoid10_state_id):
cbg_per_state[geoid10_state_id] = []
cbg_per_state[geoid10_state_id].append(geoid10)
csv_dir_path = data_path / "census" / "csv"
## write to individual state csv
for state_id in cbg_per_state:
geoid10_list = cbg_per_state[state_id]
with open(
csv_dir_path / f"{state_id}.csv", mode="w", newline=""
) as cbg_csv_file:
cbg_csv_file_writer = csv.writer(
cbg_csv_file,
delimiter=",",
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)
for geoid10 in geoid10_list:
cbg_csv_file_writer.writerow(
[
geoid10,
]
)
## write US csv
with open(csv_dir_path / "us.csv", mode="w", newline="") as cbg_csv_file:
cbg_csv_file_writer = csv.writer(
cbg_csv_file,
delimiter=",",
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)
for geoid10 in cbg_national:
cbg_csv_file_writer.writerow(
[
geoid10,
]
)
logger.info("Census block groups downloading complete")


@@ -0,0 +1,55 @@
from pathlib import Path
import csv
import os
from config import settings
from utils import (
remove_files_from_dir,
remove_all_dirs_from_dir,
unzip_file_from_url,
get_module_logger,
)
logger = get_module_logger(__name__)
def reset_data_directories(data_path: Path) -> None:
census_data_path = data_path / "census"
# csv
csv_path = census_data_path / "csv"
remove_files_from_dir(csv_path, ".csv")
# geojson
geojson_path = census_data_path / "geojson"
remove_files_from_dir(geojson_path, ".json")
# shp
shp_path = census_data_path / "shp"
remove_all_dirs_from_dir(shp_path)
def get_state_fips_codes(data_path: Path) -> list:
fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"
# check if file exists
if not os.path.isfile(fips_csv_path):
logger.info(f"Downloading fips from S3 repository")
unzip_file_from_url(
settings.AWS_JUSTICE40_DATA_URL + "/Census/fips_states_2010.zip",
data_path / "tmp",
data_path / "census" / "csv",
)
fips_state_list = []
with open(fips_csv_path) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")
line_count = 0
for row in csv_reader:
if line_count == 0:
line_count += 1
else:
fips = row[0].strip()
fips_state_list.append(fips)
return fips_state_list
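
A quick usage note on get_state_fips_codes: it returns the two-character state/territory FIPS codes as strings, which the other ETLs iterate over. A minimal, hypothetical call (the data path mirrors DATA_PATH in etl.base):

from config import settings
from etl.sources.census.etl_utils import get_state_fips_codes

fips_codes = get_state_fips_codes(settings.APP_ROOT / "data")
print(fips_codes[:3])  # e.g. ['01', '02', '04'], depending on fips_states_2010.csv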


@@ -0,0 +1,108 @@
import pandas as pd
import censusdata
from etl.base import ExtractTransformLoad
from etl.sources.census.etl_utils import get_state_fips_codes
from utils import get_module_logger
logger = get_module_logger(__name__)
class CensusACSETL(ExtractTransformLoad):
def __init__(self):
self.ACS_YEAR = 2019
self.OUTPUT_PATH = (
self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
)
self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME = (
"Linguistic isolation (total)"
)
self.LINGUISTIC_ISOLATION_FIELDS = [
"C16002_001E",
"C16002_004E",
"C16002_007E",
"C16002_010E",
"C16002_013E",
]
self.df: pd.DataFrame
def _fips_from_censusdata_censusgeo(
self, censusgeo: censusdata.censusgeo
) -> str:
"""Create a FIPS code from the proprietary censusgeo index."""
fips = "".join([value for (key, value) in censusgeo.params()])
return fips
def extract(self) -> None:
dfs = []
for fips in get_state_fips_codes(self.DATA_PATH):
logger.info(
f"Downloading data for state/territory with FIPS code {fips}"
)
dfs.append(
censusdata.download(
src="acs5",
year=self.ACS_YEAR,
geo=censusdata.censusgeo(
[("state", fips), ("county", "*"), ("block group", "*")]
),
var=[
# Employment fields: unemployed civilians (B23025_005E) and civilian labor force (B23025_003E)
"B23025_005E",
"B23025_003E",
]
+ self.LINGUISTIC_ISOLATION_FIELDS,
)
)
self.df = pd.concat(dfs)
self.df[self.GEOID_FIELD_NAME] = self.df.index.to_series().apply(
func=self._fips_from_censusdata_censusgeo
)
def transform(self) -> None:
logger.info(f"Starting Census ACS Transform")
# Calculate percent unemployment.
# TODO: remove small-sample data that should be `None` instead of a high-variance fraction.
self.df[self.UNEMPLOYED_FIELD_NAME] = (
self.df.B23025_005E / self.df.B23025_003E
)
# Calculate linguistic isolation.
individual_limited_english_fields = [
"C16002_004E",
"C16002_007E",
"C16002_010E",
"C16002_013E",
]
self.df[self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME] = self.df[
individual_limited_english_fields
].sum(axis=1, skipna=True)
self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME] = (
self.df[self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME].astype(float)
/ self.df["C16002_001E"]
)
self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME].describe()
def load(self) -> None:
logger.info(f"Saving Census ACS Data")
# mkdir census
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
columns_to_include = [
self.GEOID_FIELD_NAME,
self.UNEMPLOYED_FIELD_NAME,
self.LINGUISTIC_ISOLATION_FIELD_NAME,
]
self.df[columns_to_include].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)


@@ -0,0 +1,37 @@
import pandas as pd
from etl.base import ExtractTransformLoad
from utils import get_module_logger
logger = get_module_logger(__name__)
class EJScreenETL(ExtractTransformLoad):
def __init__(self):
self.EJSCREEN_FTP_URL = "https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip"
self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2019_StatePctiles.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019"
self.df: pd.DataFrame
def extract(self) -> None:
logger.info(f"Downloading EJScreen Data")
super().extract(
self.EJSCREEN_FTP_URL,
self.TMP_PATH,
)
def transform(self) -> None:
logger.info(f"Transforming EJScreen Data")
self.df = pd.read_csv(
self.EJSCREEN_CSV,
dtype={"ID": "string"},
# EJSCREEN writes the word "None" for NA data.
na_values=["None"],
low_memory=False,
)
def load(self) -> None:
logger.info(f"Saving EJScreen CSV")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.CSV_PATH / f"usa.csv", index=False)


@@ -0,0 +1,62 @@
import pandas as pd
from etl.base import ExtractTransformLoad
from etl.sources.census.etl_utils import get_state_fips_codes
from utils import get_module_logger, unzip_file_from_url
logger = get_module_logger(__name__)
class HousingTransportationETL(ExtractTransformLoad):
def __init__(self):
self.HOUSING_FTP_URL = (
"https://htaindex.cnt.org/download/download.php?focus=blkgrp&geoid="
)
self.OUTPUT_PATH = (
self.DATA_PATH / "dataset" / "housing_and_transportation_index"
)
self.df: pd.DataFrame
def extract(self) -> None:
# Download each state / territory individually
dfs = []
zip_file_dir = self.TMP_PATH / "housing_and_transportation_index"
for fips in get_state_fips_codes(self.DATA_PATH):
logger.info(
f"Downloading housing data for state/territory with FIPS code {fips}"
)
# Puerto Rico has no data, so skip
if fips == "72":
continue
unzip_file_from_url(
f"{self.HOUSING_FTP_URL}{fips}", self.TMP_PATH, zip_file_dir
)
# New file name:
tmp_csv_file_path = (
zip_file_dir / f"htaindex_data_blkgrps_{fips}.csv"
)
tmp_df = pd.read_csv(filepath_or_buffer=tmp_csv_file_path)
dfs.append(tmp_df)
self.df = pd.concat(dfs)
self.df.head()
def transform(self) -> None:
logger.info(f"Transforming Housing and Transportation Data")
# Rename and reformat block group ID
self.df.rename(columns={"blkgrp": self.GEOID_FIELD_NAME}, inplace=True)
self.df[self.GEOID_FIELD_NAME] = self.df[
self.GEOID_FIELD_NAME
].str.replace('"', "")
def load(self) -> None:
logger.info(f"Saving Housing and Transportation Data")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)


@@ -0,0 +1,187 @@
import pandas as pd
from etl.base import ExtractTransformLoad
from etl.sources.census.etl_utils import get_state_fips_codes
from utils import get_module_logger, unzip_file_from_url, remove_all_from_dir
logger = get_module_logger(__name__)
class HudHousingETL(ExtractTransformLoad):
def __init__(self):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "hud_housing"
self.GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
self.HOUSING_FTP_URL = "https://www.huduser.gov/portal/datasets/cp/2012thru2016-140-csv.zip"
self.HOUSING_ZIP_FILE_DIR = self.TMP_PATH / "hud_housing"
# We measure households earning less than 80% of HUD Area Median Family Income by county
# and paying greater than 30% of their income to housing costs.
self.HOUSING_BURDEN_FIELD_NAME = "Housing burden (percent)"
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME = "HOUSING_BURDEN_NUMERATOR"
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME = (
"HOUSING_BURDEN_DENOMINATOR"
)
# Note: some variable definitions.
# HUD-adjusted median family income (HAMFI).
# The four housing problems are: incomplete kitchen facilities, incomplete plumbing facilities, more than 1 person per room, and cost burden greater than 30%.
# Table 8 is the desired table.
self.df: pd.DataFrame
def extract(self) -> None:
logger.info(f"Extracting HUD Housing Data")
super().extract(
self.HOUSING_FTP_URL,
self.HOUSING_ZIP_FILE_DIR,
)
def transform(self) -> None:
logger.info(f"Transforming HUD Housing Data")
# New file name:
tmp_csv_file_path = (
self.HOUSING_ZIP_FILE_DIR
/ "2012thru2016-140-csv"
/ "2012thru2016-140-csv"
/ "140"
/ "Table8.csv"
)
self.df = pd.read_csv(
filepath_or_buffer=tmp_csv_file_path,
encoding="latin-1",
)
# Rename and reformat block group ID
self.df.rename(
columns={"geoid": self.GEOID_TRACT_FIELD_NAME}, inplace=True
)
# The CHAS data has census tract ids such as `14000US01001020100`
# Whereas the rest of our data uses, for the same tract, `01001020100`.
# So strip everything up to and including the `US` prefix:
self.df[self.GEOID_TRACT_FIELD_NAME] = self.df[
self.GEOID_TRACT_FIELD_NAME
].str.replace(r"^.*?US", "", regex=True)
# Calculate housing burden
# This is quite a number of steps. It does not appear to be accessible nationally in a simpler format, though.
# See "CHAS data dictionary 12-16.xlsx"
# Owner occupied numerator fields
OWNER_OCCUPIED_NUMERATOR_FIELDS = [
# Key: Column Name Line_Type Tenure Household income Cost burden Facilities
# T8_est7 Subtotal Owner occupied less than or equal to 30% of HAMFI greater than 30% but less than or equal to 50% All
"T8_est7",
# T8_est10 Subtotal Owner occupied less than or equal to 30% of HAMFI greater than 50% All
"T8_est10",
# T8_est20 Subtotal Owner occupied greater than 30% but less than or equal to 50% of HAMFI greater than 30% but less than or equal to 50% All
"T8_est20",
# T8_est23 Subtotal Owner occupied greater than 30% but less than or equal to 50% of HAMFI greater than 50% All
"T8_est23",
# T8_est33 Subtotal Owner occupied greater than 50% but less than or equal to 80% of HAMFI greater than 30% but less than or equal to 50% All
"T8_est33",
# T8_est36 Subtotal Owner occupied greater than 50% but less than or equal to 80% of HAMFI greater than 50% All
"T8_est36",
]
# These rows have the values where HAMFI was not computed, b/c of no or negative income.
OWNER_OCCUPIED_NOT_COMPUTED_FIELDS = [
# Key: Column Name Line_Type Tenure Household income Cost burden Facilities
# T8_est13 Subtotal Owner occupied less than or equal to 30% of HAMFI not computed (no/negative income) All
"T8_est13",
# T8_est26 Subtotal Owner occupied greater than 30% but less than or equal to 50% of HAMFI not computed (no/negative income) All
"T8_est26",
# T8_est39 Subtotal Owner occupied greater than 50% but less than or equal to 80% of HAMFI not computed (no/negative income) All
"T8_est39",
# T8_est52 Subtotal Owner occupied greater than 80% but less than or equal to 100% of HAMFI not computed (no/negative income) All
"T8_est52",
# T8_est65 Subtotal Owner occupied greater than 100% of HAMFI not computed (no/negative income) All
"T8_est65",
]
# T8_est2 Subtotal Owner occupied All All All
OWNER_OCCUPIED_POPULATION_FIELD = "T8_est2"
# Renter occupied numerator fields
RENTER_OCCUPIED_NUMERATOR_FIELDS = [
# Key: Column Name Line_Type Tenure Household income Cost burden Facilities
# T8_est73 Subtotal Renter occupied less than or equal to 30% of HAMFI greater than 30% but less than or equal to 50% All
"T8_est73",
# T8_est76 Subtotal Renter occupied less than or equal to 30% of HAMFI greater than 50% All
"T8_est76",
# T8_est86 Subtotal Renter occupied greater than 30% but less than or equal to 50% of HAMFI greater than 30% but less than or equal to 50% All
"T8_est86",
# T8_est89 Subtotal Renter occupied greater than 30% but less than or equal to 50% of HAMFI greater than 50% All
"T8_est89",
# T8_est99 Subtotal Renter occupied greater than 50% but less than or equal to 80% of HAMFI greater than 30% but less than or equal to 50% All
"T8_est99",
# T8_est102 Subtotal Renter occupied greater than 50% but less than or equal to 80% of HAMFI greater than 50% All
"T8_est102",
]
# These rows have the values where HAMFI was not computed, b/c of no or negative income.
RENTER_OCCUPIED_NOT_COMPUTED_FIELDS = [
# Key: Column Name Line_Type Tenure Household income Cost burden Facilities
# T8_est79 Subtotal Renter occupied less than or equal to 30% of HAMFI not computed (no/negative income) All
"T8_est79",
# T8_est92 Subtotal Renter occupied greater than 30% but less than or equal to 50% of HAMFI not computed (no/negative income) All
"T8_est92",
# T8_est105 Subtotal Renter occupied greater than 50% but less than or equal to 80% of HAMFI not computed (no/negative income) All
"T8_est105",
# T8_est118 Subtotal Renter occupied greater than 80% but less than or equal to 100% of HAMFI not computed (no/negative income) All
"T8_est118",
# T8_est131 Subtotal Renter occupied greater than 100% of HAMFI not computed (no/negative income) All
"T8_est131",
]
# T8_est68 Subtotal Renter occupied All All All
RENTER_OCCUPIED_POPULATION_FIELD = "T8_est68"
# Math:
# (
# # of Owner Occupied Units Meeting Criteria
# + # of Renter Occupied Units Meeting Criteria
# )
# divided by
# (
# Total # of Owner Occupied Units
# + Total # of Renter Occupied Units
# - # of Owner Occupied Units with HAMFI Not Computed
# - # of Renter Occupied Units with HAMFI Not Computed
# )
self.df[self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME] = self.df[
OWNER_OCCUPIED_NUMERATOR_FIELDS
].sum(axis=1) + self.df[RENTER_OCCUPIED_NUMERATOR_FIELDS].sum(axis=1)
self.df[self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME] = (
self.df[OWNER_OCCUPIED_POPULATION_FIELD]
+ self.df[RENTER_OCCUPIED_POPULATION_FIELD]
- self.df[OWNER_OCCUPIED_NOT_COMPUTED_FIELDS].sum(axis=1)
- self.df[RENTER_OCCUPIED_NOT_COMPUTED_FIELDS].sum(axis=1)
)
# TODO: add small sample size checks
self.df[self.HOUSING_BURDEN_FIELD_NAME] = self.df[
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME
].astype(float) / self.df[
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME
].astype(
float
)
def load(self) -> None:
logger.info(f"Saving HUD Housing Data")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
# Drop unnecessary fields
self.df[
[
self.GEOID_TRACT_FIELD_NAME,
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME,
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME,
self.HOUSING_BURDEN_FIELD_NAME,
]
].to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)
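
The Math comment above reduces to a single ratio: burdened owner and renter units over all occupied units minus the units where HAMFI was not computed. A toy numeric check with made-up counts:

# Made-up counts, only to illustrate the housing burden ratio.
owner_burdened, renter_burdened = 120, 200        # units meeting the cost-burden criteria
owner_total, renter_total = 800, 600              # analogues of T8_est2 and T8_est68
owner_not_computed, renter_not_computed = 20, 30  # no/negative income rows

numerator = owner_burdened + renter_burdened  # 320
denominator = (
    owner_total + renter_total - owner_not_computed - renter_not_computed
)  # 1350
print(numerator / denominator)  # ~0.237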


@@ -0,0 +1,63 @@
import pandas as pd
import requests
from etl.base import ExtractTransformLoad
from utils import get_module_logger
logger = get_module_logger(__name__)
class HudRecapETL(ExtractTransformLoad):
def __init__(self):
self.HUD_RECAP_CSV_URL = "https://opendata.arcgis.com/api/v3/datasets/56de4edea8264fe5a344da9811ef5d6e_0/downloads/data?format=csv&spatialRefId=4326"
self.HUD_RECAP_CSV = (
self.TMP_PATH
/ "Racially_or_Ethnically_Concentrated_Areas_of_Poverty__R_ECAPs_.csv"
)
self.CSV_PATH = self.DATA_PATH / "dataset" / "hud_recap"
# Defining some variable names
self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME = "hud_recap_priority_community"
self.df: pd.DataFrame
def extract(self) -> None:
logger.info(f"Downloading HUD Recap Data")
download = requests.get(self.HUD_RECAP_CSV_URL, verify=None)
file_contents = download.content
csv_file = open(self.HUD_RECAP_CSV, "wb")
csv_file.write(file_contents)
csv_file.close()
def transform(self) -> None:
logger.info(f"Transforming HUD Recap Data")
# Load the HUD RECAP data
self.df = pd.read_csv(self.HUD_RECAP_CSV, dtype={"Census Tract": "string"})
self.df.rename(
columns={
"GEOID": self.GEOID_TRACT_FIELD_NAME,
# Interestingly, there's no data dictionary for the RECAP data that I could find.
# However, this site (http://www.schousing.com/library/Tax%20Credit/2020/QAP%20Instructions%20(2).pdf)
# suggests:
# "If RCAP_Current for the tract in which the site is located is 1, the tract is an R/ECAP. If RCAP_Current is 0, it is not."
"RCAP_Current": self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME,
},
inplace=True,
)
# Convert to boolean
self.df[self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME] = self.df[
self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME
].astype("bool")
self.df[self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME].value_counts()
self.df.sort_values(by=self.GEOID_TRACT_FIELD_NAME, inplace=True)
def load(self) -> None:
logger.info(f"Saving HUD Recap CSV")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.CSV_PATH / f"usa.csv", index=False)