diff --git a/data/data-pipeline/data_pipeline/etl/base.py b/data/data-pipeline/data_pipeline/etl/base.py index 99ac679a..a1496909 100644 --- a/data/data-pipeline/data_pipeline/etl/base.py +++ b/data/data-pipeline/data_pipeline/etl/base.py @@ -11,6 +11,7 @@ from data_pipeline.etl.score.etl_utils import ( compare_to_list_of_expected_state_fips_codes, ) from data_pipeline.etl.score.schemas.datasets import DatasetsConfig +from data_pipeline.score import field_names from data_pipeline.utils import ( load_yaml_dict_from_file, unzip_file_from_url, @@ -387,6 +388,7 @@ class ExtractTransformLoad: # Tract ID, but these will be ignored if they're not present. cls.GEOID_FIELD_NAME: "string", cls.GEOID_TRACT_FIELD_NAME: "string", + field_names.ZIP_CODE: "string", }, ) diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py index bf6af0f1..35ddfa98 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py @@ -542,7 +542,6 @@ class PostScoreETL(ExtractTransformLoad): # load codebook to disk codebook_df.to_csv(codebook_path, index=False) - # TODO: Write zip-code based files logger.info("Compressing files") diff --git a/data/data-pipeline/data_pipeline/etl/sources/geocorr_alternatives/etl.py b/data/data-pipeline/data_pipeline/etl/sources/geocorr_alternatives/etl.py index 30e5b6eb..bdf4560e 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/geocorr_alternatives/etl.py +++ b/data/data-pipeline/data_pipeline/etl/sources/geocorr_alternatives/etl.py @@ -1,15 +1,8 @@ -import typing -from typing import List - import geopandas as gpd -import numpy as np import pandas as pd - -from data_pipeline.config import settings from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel from data_pipeline.etl.sources.geo_utils import ( add_tracts_for_geometries, - get_tribal_geojson, get_tract_geojson, ) from 
data_pipeline.score import field_names @@ -21,86 +14,91 @@ logger = get_module_logger(__name__) class GeoCorrAlternativesETL(ExtractTransformLoad): """Calculates overlap between Census tracts & various alternative geographies.""" - # Metadata for the baseclass NAME = "geocorr_alternatives" GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT - PUERTO_RICO_EXPECTED_IN_DATA = False - - INPUT_GEOCORR_TRACT_FIELD = "tract" - INPUT_GEOCORR_COUNTY_FIELD = "county" - INPUT_GEOCORR_ZIP_FIELD = "zcta5" - INPUT_GEOCORR_ALLOCATION_FIELD = "afact" + ISLAND_AREAS_EXPECTED_IN_DATA = True # Skip some validation checks, because there will be multiple rows per tract in this # geocorr dataset. VALIDATE_SHOULD_SKIP_DUPLICATE_GEOGRAPHIES_AND_GEOGRAPHY_COUNT = True - # GeoCorr downloads have a field definition in the second row of the CSV. - # This parameter skips the second row for pandas `read_csv`. - GEOCORR_SKIP_ROWS: typing.List[int] = [1] + CRS_INTEGER = 3857 - # GeoCorr 2010 Zip Codes (Zip Code Tabulation Area 5, ZCTA5) -> 2010 Tracts - # This file was generated in the UI at https://mcdc.missouri.edu/. 
- # The network request to generate it is: - # https://mcdc.missouri.edu/cgi-bin/broker?_PROGRAM=apps.geocorr2018.sas&_SERVICE=MCDC_long&_debug=0&state=Mo29&state=Al01&state=Ak02&state=Az04&state=Ar05&state=Ca06&state=Co08&state=Ct09&state=De10&state=Dc11&state=Fl12&state=Ga13&state=Hi15&state=Id16&state=Il17&state=In18&state=Ia19&state=Ks20&state=Ky21&state=La22&state=Me23&state=Md24&state=Ma25&state=Mi26&state=Mn27&state=Ms28&state=Mt30&state=Ne31&state=Nv32&state=Nh33&state=Nj34&state=Nm35&state=Ny36&state=Nc37&state=Nd38&state=Oh39&state=Ok40&state=Or41&state=Pa42&state=Ri44&state=Sc45&state=Sd46&state=Tn47&state=Tx48&state=Ut49&state=Vt50&state=Va51&state=Wa53&state=Wv54&state=Wi55&state=Wy56&g1_=zcta5&g2_=tract&wtvar=pop10&nozerob=1&title=&csvout=1&namoptf=b&listout=1&lstfmt=html&namoptr=b&oropt=&counties=&metros=&places=&latitude=&longitude=&locname=&distance=&kiloms=0&nrings=&r1=&r2=&r3=&r4=&r5=&r6=&r7=&r8=&r9=&r10=&lathi=&latlo=&longhi=&longlo= - ZIP_CODES_TO_TRACTS_PATH = ( - settings.AWS_JUSTICE40_DATASOURCES_URL - + "/geocorr2018_zcta5_to_tracts.csv.zip" + ZCTA_2020_SHAPEFILE_PATH = ( + "https://www2.census.gov/geo/tiger/GENZ2020/shp" + + "/cb_2020_us_zcta520_500k.zip" ) - COLUMNS_TO_KEEP = [ - ExtractTransformLoad.GEOID_TRACT_FIELD_NAME, - field_names.ZIP_CODE, - field_names.PERCENT_OF_ZIP_CODE_IN_TRACT, - ] + ZIP_CODE_INPUT_FIELD = "ZCTA5CE20" + AREA_JOINED_FIELD = "area_joined" + AREA_ZIP_FIELD = "area_zip" - # Define these for easy code completion def __init__(self): - self.ZIP_CODES_TO_TRACTS_CSV = ( - self.get_tmp_path() / "geocorr2018_zcta5_to_tracts.csv" - ) + self.COLUMNS_TO_KEEP = [ + self.GEOID_TRACT_FIELD_NAME, + field_names.ZIP_CODE, + field_names.PERCENT_OF_ZIP_CODE_IN_TRACT, + self.AREA_JOINED_FIELD, + self.AREA_ZIP_FIELD, + ] self.output_df: pd.DataFrame + self.census_tract_gdf: gpd.GeoDataFrame def extract(self) -> None: - # Load the tracts to zip codes data. + # Download 2020 zip boundaries. 
unzip_file_from_url( - file_url=self.ZIP_CODES_TO_TRACTS_PATH, + file_url=self.ZCTA_2020_SHAPEFILE_PATH, download_path=self.get_tmp_path(), - unzipped_file_path=self.get_tmp_path(), + unzipped_file_path=self.get_tmp_path() / "cb_2020_us_zcta520_500k", verify=True, ) + # Load census tract geometries + self.census_tract_gdf = get_tract_geojson() + def transform(self) -> None: logger.info("Starting GeoCorr alternatives transforms.") - zip_codes_to_tracts_df = pd.read_csv( - filepath_or_buffer=self.ZIP_CODES_TO_TRACTS_CSV, - dtype={ - self.INPUT_GEOCORR_TRACT_FIELD: "string", - self.INPUT_GEOCORR_COUNTY_FIELD: "string", - self.INPUT_GEOCORR_ZIP_FIELD: "string", - }, - skiprows=self.GEOCORR_SKIP_ROWS, - ) - zip_codes_to_tracts_df = zip_codes_to_tracts_df.rename( - columns={ - self.INPUT_GEOCORR_ALLOCATION_FIELD: field_names.PERCENT_OF_ZIP_CODE_IN_TRACT, - self.INPUT_GEOCORR_ZIP_FIELD: field_names.ZIP_CODE, - }, + # Read in ZCTA data. + zcta_2020_gdf = gpd.read_file( + filename=self.get_tmp_path() / "cb_2020_us_zcta520_500k" + ) + zcta_2020_gdf = zcta_2020_gdf.rename( + columns={self.ZIP_CODE_INPUT_FIELD: field_names.ZIP_CODE}, errors="raise", ) - # Create the tract ID by combining fields from GeoCorr. - zip_codes_to_tracts_df[self.GEOID_TRACT_FIELD_NAME] = ( - zip_codes_to_tracts_df[self.INPUT_GEOCORR_COUNTY_FIELD] - + zip_codes_to_tracts_df[self.INPUT_GEOCORR_TRACT_FIELD] + # Switch from geographic to projected CRSes + # because the area calculations below need a projected CRS + self.census_tract_gdf = self.census_tract_gdf.to_crs( + crs=self.CRS_INTEGER ) - # Remove unnecessary periods. - zip_codes_to_tracts_df[ - self.GEOID_TRACT_FIELD_NAME - ] = zip_codes_to_tracts_df[self.GEOID_TRACT_FIELD_NAME].str.replace( - ".", "", regex=False + zcta_2020_gdf = zcta_2020_gdf.to_crs(crs=self.CRS_INTEGER) + + # Calculate percentage overlap. + # Create a measure of area for the entire Zip area. + zcta_2020_gdf[self.AREA_ZIP_FIELD] = zcta_2020_gdf.area + + # Perform overlay function. 
+ # We have a mix of polygons and multipolygons, and we just want the overlaps + # without caring a ton about the specific types, so we ignore geom type. + # Realistically, this changes almost nothing in the calculation; True and False + # are the same within 9 digits of precision. + joined_gdf = gpd.overlay( + df1=zcta_2020_gdf, + df2=self.census_tract_gdf, + how="intersection", + keep_geom_type=False, ) - self.output_df = zip_codes_to_tracts_df + # Calculating the areas of the newly-created overlapping geometries + joined_gdf[self.AREA_JOINED_FIELD] = joined_gdf.area + + # Calculating the areas of the newly-created geometries in relation + # to the original ZCTA (zip code) geometries + joined_gdf[field_names.PERCENT_OF_ZIP_CODE_IN_TRACT] = ( + joined_gdf[self.AREA_JOINED_FIELD] / joined_gdf[self.AREA_ZIP_FIELD] + ) + + self.output_df = joined_gdf