From f9be97d8c8b5412121f33741eada6f861f50b3d4 Mon Sep 17 00:00:00 2001 From: Emma Nechamkin <97977170+emma-nechamkin@users.noreply.github.com> Date: Tue, 1 Mar 2022 16:31:38 -0500 Subject: [PATCH] This is a quick addition to include Puerto Rico (PR) area median income (AMI). To be revised in the "clean up code" pull request --- .../sources/census_acs_median_income/etl.py | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py index 1e354c5c..1e992cf4 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py +++ b/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py @@ -7,7 +7,7 @@ import requests from data_pipeline.etl.base import ExtractTransformLoad from data_pipeline.utils import get_module_logger from data_pipeline.config import settings -from data_pipeline.utils import unzip_file_from_url +from data_pipeline.utils import unzip_file_from_url, download_file_from_url logger = get_module_logger(__name__) @@ -45,6 +45,11 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): self.STATE_GEOID_FIELD_NAME: str = "GEOID2" self.STATE_MEDIAN_INCOME_FIELD_NAME: str = f"Median household income (State; {self.ACS_YEAR} inflation-adjusted dollars)" + # URL of the CSV listing Puerto Rico census tracts + self.PUERTO_RICO_S3_LINK: str = ( + settings.AWS_JUSTICE40_DATASOURCES_URL + "/PR_census_tracts.csv" + ) + # Constants for output self.AMI_REFERENCE_FIELD_NAME: str = "AMI Reference" self.AMI_FIELD_NAME: str = "Area Median Income (State or metropolitan)" @@ -68,6 +73,7 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): self.raw_geocorr_df: pd.DataFrame self.msa_median_incomes: dict self.state_median_incomes: dict + self.pr_tracts: pd.DataFrame def _transform_geocorr(self) -> pd.DataFrame: # Transform the geocorr data @@ -214,11 +220,10 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
inplace=True, errors="raise", ) - return state_median_incomes_df def extract(self) -> None: - logger.info("Starting three separate downloads.") + logger.info("Starting four separate downloads.") # Load and clean GEOCORR data # Note: this data is generated by https://mcdc.missouri.edu/applications/geocorr2014.html, at the advice of the Census. # The specific query used is the following, which takes a couple of minutes to run: @@ -259,6 +264,21 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): low_memory=False, ) + logger.info("Pulling PR info down.") + pr_file = self.get_tmp_path() / "pr_tracts" / "pr_tracts.csv" + download_file_from_url( + file_url=self.PUERTO_RICO_S3_LINK, download_file_name=pr_file + ) + self.pr_tracts = pd.read_csv( + filepath_or_buffer=self.get_tmp_path() + / "pr_tracts" + / "pr_tracts.csv", + # Read back the Puerto Rico tract list that was just downloaded. + # The following need to remain as strings for all of their digits, not get converted to numbers. + dtype={"GEOID10_TRACT": str}, + low_memory=False, + ) + # Download MSA median incomes logger.info("Starting download of MSA median incomes.") download = requests.get(self.MSA_MEDIAN_INCOME_URL, verify=None) @@ -268,6 +288,7 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): logger.info("Starting download of state median incomes.") download_state = requests.get(self.STATE_MEDIAN_INCOME_URL, verify=None) self.state_median_incomes = json.loads(download_state.content) + ## NOTE: we already have Puerto Rico's median income (MI) here def transform(self) -> None: logger.info("Starting transforms.") @@ -277,8 +298,13 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): msa_median_incomes_df = self._transform_msa_median_incomes() state_median_incomes_df = self._transform_state_median_incomes() - # Join tracts on MSA incomes - merged_df = geocorr_df.merge( + # Adds 945 PR tracts + geocorr_df_plus_pr = geocorr_df.merge( + self.pr_tracts, how="outer", indicator=True + ) + + # Join tracts on MSA incomes (this is where we lose PR) +
merged_df = geocorr_df_plus_pr.merge( msa_median_incomes_df, on=self.MSA_ID_FIELD_NAME, how="left" ) @@ -287,9 +313,10 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): merged_df[self.GEOID_TRACT_FIELD_NAME].astype(str).str[0:2] ) + # outer join adds PR back merged_with_state_income_df = merged_df.merge( state_median_incomes_df, - how="left", + how="outer", on=self.STATE_GEOID_FIELD_NAME, )