updating to directly calculate overlay

lucasmbrown-usds 2022-09-28 16:10:15 -04:00
parent 9f0918d2a9
commit ed364fbb26
3 changed files with 60 additions and 61 deletions

View file

@@ -11,6 +11,7 @@ from data_pipeline.etl.score.etl_utils import (
     compare_to_list_of_expected_state_fips_codes,
 )
 from data_pipeline.etl.score.schemas.datasets import DatasetsConfig
+from data_pipeline.score import field_names
 from data_pipeline.utils import (
     load_yaml_dict_from_file,
     unzip_file_from_url,
@@ -387,6 +388,7 @@ class ExtractTransformLoad:
                 # Tract ID, but these will be ignored if they're not present.
                 cls.GEOID_FIELD_NAME: "string",
                 cls.GEOID_TRACT_FIELD_NAME: "string",
+                field_names.ZIP_CODE: "string",
             },
         )
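
Why the new dtype entry matters: Census GEOIDs and ZIP/ZCTA codes are fixed-width identifiers with leading zeros (for example zip "00601"), so letting pandas infer an integer dtype silently corrupts them. A minimal sketch of the same pattern, with a hypothetical file path and column names standing in for the baseclass constants:

    import pandas as pd

    # Hypothetical path and column names; the real ETL pulls these from the
    # baseclass and the field_names module.
    df = pd.read_csv(
        "zcta_to_tract.csv",
        dtype={
            "tract_geoid": "string",
            "zip_code": "string",
        },
    )
    # "00601" stays "00601" rather than becoming the integer 601.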

View file

@@ -542,7 +542,6 @@ class PostScoreETL(ExtractTransformLoad):
         # load codebook to disk
         codebook_df.to_csv(codebook_path, index=False)
 
-        # TODO: Write zip-code based files
         logger.info("Compressing files")

View file

@ -1,15 +1,8 @@
import typing
from typing import List
import geopandas as gpd import geopandas as gpd
import numpy as np
import pandas as pd import pandas as pd
from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.etl.sources.geo_utils import ( from data_pipeline.etl.sources.geo_utils import (
add_tracts_for_geometries, add_tracts_for_geometries,
get_tribal_geojson,
get_tract_geojson, get_tract_geojson,
) )
from data_pipeline.score import field_names from data_pipeline.score import field_names
@@ -21,86 +14,91 @@ logger = get_module_logger(__name__)
 class GeoCorrAlternativesETL(ExtractTransformLoad):
     """Calculates overlap between Census tracts & various alternative geographies."""
 
+    # Metadata for the baseclass
     NAME = "geocorr_alternatives"
     GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
-    PUERTO_RICO_EXPECTED_IN_DATA = False
-    INPUT_GEOCORR_TRACT_FIELD = "tract"
-    INPUT_GEOCORR_COUNTY_FIELD = "county"
-    INPUT_GEOCORR_ZIP_FIELD = "zcta5"
-    INPUT_GEOCORR_ALLOCATION_FIELD = "afact"
+    ISLAND_AREAS_EXPECTED_IN_DATA = True
 
     # Skip some validation checks, because there will be multiple rows per tract in this
     # geocorr dataset.
     VALIDATE_SHOULD_SKIP_DUPLICATE_GEOGRAPHIES_AND_GEOGRAPHY_COUNT = True
 
-    # GeoCorr downloads have a field definition in the second row of the CSV.
-    # This parameter skips the second row for pandas `read_csv`.
-    GEOCORR_SKIP_ROWS: typing.List[int] = [1]
+    CRS_INTEGER = 3857
 
-    # GeoCorr 2010 Zip Codes (Zip Code Tabulation Area 5, ZCTA5) -> 2010 Tracts
-    # This file was generated in the UI at https://mcdc.missouri.edu/.
-    # The network request to generate it is:
-    # https://mcdc.missouri.edu/cgi-bin/broker?_PROGRAM=apps.geocorr2018.sas&_SERVICE=MCDC_long&_debug=0&state=Mo29&state=Al01&state=Ak02&state=Az04&state=Ar05&state=Ca06&state=Co08&state=Ct09&state=De10&state=Dc11&state=Fl12&state=Ga13&state=Hi15&state=Id16&state=Il17&state=In18&state=Ia19&state=Ks20&state=Ky21&state=La22&state=Me23&state=Md24&state=Ma25&state=Mi26&state=Mn27&state=Ms28&state=Mt30&state=Ne31&state=Nv32&state=Nh33&state=Nj34&state=Nm35&state=Ny36&state=Nc37&state=Nd38&state=Oh39&state=Ok40&state=Or41&state=Pa42&state=Ri44&state=Sc45&state=Sd46&state=Tn47&state=Tx48&state=Ut49&state=Vt50&state=Va51&state=Wa53&state=Wv54&state=Wi55&state=Wy56&g1_=zcta5&g2_=tract&wtvar=pop10&nozerob=1&title=&csvout=1&namoptf=b&listout=1&lstfmt=html&namoptr=b&oropt=&counties=&metros=&places=&latitude=&longitude=&locname=&distance=&kiloms=0&nrings=&r1=&r2=&r3=&r4=&r5=&r6=&r7=&r8=&r9=&r10=&lathi=&latlo=&longhi=&longlo=
-    ZIP_CODES_TO_TRACTS_PATH = (
-        settings.AWS_JUSTICE40_DATASOURCES_URL
-        + "/geocorr2018_zcta5_to_tracts.csv.zip"
+    ZCTA_2020_SHAPEFILE_PATH = (
+        "https://www2.census.gov/geo/tiger/GENZ2020/shp"
+        + "/cb_2020_us_zcta520_500k.zip"
     )
 
-    COLUMNS_TO_KEEP = [
-        ExtractTransformLoad.GEOID_TRACT_FIELD_NAME,
-        field_names.ZIP_CODE,
-        field_names.PERCENT_OF_ZIP_CODE_IN_TRACT,
-    ]
-
-    # Define these for easy code completion
+    ZIP_CODE_INPUT_FIELD = "ZCTA5CE20"
+    AREA_JOINED_FIELD = "area_joined"
+    AREA_ZIP_FIELD = "area_zip"
 
     def __init__(self):
-        self.ZIP_CODES_TO_TRACTS_CSV = (
-            self.get_tmp_path() / "geocorr2018_zcta5_to_tracts.csv"
-        )
+        self.COLUMNS_TO_KEEP = [
+            self.GEOID_TRACT_FIELD_NAME,
+            field_names.ZIP_CODE,
+            field_names.PERCENT_OF_ZIP_CODE_IN_TRACT,
+            self.AREA_JOINED_FIELD,
+            self.AREA_ZIP_FIELD,
+        ]
 
         self.output_df: pd.DataFrame
+        self.census_tract_gdf: gpd.GeoDataFrame
 
     def extract(self) -> None:
-        # Load the tracts to zip codes data.
+        # Download 2020 zip code (ZCTA) boundaries.
         unzip_file_from_url(
-            file_url=self.ZIP_CODES_TO_TRACTS_PATH,
+            file_url=self.ZCTA_2020_SHAPEFILE_PATH,
             download_path=self.get_tmp_path(),
-            unzipped_file_path=self.get_tmp_path(),
+            unzipped_file_path=self.get_tmp_path() / "cb_2020_us_zcta520_500k",
             verify=True,
         )
+
+        # Load census tract geometries.
+        self.census_tract_gdf = get_tract_geojson()
     def transform(self) -> None:
         logger.info("Starting GeoCorr alternatives transforms.")
 
-        zip_codes_to_tracts_df = pd.read_csv(
-            filepath_or_buffer=self.ZIP_CODES_TO_TRACTS_CSV,
-            dtype={
-                self.INPUT_GEOCORR_TRACT_FIELD: "string",
-                self.INPUT_GEOCORR_COUNTY_FIELD: "string",
-                self.INPUT_GEOCORR_ZIP_FIELD: "string",
-            },
-            skiprows=self.GEOCORR_SKIP_ROWS,
-        )
-
-        zip_codes_to_tracts_df = zip_codes_to_tracts_df.rename(
-            columns={
-                self.INPUT_GEOCORR_ALLOCATION_FIELD: field_names.PERCENT_OF_ZIP_CODE_IN_TRACT,
-                self.INPUT_GEOCORR_ZIP_FIELD: field_names.ZIP_CODE,
-            },
+        # Read in ZCTA data.
+        zcta_2020_gdf = gpd.read_file(
+            filename=self.get_tmp_path() / "cb_2020_us_zcta520_500k"
+        )
+        zcta_2020_gdf = zcta_2020_gdf.rename(
+            columns={self.ZIP_CODE_INPUT_FIELD: field_names.ZIP_CODE},
             errors="raise",
         )
 
-        # Create the tract ID by combining fields from GeoCorr.
-        zip_codes_to_tracts_df[self.GEOID_TRACT_FIELD_NAME] = (
-            zip_codes_to_tracts_df[self.INPUT_GEOCORR_COUNTY_FIELD]
-            + zip_codes_to_tracts_df[self.INPUT_GEOCORR_TRACT_FIELD]
-        )
-
-        # Remove unnecessary periods.
-        zip_codes_to_tracts_df[
-            self.GEOID_TRACT_FIELD_NAME
-        ] = zip_codes_to_tracts_df[self.GEOID_TRACT_FIELD_NAME].str.replace(
-            ".", "", regex=False
-        )
-
-        self.output_df = zip_codes_to_tracts_df
+        # Switch from geographic to projected CRSes, since area calculations
+        # are only meaningful in a projected CRS.
+        self.census_tract_gdf = self.census_tract_gdf.to_crs(
+            crs=self.CRS_INTEGER
+        )
+        zcta_2020_gdf = zcta_2020_gdf.to_crs(crs=self.CRS_INTEGER)
+
+        # Calculate percentage overlap.
+        # Create a measure of area for the entire Zip area.
+        zcta_2020_gdf[self.AREA_ZIP_FIELD] = zcta_2020_gdf.area
+
+        # Perform overlay function.
+        # We have a mix of polygons and multipolygons, and we just want the overlaps
+        # without caring a ton about the specific types, so we ignore geom type.
+        # Realistically, this changes almost nothing in the calculation; True and False
+        # are the same within 9 digits of precision.
+        joined_gdf = gpd.overlay(
+            df1=zcta_2020_gdf,
+            df2=self.census_tract_gdf,
+            how="intersection",
+            keep_geom_type=False,
+        )
+
+        # Calculate the areas of the newly-created overlapping geometries.
+        joined_gdf[self.AREA_JOINED_FIELD] = joined_gdf.area
+
+        # Calculate the areas of the newly-created geometries in relation
+        # to the original zip code (ZCTA) geometries.
+        joined_gdf[field_names.PERCENT_OF_ZIP_CODE_IN_TRACT] = (
+            joined_gdf[self.AREA_JOINED_FIELD] / joined_gdf[self.AREA_ZIP_FIELD]
+        )
+
+        self.output_df = joined_gdf
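
The rewritten transform boils down to three steps: project both layers to a projected CRS so .area is meaningful, intersect ZCTA polygons with tract polygons via gpd.overlay, then divide each intersection's area by the full ZCTA area to get the share of each zip code falling in each tract. A rough, self-contained sketch of that logic with toy geometries, made-up IDs, and simplified column names (the real ETL uses the Census shapefile, the tract GeoJSON, and the field_names constants):

    import geopandas as gpd
    from shapely.geometry import box

    # Toy stand-ins for the ZCTA and tract layers; IDs and bounding boxes are
    # invented for illustration only.
    zctas = gpd.GeoDataFrame(
        {"zip_code": ["00601"]},
        geometry=[box(-66.8, 18.1, -66.6, 18.3)],
        crs="EPSG:4326",
    )
    tracts = gpd.GeoDataFrame(
        {"tract_geoid": ["72001956300", "72001956400"]},
        geometry=[
            box(-66.8, 18.1, -66.7, 18.3),
            box(-66.7, 18.1, -66.6, 18.3),
        ],
        crs="EPSG:4326",
    )

    # Area is only meaningful in a projected CRS, so project both layers first.
    zctas = zctas.to_crs(epsg=3857)
    tracts = tracts.to_crs(epsg=3857)
    zctas["area_zip"] = zctas.area

    # Intersect every zip polygon with every tract polygon it touches.
    joined = gpd.overlay(zctas, tracts, how="intersection", keep_geom_type=False)
    joined["area_joined"] = joined.area

    # Share of each zip code that falls inside each tract; the shares for a
    # given zip should sum to roughly 1 when tracts fully cover it.
    joined["pct_of_zip_in_tract"] = joined["area_joined"] / joined["area_zip"]
    print(joined[["zip_code", "tract_geoid", "pct_of_zip_in_tract"]])

With the toy boxes above, each tract covers half the zip, so both rows come out near 0.5; on the real data the same ratio is what gets stored as PERCENT_OF_ZIP_CODE_IN_TRACT.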