From 617f41526f0290ddbe431be1c0f9958916ace02f Mon Sep 17 00:00:00 2001 From: Shelby Switzer Date: Thu, 18 Nov 2021 11:05:32 -0500 Subject: [PATCH] Update Census AMI to ETL into tracts, not CBGs (#900) * Update Census AMI to ETL into tracts, not CBGs Co-authored-by: Shelby Switzer Co-authored-by: lucasmbrown-usds --- .../data_pipeline/etl/score/etl_score.py | 2 +- .../sources/census_acs_median_income/etl.py | 106 +++++++++++++----- 2 files changed, 81 insertions(+), 27 deletions(-) diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py index 2639e2dd..57b7bc6d 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py @@ -225,7 +225,6 @@ class ScoreETL(ExtractTransformLoad): # Join all the data sources that use census block groups census_block_group_dfs = [ self.ejscreen_df, - self.census_acs_median_incomes_df, ] census_block_group_df = self._join_cbg_dfs(census_block_group_dfs) @@ -241,6 +240,7 @@ class ScoreETL(ExtractTransformLoad): self.persistent_poverty_df, self.housing_and_transportation_df, self.national_risk_index_df, + self.census_acs_median_incomes_df, ] census_tract_df = self._join_tract_dfs(census_tract_dfs) diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py index 06cab81a..45f8e5fa 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py +++ b/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py @@ -1,5 +1,6 @@ import json from pathlib import Path +import numpy as np import pandas as pd import requests @@ -29,6 +30,8 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): ) self.MSA_ID_FIELD_NAME: str = "MSA ID" self.MSA_TYPE_FIELD_NAME: str = "MSA Type" + self.POPULATION_FIELD_NAME: str = "pop10" + self.TEMPORARY_SORT_FIELD: str = "temporary sort field" # Set constants for MSA median incomes self.MSA_MEDIAN_INCOME_URL: str = ( @@ -46,7 +49,7 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): self.AMI_REFERENCE_FIELD_NAME: str = "AMI Reference" self.AMI_FIELD_NAME: str = "Area Median Income (State or metropolitan)" self.COLUMNS_TO_KEEP = [ - self.GEOID_FIELD_NAME, + self.GEOID_TRACT_FIELD_NAME, self.PLACE_FIELD_NAME, self.COUNTY_FIELD_NAME, self.STATE_ABBREVIATION_FIELD_NAME, @@ -76,15 +79,19 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): ) # Create the full GEOID out of the component parts. - geocorr_df[self.GEOID_FIELD_NAME] = ( - geocorr_df["county"] + geocorr_df["tract"] + geocorr_df["bg"] + geocorr_df[self.GEOID_TRACT_FIELD_NAME] = ( + geocorr_df["county"] + geocorr_df["tract"] ) # QA the combined field: - tract_values = geocorr_df[self.GEOID_FIELD_NAME].str.len().unique() - if any(tract_values != [12]): + tract_values = ( + geocorr_df[self.GEOID_TRACT_FIELD_NAME].str.len().unique() + ) + if any(tract_values != [11]): print(tract_values) - raise ValueError("Some of the census BG data has the wrong length.") + raise ValueError( + "Some of the census tract data has the wrong length." + ) # Rename some fields geocorr_df.rename( @@ -101,18 +108,55 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): ) # Remove duplicated rows. - # Some rows appear twice: once for the population within a CBG that's also within a census place, - # and once for the population that's within a CBG that's *not* within a census place. - # Drop the row that's not within a census place. + # Some rows appear more than once: once for the population within a tract that's also within a census place, + # and once for the population that's within a tract that's *not* within a census place. + # Sort based on the following rule: + # Assign the place name to the tract that has the highest population of any row with a non-blank place name. + # + # Therefore if there are three place name entries for a tract, the tract + # will be labeled with the place name that has the highest population. + # E.g., for the following (real) data: + # + # | tract | Place Name | Population | + # |-------------|---------------------|------------| + # | 01001020802 | Pine Level CDP, AL | 2642 | + # | 01001020802 | Prattville city, AL | 2347 | + # | 01001020802 | | 5302 | + # |-------------|---------------------|------------| + # + # The largest percent of population in this tract lives in a place that has no name. + # The largest percent of population in a tract with a name is `Pine Level CDP, AL`. + # Therefore the tract should be identified as `Pine Level CDP, AL`. + + # Sort field. This is created purely as a convenience function for sorting purposes. + # This field is as follows: + # | tract | Place Name | Population | Temporary Sort Field | + # |-------------|---------------------|------------|------------| + # | 01001020802 | Pine Level CDP, AL | 2642 | 102642 | + # | 01001020802 | Prattville city, AL | 2347 | 102347 | + # | 01001020802 | | 5302 | 5302 | + # |-------------|---------------------|------------|------------| + # + geocorr_df[self.TEMPORARY_SORT_FIELD] = np.where( + geocorr_df[self.PLACE_FIELD_NAME].str.strip() != "", + # Give place names a major bonus in ranking. + 100000 + geocorr_df[self.POPULATION_FIELD_NAME], + # Otherwise just use population. + geocorr_df[self.POPULATION_FIELD_NAME], + ) # Sort by whether the place has a place name: geocorr_df.sort_values( - by=self.PLACE_FIELD_NAME, axis=0, ascending=True, inplace=True + # Sort by sort field descending, so the highest entry is first. + by=self.TEMPORARY_SORT_FIELD, + axis=0, + ascending=False, + inplace=True, ) # Drop all the duplicated rows except for the first one (which will have the place name): rows_to_drop = geocorr_df.duplicated( - keep="first", subset=[self.GEOID_FIELD_NAME] + keep="first", subset=[self.GEOID_TRACT_FIELD_NAME] ) # Keep everything that's *not* a row to drop: @@ -121,11 +165,14 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): # Sort by GEOID again to put the dataframe back to original order: # Note: avoiding using inplace because of unusual `SettingWithCopyWarning` warning. geocorr_df = geocorr_df.sort_values( - by=self.GEOID_FIELD_NAME, axis=0, ascending=True, inplace=False + by=self.GEOID_TRACT_FIELD_NAME, + axis=0, + ascending=True, + inplace=False, ) - if len(geocorr_df) > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS: - raise ValueError("Too many CBGs.") + if len(geocorr_df) > self.EXPECTED_MAX_CENSUS_TRACTS: + raise ValueError("Too many tracts.") return geocorr_df @@ -175,12 +222,22 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): # Load and clean GEOCORR data # Note: this data is generated by https://mcdc.missouri.edu/applications/geocorr2014.html, at the advice of the Census. # The specific query used is the following, which takes a couple of minutes to run: - # https://mcdc.missouri.edu/cgi-bin/broker?_PROGRAM=apps.geocorr2014.sas&_SERVICE=MCDC_long&_debug=0&state=Mo29&state=Al01&state=Ak02&state=Az04&state=Ar05&state=Ca06&state=Co08&state=Ct09&state=De10&state=Dc11&state=Fl12&state=Ga13&state=Hi15&state=Id16&state=Il17&state=In18&state=Ia19&state=Ks20&state=Ky21&state=La22&state=Me23&state=Md24&state=Ma25&state=Mi26&state=Mn27&state=Ms28&state=Mt30&state=Ne31&state=Nv32&state=Nh33&state=Nj34&state=Nm35&state=Ny36&state=Nc37&state=Nd38&state=Oh39&state=Ok40&state=Or41&state=Pa42&state=Ri44&state=Sc45&state=Sd46&state=Tn47&state=Tx48&state=Ut49&state=Vt50&state=Va51&state=Wa53&state=Wv54&state=Wi55&state=Wy56&g1_=state&g1_=county&g1_=placefp&g1_=tract&g1_=bg&g2_=cbsa10&g2_=cbsatype10&wtvar=pop10&nozerob=1&title=&csvout=1&namoptf=b&listout=1&lstfmt=html&namoptr=b&oropt=&counties=&metros=&places=&latitude=&longitude=&locname=&distance=&kiloms=0&nrings=&r1=&r2=&r3=&r4=&r5=&r6=&r7=&r8=&r9=&r10=&lathi=&latlo=&longhi=&longlo= - logger.info("Starting download of Geocorr information.") + # https://mcdc.missouri.edu/cgi-bin/broker?_PROGRAM=apps.geocorr2014.sas&_SERVICE=MCDC_long&_debug=0&state=Mo29&state=Al01&state=Ak02&state=Az04&state=Ar05&state=Ca06&state=Co08&state=Ct09&state=De10&state=Dc11&state=Fl12&state=Ga13&state=Hi15&state=Id16&state=Il17&state=In18&state=Ia19&state=Ks20&state=Ky21&state=La22&state=Me23&state=Md24&state=Ma25&state=Mi26&state=Mn27&state=Ms28&state=Mt30&state=Ne31&state=Nv32&state=Nh33&state=Nj34&state=Nm35&state=Ny36&state=Nc37&state=Nd38&state=Oh39&state=Ok40&state=Or41&state=Pa42&state=Ri44&state=Sc45&state=Sd46&state=Tn47&state=Tx48&state=Ut49&state=Vt50&state=Va51&state=Wa53&state=Wv54&state=Wi55&state=Wy56&g1_=state&g1_=county&g1_=placefp&g1_=tract&g2_=cbsa10&g2_=cbsatype10&wtvar=pop10&nozerob=1&title=&csvout=1&namoptf=b&listout=1&lstfmt=html&namoptr=b&oropt=&counties=&metros=&places=&latitude=&longitude=&locname=&distance=&kiloms=0&nrings=&r1=&r2=&r3=&r4=&r5=&r6=&r7=&r8=&r9=&r10=&lathi=&latlo=&longhi=&longlo= + # + # That query was constructed from the website https://mcdc.missouri.edu/applications/geocorr2014.html, + # with the "source geographies" selected being: + # - State + # - County + # - Place (City, Town, Village, CDP, etc) + # - Census Tract + # and with the "target geographies" selected being: + # - Core based statistical area (CBSA) + # - CBSA Type (Metro or Micro) + logger.info("Starting download of 1.5MB Geocorr information.") unzip_file_from_url( file_url=settings.AWS_JUSTICE40_DATASOURCES_URL - + "/geocorr2014_all_states.csv.zip", + + "/geocorr2014_all_states_tracts_only.csv.zip", download_path=self.TMP_PATH, unzipped_file_path=self.TMP_PATH / "geocorr", ) @@ -188,7 +245,7 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): self.raw_geocorr_df = pd.read_csv( filepath_or_buffer=self.TMP_PATH / "geocorr" - / "geocorr2014_all_states.csv", + / "geocorr2014_all_states_tracts_only.csv", # Skip second row, which has descriptions. skiprows=[1], # The following need to remain as strings for all of their digits, not get converted to numbers. @@ -220,14 +277,14 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): msa_median_incomes_df = self._transform_msa_median_incomes() state_median_incomes_df = self._transform_state_median_incomes() - # Join CBGs on MSA incomes + # Join tracts on MSA incomes merged_df = geocorr_df.merge( msa_median_incomes_df, on=self.MSA_ID_FIELD_NAME, how="left" ) - # Merge state income with CBGs + # Merge state income with tracts merged_df[self.STATE_GEOID_FIELD_NAME] = ( - merged_df[self.GEOID_FIELD_NAME].astype(str).str[0:2] + merged_df[self.GEOID_TRACT_FIELD_NAME].astype(str).str[0:2] ) merged_with_state_income_df = merged_df.merge( @@ -236,11 +293,8 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad): on=self.STATE_GEOID_FIELD_NAME, ) - if ( - len(merged_with_state_income_df) - > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS - ): - raise ValueError("Too many CBGs in join.") + if len(merged_with_state_income_df) > self.EXPECTED_MAX_CENSUS_TRACTS: + raise ValueError("Too many tracts in join.") # Choose reference income: MSA if MSA type is Metro, otherwise use State. merged_with_state_income_df[self.AMI_REFERENCE_FIELD_NAME] = [