Adding persistent poverty tracts (#738)

* persistent poverty working

* fixing left-padding

* running black and adding persistent poverty to comp tool

* fixing bug

* running black and fixing linter

* fixing linter

* fixing linter error
Lucas Merrill Brown 2021-09-22 16:57:08 -05:00 committed by GitHub
commit b1a4d26be8
15 changed files with 518 additions and 201 deletions


@@ -22,8 +22,9 @@ class ExtractTransformLoad:
FILES_PATH: Path = settings.APP_ROOT / "files"
GEOID_FIELD_NAME: str = "GEOID10"
GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
# TODO: investigate. Census says there are only 217,740 CBGs in the US.
# TODO: investigate. Census says there are only 217,740 CBGs in the US. This might be from CBGs at different time periods.
EXPECTED_MAX_CENSUS_BLOCK_GROUPS: int = 220405
EXPECTED_MAX_CENSUS_TRACTS: int = 73076
def get_yaml_config(self) -> None:
"""Reads the YAML configuration file for the dataset and stores


@@ -64,6 +64,11 @@ DATASET_LIST = [
"module_dir": "geocorr",
"class_name": "GeoCorrETL",
},
{
"name": "persistent_poverty",
"module_dir": "persistent_poverty",
"class_name": "PersistentPovertyETL",
},
]
CENSUS_INFO = {
"name": "census",


@@ -83,6 +83,9 @@ class ScoreETL(ExtractTransformLoad):
# Urban Rural Map
self.URBAN_HERUISTIC_FIELD_NAME = "Urban Heuristic Flag"
# Persistent poverty
self.PERSISTENT_POVERTY_FIELD = "Persistent Poverty Census Tract"
# dataframes
self.df: pd.DataFrame
self.ejscreen_df: pd.DataFrame
@@ -95,6 +98,7 @@
self.doe_energy_burden_df: pd.DataFrame
self.national_risk_index_df: pd.DataFrame
self.geocorr_urban_rural_df: pd.DataFrame
self.persistent_poverty_df: pd.DataFrame
def data_sets(self) -> list:
# Define a named tuple that will be used for each data set input.
@@ -206,6 +210,11 @@
renamed_field=self.URBAN_HERUISTIC_FIELD_NAME,
bucket=None,
),
DataSet(
input_field=self.PERSISTENT_POVERTY_FIELD,
renamed_field=self.PERSISTENT_POVERTY_FIELD,
bucket=None,
),
# The following data sets have buckets, because they're used in Score C
DataSet(
input_field="CANCER",
@@ -405,6 +414,16 @@
low_memory=False,
)
# Load persistent poverty
persistent_poverty_csv = (
self.DATA_PATH / "dataset" / "persistent_poverty" / "usa.csv"
)
self.persistent_poverty_df = pd.read_csv(
persistent_poverty_csv,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame:
logger.info("Joining Census Block Group dataframes")
census_block_group_df = functools.reduce(
@@ -692,6 +711,7 @@
self.cdc_life_expectancy_df,
self.doe_energy_burden_df,
self.geocorr_urban_rural_df,
self.persistent_poverty_df,
]
census_tract_df = self._join_tract_dfs(census_tract_dfs)
@@ -743,7 +763,11 @@
# TODO do this at the same time as calculating percentiles in future refactor
for data_set in data_sets:
# Skip GEOID_FIELD_NAME, because it's a string.
if data_set.renamed_field == self.GEOID_FIELD_NAME:
# Skip `PERSISTENT_POVERTY_FIELD` because it's a straight pass-through.
if data_set.renamed_field in (
self.GEOID_FIELD_NAME,
self.PERSISTENT_POVERTY_FIELD,
):
continue
df[data_set.renamed_field] = pd.to_numeric(
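
The expanded skip matters because `pd.to_numeric` would mangle both pass-through fields: GEOIDs are fixed-width strings whose leading zeros are meaningful, and the persistent poverty flag is already a boolean. A quick illustration with hypothetical tract IDs:

    import pandas as pd

    geoids = pd.Series(["01073000100", "06037206300"])
    # Numeric coercion silently drops the leading zero, corrupting the ID:
    pd.to_numeric(geoids)  # -> 1073000100, 6037206300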


@@ -0,0 +1,174 @@
import functools
import pandas as pd
from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import (
get_module_logger,
unzip_file_from_url,
)
logger = get_module_logger(__name__)
class PersistentPovertyETL(ExtractTransformLoad):
"""Persistent poverty data.
Loaded from `https://s4.ad.brown.edu/Projects/Diversity/Researcher/LTDB.htm`.
Codebook: `https://s4.ad.brown.edu/Projects/Diversity/Researcher/LTBDDload/Dfiles/codebooks.pdf`.
"""
def __init__(self):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "persistent_poverty"
# Need to change hyperlink to S3
# self.GEOCORR_PLACES_URL = "https://justice40-data.s3.amazonaws.com/data-sources/persistent_poverty_urban_rural.csv.zip"
self.GEOID_TRACT_INPUT_FIELD_NAME_1 = "TRTID10"
self.GEOID_TRACT_INPUT_FIELD_NAME_2 = "tractid"
# self.URBAN_HERUISTIC_FIELD_NAME = "Urban Heuristic Flag"
self.POVERTY_PREFIX = "Individuals in Poverty (percent)"
self.PERSISTENT_POVERTY_FIELD = "Persistent Poverty Census Tract"
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
f"{self.POVERTY_PREFIX} (1990)",
f"{self.POVERTY_PREFIX} (2000)",
f"{self.POVERTY_PREFIX} (2010)",
self.PERSISTENT_POVERTY_FIELD,
]
self.df: pd.DataFrame
def _join_input_dfs(self, dfs: list) -> pd.DataFrame:
df = functools.reduce(
lambda df_a, df_b: pd.merge(
left=df_a,
right=df_b,
# All data frames will now have this field for tract.
on=self.GEOID_TRACT_FIELD_NAME,
how="outer",
),
dfs,
)
# Left-pad the tracts with 0s
expected_length_of_census_tract_field = 11
df[self.GEOID_TRACT_FIELD_NAME] = (
df[self.GEOID_TRACT_FIELD_NAME]
.astype(str)
.apply(lambda x: x.zfill(expected_length_of_census_tract_field))
)
# Sanity check the join.
if len(df[self.GEOID_TRACT_FIELD_NAME].str.len().unique()) != 1:
raise ValueError(
f"One of the input CSVs uses {self.GEOID_TRACT_FIELD_NAME} with a different length."
)
if len(df) > self.EXPECTED_MAX_CENSUS_TRACTS:
raise ValueError(f"Too many rows in the join: {len(df)}")
return df
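
For reference, the padding relies on standard `str.zfill` semantics, which restore leading zeros that numeric dtypes or spreadsheet exports tend to strip (tract IDs below are hypothetical):

    # zfill pads on the left to the requested width and leaves
    # already-full-width strings untouched.
    "1073000100".zfill(11)   # -> "01073000100" (leading zero restored)
    "06037206300".zfill(11)  # -> "06037206300" (already 11 characters)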
def extract(self) -> None:
logger.info("Starting to download 86MB persistent poverty file.")
unzipped_file_path = self.TMP_PATH / "persistent_poverty"
unzip_file_from_url(
file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/LTDB_Std_All_Sample.zip",
download_path=self.TMP_PATH,
unzipped_file_path=unzipped_file_path,
)
file_names = [
"ltdb_std_1990_sample.csv",
"ltdb_std_2000_sample.csv",
"ltdb_std_2010_sample.csv",
]
temporary_input_dfs = []
for file_name in file_names:
logger.info(f"Reading {file_name}")
temporary_input_df = pd.read_csv(
filepath_or_buffer=unzipped_file_path
/ f"ltdb_std_all_sample/{file_name}",
dtype={
self.GEOID_TRACT_INPUT_FIELD_NAME_1: "string",
self.GEOID_TRACT_INPUT_FIELD_NAME_2: "string",
},
low_memory=False,
encoding="latin1",
)
# Some CSVs name the tract field self.GEOID_TRACT_INPUT_FIELD_NAME_1 and
# others use self.GEOID_TRACT_INPUT_FIELD_NAME_2. Rename both to the same
# canonical tract field name.
temporary_input_df.rename(
columns={
self.GEOID_TRACT_INPUT_FIELD_NAME_1: self.GEOID_TRACT_FIELD_NAME,
self.GEOID_TRACT_INPUT_FIELD_NAME_2: self.GEOID_TRACT_FIELD_NAME,
},
inplace=True,
# Ignore missing keys, because different CSVs use different tract field names.
errors="ignore",
)
temporary_input_dfs.append(temporary_input_df)
self.df = self._join_input_dfs(temporary_input_dfs)
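
Because the rename mapping sends both candidate column names to the same canonical field, every CSV comes out of this loop with `GEOID10_TRACT`, whichever variant it started with. A toy illustration of the trick (data values are hypothetical):

    import pandas as pd

    # Stand-ins for the 1990 and 2010 sample CSVs, which name the
    # tract field differently.
    df_1990 = pd.DataFrame({"TRTID10": ["01073000100"], "NPOV90": [150]})
    df_2010 = pd.DataFrame({"tractid": ["01073000100"], "npov12": [210]})

    # Whichever key is absent from a given frame is simply ignored.
    mapping = {"TRTID10": "GEOID10_TRACT", "tractid": "GEOID10_TRACT"}
    df_1990 = df_1990.rename(columns=mapping)
    df_2010 = df_2010.rename(columns=mapping)
    # Both frames now share GEOID10_TRACT and can be outer-merged.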
def transform(self) -> None:
logger.info("Starting persistent poverty transform")
transformed_df = self.df
# Note: the LTDB codebook defines these fields as follows.
# dpovXX: persons for whom poverty status is determined
# npovXX: persons in poverty
transformed_df[f"{self.POVERTY_PREFIX} (1990)"] = (
transformed_df["NPOV90"] / transformed_df["DPOV90"]
)
transformed_df[f"{self.POVERTY_PREFIX} (2000)"] = (
transformed_df["NPOV00"] / transformed_df["DPOV00"]
)
# Note: for 2010, the source uses five-year ACS data ending in 2012, whose midpoint year is 2010.
transformed_df[f"{self.POVERTY_PREFIX} (2010)"] = (
transformed_df["npov12"] / transformed_df["dpov12"]
)
poverty_threshold = 0.2
transformed_df[self.PERSISTENT_POVERTY_FIELD] = (
(
transformed_df[f"{self.POVERTY_PREFIX} (1990)"]
>= poverty_threshold
)
& (
transformed_df[f"{self.POVERTY_PREFIX} (2000)"]
>= poverty_threshold
)
& (
transformed_df[f"{self.POVERTY_PREFIX} (2010)"]
>= poverty_threshold
)
)
self.df = transformed_df
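
The flag is a strict conjunction: a single period below 20 percent disqualifies a tract, however poor it is in the other two. A compact worked example with hypothetical rates (each rate is persons in poverty divided by persons assessed, e.g. 150 / 600 = 0.25):

    import pandas as pd

    rates = pd.DataFrame(
        {
            "1990": [0.25, 0.25],
            "2000": [0.22, 0.18],
            "2010": [0.21, 0.21],
        }
    )
    # Tract 0 clears 20 percent in all three periods -> True.
    # Tract 1 dips to 18 percent in 2000 -> False.
    flags = (rates >= 0.2).all(axis="columns")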
def load(self) -> None:
logger.info("Saving persistent poverty data.")
# Create the output directory if it does not already exist.
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
def validate(self) -> None:
logger.info("Validating persistent poverty data.")