This is a quick addition to include PR AMI. To be revised in the "clean up code" PR.

This commit is contained in:
Emma Nechamkin 2022-03-01 16:31:38 -05:00 committed by GitHub
parent 06ba506bc6
commit f9be97d8c8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@@ -7,7 +7,7 @@ import requests
 from data_pipeline.etl.base import ExtractTransformLoad
 from data_pipeline.utils import get_module_logger
 from data_pipeline.config import settings
-from data_pipeline.utils import unzip_file_from_url
+from data_pipeline.utils import unzip_file_from_url, download_file_from_url

 logger = get_module_logger(__name__)
@@ -45,6 +45,11 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
 self.STATE_GEOID_FIELD_NAME: str = "GEOID2"
 self.STATE_MEDIAN_INCOME_FIELD_NAME: str = f"Median household income (State; {self.ACS_YEAR} inflation-adjusted dollars)"
+
+# List of Puerto Rico tracts
+self.PUERTO_RICO_S3_LINK: str = (
+    settings.AWS_JUSTICE40_DATASOURCES_URL + "/PR_census_tracts.csv"
+)

 # Constants for output
 self.AMI_REFERENCE_FIELD_NAME: str = "AMI Reference"
 self.AMI_FIELD_NAME: str = "Area Median Income (State or metropolitan)"
@@ -68,6 +73,7 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
 self.raw_geocorr_df: pd.DataFrame
 self.msa_median_incomes: dict
 self.state_median_incomes: dict
+self.pr_tracts: pd.DataFrame

 def _transform_geocorr(self) -> pd.DataFrame:
 # Transform the geocorr data
@@ -214,11 +220,10 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
 inplace=True,
 errors="raise",
 )

 return state_median_incomes_df

 def extract(self) -> None:
-logger.info("Starting three separate downloads.")
+logger.info("Starting four separate downloads.")

 # Load and clean GEOCORR data
 # Note: this data is generated by https://mcdc.missouri.edu/applications/geocorr2014.html, at the advice of the Census.
 # The specific query used is the following, which takes a couple of minutes to run:
@@ -259,6 +264,21 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
 low_memory=False,
 )

+logger.info("Pulling PR info down.")
+pr_file = self.get_tmp_path() / "pr_tracts" / "pr_tracts.csv"
+download_file_from_url(
+    file_url=self.PUERTO_RICO_S3_LINK, download_file_name=pr_file
+)
+self.pr_tracts = pd.read_csv(
+    filepath_or_buffer=self.get_tmp_path()
+    / "pr_tracts"
+    / "pr_tracts.csv",
+    # Skip second row, which has descriptions.
+    # The following need to remain as strings for all of their digits, not get converted to numbers.
+    dtype={"GEOID10_TRACT": str},
+    low_memory=False,
+)
+
 # Download MSA median incomes
 logger.info("Starting download of MSA median incomes.")
 download = requests.get(self.MSA_MEDIAN_INCOME_URL, verify=None)
@@ -268,6 +288,7 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
 logger.info("Starting download of state median incomes.")
 download_state = requests.get(self.STATE_MEDIAN_INCOME_URL, verify=None)
 self.state_median_incomes = json.loads(download_state.content)
+## NOTE we already have PR's MI here

 def transform(self) -> None:
 logger.info("Starting transforms.")
@@ -277,8 +298,13 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
 msa_median_incomes_df = self._transform_msa_median_incomes()
 state_median_incomes_df = self._transform_state_median_incomes()

-# Join tracts on MSA incomes
-merged_df = geocorr_df.merge(
+# Adds 945 PR tracts
+geocorr_df_plus_pr = geocorr_df.merge(
+    self.pr_tracts, how="outer", indicator=True
+)
+
+# Join tracts on MSA incomes (this is where we lose PR)
+merged_df = geocorr_df_plus_pr.merge(
     msa_median_incomes_df, on=self.MSA_ID_FIELD_NAME, how="left"
 )
@@ -287,9 +313,10 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
 merged_df[self.GEOID_TRACT_FIELD_NAME].astype(str).str[0:2]
 )

+# outer join adds PR back
 merged_with_state_income_df = merged_df.merge(
     state_median_incomes_df,
-    how="left",
+    how="outer",
     on=self.STATE_GEOID_FIELD_NAME,
 )