Ticket 492: Integrate Area Median Income and Poverty measures into ETL (#660)

* Loading AMI and poverty data
This commit is contained in:
Lucas Merrill Brown 2021-09-13 15:36:35 -05:00 committed by GitHub
commit 7d13be7651
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 474 additions and 91 deletions

View file

@ -4,7 +4,6 @@ import censusdata
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
from data_pipeline.utils import get_module_logger
from data_pipeline.config import settings
logger = get_module_logger(__name__)
@ -21,31 +20,38 @@ class CensusACSETL(ExtractTransformLoad):
"Linguistic isolation (total)"
)
self.LINGUISTIC_ISOLATION_FIELDS = [
"C16002_001E", # Estimate!!Total
"C16002_004E", # Estimate!!Total!!Spanish!!Limited English speaking household
"C16002_007E", # Estimate!!Total!!Other Indo-European languages!!Limited English speaking household
"C16002_010E", # Estimate!!Total!!Asian and Pacific Island languages!!Limited English speaking household
"C16002_013E", # Estimate!!Total!!Other languages!!Limited English speaking household
"C16002_001E", # Estimate!!Total
"C16002_004E", # Estimate!!Total!!Spanish!!Limited English speaking household
"C16002_007E", # Estimate!!Total!!Other Indo-European languages!!Limited English speaking household
"C16002_010E", # Estimate!!Total!!Asian and Pacific Island languages!!Limited English speaking household
"C16002_013E", # Estimate!!Total!!Other languages!!Limited English speaking household
]
self.MEDIAN_INCOME_FIELD = "B19013_001E"
self.MEDIAN_INCOME_FIELD_NAME = (
"Median household income in the past 12 months"
)
self.MEDIAN_INCOME_STATE_FIELD_NAME = "Median household income (State)"
self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME = (
"Median household income (% of state median household income)"
self.POVERTY_FIELDS = [
"C17002_001E", # Estimate!!Total,
"C17002_002E", # Estimate!!Total!!Under .50
"C17002_003E", # Estimate!!Total!!.50 to .99
"C17002_004E", # Estimate!!Total!!1.00 to 1.24
"C17002_005E", # Estimate!!Total!!1.25 to 1.49
"C17002_006E", # Estimate!!Total!!1.50 to 1.84
"C17002_007E", # Estimate!!Total!!1.85 to 1.99
]
self.POVERTY_LESS_THAN_100_PERCENT_FPL_FIELD_NAME = (
"Percent of individuals < 100% Federal Poverty Line"
)
self.POVERTY_LESS_THAN_150_PERCENT_FPL_FIELD_NAME = (
"Percent of individuals < 150% Federal Poverty Line"
)
self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME = (
"Percent of individuals < 200% Federal Poverty Line"
)
self.STATE_GEOID_FIELD_NAME = "GEOID2"
self.df: pd.DataFrame
self.state_median_income_df: pd.DataFrame
self.STATE_MEDIAN_INCOME_FTP_URL = (
settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/2015_to_2019_state_median_income.zip"
)
self.STATE_MEDIAN_INCOME_FILE_PATH = (
self.TMP_PATH / "2015_to_2019_state_median_income.csv"
)
def _fips_from_censusdata_censusgeo(
self, censusgeo: censusdata.censusgeo
@ -55,11 +61,6 @@ class CensusACSETL(ExtractTransformLoad):
return fips
def extract(self) -> None:
# Extract state median income
super().extract(
self.STATE_MEDIAN_INCOME_FTP_URL,
self.TMP_PATH,
)
dfs = []
for fips in get_state_fips_codes(self.DATA_PATH):
logger.info(
@ -79,7 +80,8 @@ class CensusACSETL(ExtractTransformLoad):
"B23025_003E",
self.MEDIAN_INCOME_FIELD,
]
+ self.LINGUISTIC_ISOLATION_FIELDS,
+ self.LINGUISTIC_ISOLATION_FIELDS
+ self.POVERTY_FIELDS,
)
)
@ -89,12 +91,6 @@ class CensusACSETL(ExtractTransformLoad):
func=self._fips_from_censusdata_censusgeo
)
self.state_median_income_df = pd.read_csv(
# TODO: Replace with reading from S3.
filepath_or_buffer=self.STATE_MEDIAN_INCOME_FILE_PATH,
dtype={self.STATE_GEOID_FIELD_NAME: "string"},
)
def transform(self) -> None:
logger.info("Starting Census ACS Transform")
@ -103,24 +99,6 @@ class CensusACSETL(ExtractTransformLoad):
self.MEDIAN_INCOME_FIELD
]
# TODO: handle null values for CBG median income, which are `-666666666`.
# Join state data on CBG data:
self.df[self.STATE_GEOID_FIELD_NAME] = (
self.df[self.GEOID_FIELD_NAME].astype(str).str[0:2]
)
self.df = self.df.merge(
self.state_median_income_df,
how="left",
on=self.STATE_GEOID_FIELD_NAME,
)
# Calculate the income of the block group as a fraction of the state income:
self.df[self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME] = (
self.df[self.MEDIAN_INCOME_FIELD_NAME]
/ self.df[self.MEDIAN_INCOME_STATE_FIELD_NAME]
)
# Calculate percent unemployment.
# TODO: remove small-sample data that should be `None` instead of a high-variance fraction.
self.df[self.UNEMPLOYED_FIELD_NAME] = (
@ -145,6 +123,27 @@ class CensusACSETL(ExtractTransformLoad):
self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME].describe()
# Calculate percent at different poverty thresholds
self.df[self.POVERTY_LESS_THAN_100_PERCENT_FPL_FIELD_NAME] = (
self.df["C17002_002E"] + self.df["C17002_003E"]
) / self.df["C17002_001E"]
self.df[self.POVERTY_LESS_THAN_150_PERCENT_FPL_FIELD_NAME] = (
self.df["C17002_002E"]
+ self.df["C17002_003E"]
+ self.df["C17002_004E"]
+ self.df["C17002_005E"]
) / self.df["C17002_001E"]
self.df[self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME] = (
self.df["C17002_002E"]
+ self.df["C17002_003E"]
+ self.df["C17002_004E"]
+ self.df["C17002_005E"]
+ self.df["C17002_006E"]
+ self.df["C17002_007E"]
) / self.df["C17002_001E"]
def load(self) -> None:
logger.info("Saving Census ACS Data")
@ -156,8 +155,9 @@ class CensusACSETL(ExtractTransformLoad):
self.UNEMPLOYED_FIELD_NAME,
self.LINGUISTIC_ISOLATION_FIELD_NAME,
self.MEDIAN_INCOME_FIELD_NAME,
self.MEDIAN_INCOME_STATE_FIELD_NAME,
self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME,
self.POVERTY_LESS_THAN_100_PERCENT_FPL_FIELD_NAME,
self.POVERTY_LESS_THAN_150_PERCENT_FPL_FIELD_NAME,
self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME,
]
self.df[columns_to_include].to_csv(

View file

@ -0,0 +1,276 @@
import json
from pathlib import Path
import pandas as pd
import requests
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
from data_pipeline.config import settings
from data_pipeline.utils import unzip_file_from_url
logger = get_module_logger(__name__)
class CensusACSMedianIncomeETL(ExtractTransformLoad):
    """Compute Area Median Income (AMI) for every census block group (CBG).

    Joins three extracts:
      1. The Geocorr 2014 crosswalk mapping each CBG to its place, county,
         and metropolitan/micropolitan statistical area (MSA).
      2. ACS 5-year median household income per MSA (Census API).
      3. ACS 5-year median household income per state (Census API).

    The reference AMI for a CBG is the MSA median income when the CBG lies
    in a metropolitan-type MSA, and the state median income otherwise.
    """

    def __init__(self):
        self.ACS_YEAR: int = 2019
        self.OUTPUT_PATH: Path = (
            self.DATA_PATH
            / "dataset"
            / f"census_acs_median_income_{self.ACS_YEAR}"
        )

        # Set constants for Geocorr MSAs data.
        self.PLACE_FIELD_NAME: str = "Census Place Name"
        self.COUNTY_FIELD_NAME: str = "County Name"
        self.STATE_ABBREVIATION_FIELD_NAME: str = "State Abbreviation"
        self.MSA_FIELD_NAME: str = (
            "Metropolitan/Micropolitan Statistical Area Name"
        )
        self.MSA_ID_FIELD_NAME: str = "MSA ID"
        self.MSA_TYPE_FIELD_NAME: str = "MSA Type"

        # Set constants for MSA median incomes.
        self.MSA_MEDIAN_INCOME_URL: str = (
            f"https://api.census.gov/data/{self.ACS_YEAR}/acs/acs5?get=B19013_001E"
            + "&for=metropolitan%20statistical%20area/micropolitan%20statistical%20area"
        )
        self.MSA_INCOME_FIELD_NAME: str = f"Median household income in the past 12 months (MSA; {self.ACS_YEAR} inflation-adjusted dollars)"

        # Set constants for state median incomes.
        self.STATE_MEDIAN_INCOME_URL: str = f"https://api.census.gov/data/{self.ACS_YEAR}/acs/acs5?get=B19013_001E&for=state"
        self.STATE_GEOID_FIELD_NAME: str = "GEOID2"
        self.STATE_MEDIAN_INCOME_FIELD_NAME: str = f"Median household income (State; {self.ACS_YEAR} inflation-adjusted dollars)"

        # Constants for output.
        self.AMI_REFERENCE_FIELD_NAME: str = "AMI Reference"
        self.AMI_FIELD_NAME: str = "Area Median Income (State or metropolitan)"
        self.COLUMNS_TO_KEEP = [
            self.GEOID_FIELD_NAME,
            self.PLACE_FIELD_NAME,
            self.COUNTY_FIELD_NAME,
            self.STATE_ABBREVIATION_FIELD_NAME,
            self.MSA_FIELD_NAME,
            self.MSA_ID_FIELD_NAME,
            self.MSA_TYPE_FIELD_NAME,
            self.MSA_INCOME_FIELD_NAME,
            self.STATE_GEOID_FIELD_NAME,
            self.STATE_MEDIAN_INCOME_FIELD_NAME,
            self.AMI_REFERENCE_FIELD_NAME,
            self.AMI_FIELD_NAME,
        ]

        # Remaining definitions. The Census API returns JSON arrays
        # (a list of rows, first row = column names), not objects, so these
        # are lists — the original `dict` annotations were incorrect.
        self.output_df: pd.DataFrame
        self.raw_geocorr_df: pd.DataFrame
        self.msa_median_incomes: list
        self.state_median_incomes: list

    def _transform_geocorr(self) -> pd.DataFrame:
        """Clean the raw Geocorr crosswalk and return one row per CBG."""
        geocorr_df = self.raw_geocorr_df

        # Strip the unnecessary period from the tract ID:
        geocorr_df["tract"] = geocorr_df["tract"].str.replace(
            ".", "", regex=False
        )

        # Create the full GEOID out of the component parts.
        geocorr_df[self.GEOID_FIELD_NAME] = (
            geocorr_df["county"] + geocorr_df["tract"] + geocorr_df["bg"]
        )

        # QA the combined field: every CBG GEOID must be exactly 12
        # characters long.
        geoid_lengths = geocorr_df[self.GEOID_FIELD_NAME].str.len()
        if (geoid_lengths != 12).any():
            # Use the module logger (not `print`) so the offending values
            # appear in the pipeline's logs.
            logger.error(
                "Unexpected GEOID lengths: %s", geoid_lengths.unique()
            )
            raise ValueError("Some of the census BG data has the wrong length.")

        # Rename some fields.
        geocorr_df.rename(
            columns={
                "placenm": self.PLACE_FIELD_NAME,
                "cbsaname10": self.MSA_FIELD_NAME,
                "cntyname": self.COUNTY_FIELD_NAME,
                "stab": self.STATE_ABBREVIATION_FIELD_NAME,
                "cbsa10": self.MSA_ID_FIELD_NAME,
                "cbsatype10": self.MSA_TYPE_FIELD_NAME,
            },
            inplace=True,
            errors="raise",
        )

        # Remove duplicated rows.
        # Some rows appear twice: once for the population within a CBG that's also within a census place,
        # and once for the population that's within a CBG that's *not* within a census place.
        # Drop the row that's not within a census place.
        # Sort by whether the place has a place name:
        geocorr_df.sort_values(
            by=self.PLACE_FIELD_NAME, axis=0, ascending=True, inplace=True
        )

        # Drop all the duplicated rows except for the first one (which will have the place name):
        rows_to_drop = geocorr_df.duplicated(
            keep="first", subset=[self.GEOID_FIELD_NAME]
        )

        # Keep everything that's *not* a row to drop:
        geocorr_df = geocorr_df[~rows_to_drop]

        # Sort by GEOID again to put the dataframe back to original order:
        # Note: avoiding using inplace because of unusual `SettingWithCopyWarning` warning.
        geocorr_df = geocorr_df.sort_values(
            by=self.GEOID_FIELD_NAME, axis=0, ascending=True, inplace=False
        )

        if len(geocorr_df) > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS:
            raise ValueError("Too many CBGs.")

        return geocorr_df

    def _transform_msa_median_incomes(self) -> pd.DataFrame:
        """Convert the raw Census API JSON for MSA incomes into a dataframe."""
        # The first list entry is the header row. Read it without mutating
        # `self.msa_median_incomes` — the original used `pop(0)`, which
        # destroyed the raw extract and made a second `transform()` call
        # misinterpret the data.
        column_names = self.msa_median_incomes[0]
        msa_median_incomes_df = pd.DataFrame(
            data=self.msa_median_incomes[1:], columns=column_names
        )
        msa_median_incomes_df.rename(
            columns={
                "B19013_001E": self.MSA_INCOME_FIELD_NAME,
                "metropolitan statistical area/micropolitan statistical area": self.MSA_ID_FIELD_NAME,
            },
            inplace=True,
            errors="raise",
        )

        # Convert MSA ID to str so it joins cleanly against the Geocorr
        # `cbsa10` field, which is read as a string.
        msa_median_incomes_df[self.MSA_ID_FIELD_NAME] = msa_median_incomes_df[
            self.MSA_ID_FIELD_NAME
        ].astype(str)

        return msa_median_incomes_df

    def _transform_state_median_incomes(self) -> pd.DataFrame:
        """Convert the raw Census API JSON for state incomes into a dataframe."""
        # First list entry is the header row; read non-destructively
        # (see `_transform_msa_median_incomes`).
        column_names = self.state_median_incomes[0]
        state_median_incomes_df = pd.DataFrame(
            data=self.state_median_incomes[1:], columns=column_names
        )
        state_median_incomes_df.rename(
            columns={
                "B19013_001E": self.STATE_MEDIAN_INCOME_FIELD_NAME,
                "state": self.STATE_GEOID_FIELD_NAME,
            },
            inplace=True,
            errors="raise",
        )

        return state_median_incomes_df

    def extract(self) -> None:
        """Download the Geocorr crosswalk and the MSA/state median incomes."""
        logger.info("Starting three separate downloads.")

        # Load and clean GEOCORR data
        # Note: this data is generated by https://mcdc.missouri.edu/applications/geocorr2014.html, at the advice of the Census.
        # The specific query used is the following, which takes a couple of minutes to run:
        # https://mcdc.missouri.edu/cgi-bin/broker?_PROGRAM=apps.geocorr2014.sas&_SERVICE=MCDC_long&_debug=0&state=Mo29&state=Al01&state=Ak02&state=Az04&state=Ar05&state=Ca06&state=Co08&state=Ct09&state=De10&state=Dc11&state=Fl12&state=Ga13&state=Hi15&state=Id16&state=Il17&state=In18&state=Ia19&state=Ks20&state=Ky21&state=La22&state=Me23&state=Md24&state=Ma25&state=Mi26&state=Mn27&state=Ms28&state=Mt30&state=Ne31&state=Nv32&state=Nh33&state=Nj34&state=Nm35&state=Ny36&state=Nc37&state=Nd38&state=Oh39&state=Ok40&state=Or41&state=Pa42&state=Ri44&state=Sc45&state=Sd46&state=Tn47&state=Tx48&state=Ut49&state=Vt50&state=Va51&state=Wa53&state=Wv54&state=Wi55&state=Wy56&g1_=state&g1_=county&g1_=placefp&g1_=tract&g1_=bg&g2_=cbsa10&g2_=cbsatype10&wtvar=pop10&nozerob=1&title=&csvout=1&namoptf=b&listout=1&lstfmt=html&namoptr=b&oropt=&counties=&metros=&places=&latitude=&longitude=&locname=&distance=&kiloms=0&nrings=&r1=&r2=&r3=&r4=&r5=&r6=&r7=&r8=&r9=&r10=&lathi=&latlo=&longhi=&longlo=
        logger.info("Starting download of Geocorr information.")
        unzip_file_from_url(
            file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
            + "/geocorr2014_all_states.csv.zip",
            download_path=self.TMP_PATH,
            unzipped_file_path=self.TMP_PATH / "geocorr",
        )
        self.raw_geocorr_df = pd.read_csv(
            filepath_or_buffer=self.TMP_PATH
            / "geocorr"
            / "geocorr2014_all_states.csv",
            # Skip second row, which has descriptions.
            skiprows=[1],
            # The following need to remain as strings for all of their digits, not get converted to numbers.
            dtype={
                "tract": "string",
                "county": "string",
                "state": "string",
                "bg": "string",
                "cbsa10": "string",
            },
            low_memory=False,
        )

        # Download MSA median incomes.
        # `verify=None` in the original was a no-op (it falls back to the
        # default of verifying certificates); verify explicitly, and bound
        # the request so a stalled download cannot hang the pipeline.
        logger.info("Starting download of MSA median incomes.")
        download = requests.get(
            self.MSA_MEDIAN_INCOME_URL, verify=True, timeout=300
        )
        self.msa_median_incomes = json.loads(download.content)

        # Download state median incomes.
        logger.info("Starting download of state median incomes.")
        download_state = requests.get(
            self.STATE_MEDIAN_INCOME_URL, verify=True, timeout=300
        )
        self.state_median_incomes = json.loads(download_state.content)

    def transform(self) -> None:
        """Join the three extracts and compute the per-CBG AMI fields."""
        logger.info("Starting transforms.")

        # Run transforms:
        geocorr_df = self._transform_geocorr()
        msa_median_incomes_df = self._transform_msa_median_incomes()
        state_median_incomes_df = self._transform_state_median_incomes()

        # Join CBGs on MSA incomes.
        merged_df = geocorr_df.merge(
            msa_median_incomes_df, on=self.MSA_ID_FIELD_NAME, how="left"
        )

        # Merge state income with CBGs. The state GEOID is the first two
        # digits of the CBG GEOID.
        merged_df[self.STATE_GEOID_FIELD_NAME] = (
            merged_df[self.GEOID_FIELD_NAME].astype(str).str[0:2]
        )
        merged_with_state_income_df = merged_df.merge(
            state_median_incomes_df,
            how="left",
            on=self.STATE_GEOID_FIELD_NAME,
        )

        if (
            len(merged_with_state_income_df)
            > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS
        ):
            raise ValueError("Too many CBGs in join.")

        # Choose reference income: MSA if MSA type is Metro, otherwise use State.
        merged_with_state_income_df[self.AMI_REFERENCE_FIELD_NAME] = [
            "MSA" if msa_type == "Metro" else "State"
            for msa_type in merged_with_state_income_df[
                self.MSA_TYPE_FIELD_NAME
            ]
        ]

        # Populate reference income: MSA income if reference income is MSA,
        # state income if reference income is state. `Series.where` keeps the
        # MSA value where the condition holds and substitutes the state value
        # elsewhere — a vectorized replacement for the original row-wise
        # `apply`, producing the same values.
        merged_with_state_income_df[
            self.AMI_FIELD_NAME
        ] = merged_with_state_income_df[self.MSA_INCOME_FIELD_NAME].where(
            merged_with_state_income_df[self.AMI_REFERENCE_FIELD_NAME]
            == "MSA",
            merged_with_state_income_df[self.STATE_MEDIAN_INCOME_FIELD_NAME],
        )

        self.output_df = merged_with_state_income_df

    def validate(self) -> None:
        """Placeholder validation hook; currently only logs."""
        logger.info("Validating Census ACS Median Income Data")

    def load(self) -> None:
        """Write the selected columns to `usa.csv` under OUTPUT_PATH."""
        logger.info("Saving Census ACS Median Income CSV")

        self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
        self.output_df[self.COLUMNS_TO_KEEP].to_csv(
            path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
        )

View file

@ -272,7 +272,6 @@ class HudHousingETL(ExtractTransformLoad):
- self.df[RENTER_OCCUPIED_NOT_COMPUTED_FIELDS].sum(axis=1)
)
self.df["DENOM INCL NOT COMPUTED"] = (
self.df[OWNER_OCCUPIED_POPULATION_FIELD]
+ self.df[RENTER_OCCUPIED_POPULATION_FIELD]