diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
index e5cd9949..b6e0529d 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
@@ -1,6 +1,6 @@
 import concurrent.futures
 import math
-
+import numpy as np
 import pandas as pd
 import geopandas as gpd
 
@@ -52,7 +52,11 @@ class GeoScoreETL(ExtractTransformLoad):
         ]
         self.GEOMETRY_FIELD_NAME = "geometry"
 
+        # We will adjust this upwards while there is some fractional value
+        # in the score. This is a starting value.
         self.NUMBER_OF_BUCKETS = 10
+        self.HOMOGENEITY_THRESHOLD = 200
+        self.HIGH_LOW_ZOOM_CENSUS_TRACT_THRESHOLD = 150
 
         self.geojson_usa_df: gpd.GeoDataFrame
         self.score_usa_df: pd.DataFrame
@@ -96,12 +100,16 @@ class GeoScoreETL(ExtractTransformLoad):
 
         logger.info("Pruning Census GeoJSON")
         fields = [self.GEOID_FIELD_NAME, self.GEOMETRY_FIELD_NAME]
-        self.geojson_usa_df = self.geojson_usa_df[fields]
 
         # TODO update this join
         logger.info("Merging and compressing score CSV with USA GeoJSON")
-        self.geojson_score_usa_high = self.score_usa_df.merge(
-            self.geojson_usa_df, on=self.GEOID_FIELD_NAME, how="left"
+        self.geojson_score_usa_high = self.score_usa_df.set_index(
+            self.GEOID_FIELD_NAME
+        ).merge(
+            self.geojson_usa_df[fields].set_index(self.GEOID_FIELD_NAME),
+            left_index=True,
+            right_index=True,
+            how="left",
         )
 
         self.geojson_score_usa_high = gpd.GeoDataFrame(
@@ -110,44 +118,41 @@ class GeoScoreETL(ExtractTransformLoad):
 
         usa_simplified = self.geojson_score_usa_high[
             [
-                self.GEOID_FIELD_NAME,
                 self.TARGET_SCORE_SHORT_FIELD,
                 self.GEOMETRY_FIELD_NAME,
             ]
-        ].reset_index(drop=True)
+        ].reset_index()
 
-        usa_simplified.rename(
-            columns={
-                self.TARGET_SCORE_SHORT_FIELD: self.TARGET_SCORE_RENAME_TO
-            },
-            inplace=True,
+        usa_tracts = usa_simplified.rename(
+            columns={self.TARGET_SCORE_SHORT_FIELD: self.TARGET_SCORE_RENAME_TO}
         )
 
-        logger.info("Aggregating into tracts (~5 minutes)")
-        usa_tracts = self._aggregate_to_tracts(usa_simplified)
-
+        logger.info("Converting tracts to GeoJSON")
         usa_tracts = gpd.GeoDataFrame(
             usa_tracts,
-            columns=[self.TARGET_SCORE_RENAME_TO, self.GEOMETRY_FIELD_NAME],
+            columns=[
+                self.TARGET_SCORE_RENAME_TO,
+                self.GEOMETRY_FIELD_NAME,
+                self.GEOID_FIELD_NAME,
+            ],
             crs="EPSG:4326",
         )
 
         logger.info("Creating buckets from tracts")
-        usa_bucketed = self._create_buckets_from_tracts(
+        usa_bucketed, keep_high_zoom_df = self._create_buckets_from_tracts(
            usa_tracts, self.NUMBER_OF_BUCKETS
         )
 
         logger.info("Aggregating buckets")
         usa_aggregated = self._aggregate_buckets(usa_bucketed, agg_func="mean")
+
         logger.info("Breaking up polygons")
         compressed = self._breakup_multipolygons(
             usa_aggregated, self.NUMBER_OF_BUCKETS
         )
 
-        self.geojson_score_usa_low = gpd.GeoDataFrame(
-            compressed,
-            columns=[self.TARGET_SCORE_RENAME_TO, self.GEOMETRY_FIELD_NAME],
-            crs="EPSG:4326",
+        self.geojson_score_usa_low = self._join_high_and_low_zoom_frames(
+            compressed, keep_high_zoom_df
         )
 
         # round to 2 decimals
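A note on the join change above: `transform()` now aligns the score CSV and the pruned GeoJSON on a shared index instead of merging on a column. A minimal sketch of the same pattern, with toy frames and an assumed "GEOID10" key (in the pipeline the key comes from `self.GEOID_FIELD_NAME`):

```python
# Minimal sketch of the index-aligned left join; the toy frames and the
# "GEOID10" key are illustrative assumptions, not the pipeline's data.
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

scores = pd.DataFrame(
    {"GEOID10": ["01001020100", "01003010200"], "Score": [0.4, 0.9]}
)
shapes = gpd.GeoDataFrame(
    {
        "GEOID10": ["01001020100", "01003010200"],
        "geometry": [Point(0, 0), Point(1, 1)],
    },
    crs="EPSG:4326",
)

# Indexing both sides on the key keeps it out of the merged column list
# and makes the left join explicit about what it aligns on.
merged = scores.set_index("GEOID10").merge(
    shapes.set_index("GEOID10"),
    left_index=True,
    right_index=True,
    how="left",
)
merged_gdf = gpd.GeoDataFrame(merged, crs="EPSG:4326")
```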
@@ -155,40 +160,70 @@ class GeoScoreETL(ExtractTransformLoad):
             {self.TARGET_SCORE_RENAME_TO: 2}
         )
 
-    def _aggregate_to_tracts(
-        self, block_group_df: gpd.GeoDataFrame
-    ) -> gpd.GeoDataFrame:
-        # The tract identifier is the first 11 digits of the GEOID
-        block_group_df["tract"] = block_group_df.apply(
-            lambda row: row[self.GEOID_FIELD_NAME][0:11], axis=1
-        )
-        state_tracts = block_group_df.dissolve(by="tract", aggfunc="mean")
-        return state_tracts
-
     def _create_buckets_from_tracts(
-        self, state_tracts: gpd.GeoDataFrame, num_buckets: int
-    ) -> gpd.GeoDataFrame:
-        # assign tracts to buckets by D_SCORE
-        state_tracts.sort_values(self.TARGET_SCORE_RENAME_TO, inplace=True)
-        SCORE_bucket = []
+        self, initial_state_tracts: gpd.GeoDataFrame, num_buckets: int
+    ):
+        # First, we exclude any states with fewer census tracts than the
+        # threshold from aggregation (right now, this just removes Wyoming)
+        highzoom_state_tracts = initial_state_tracts.reset_index()
+        highzoom_state_tracts["state"] = highzoom_state_tracts[
+            self.GEOID_FIELD_NAME
+        ].str[:2]
+        keep_high_zoom = highzoom_state_tracts.groupby("state")[
+            self.GEOID_FIELD_NAME
+        ].transform(
+            lambda x: x.count() <= self.HIGH_LOW_ZOOM_CENSUS_TRACT_THRESHOLD
+        )
+        assert (
+            keep_high_zoom.sum() != initial_state_tracts.shape[0]
+        ), "Error: Cutoff is too high, nothing is aggregated"
+        assert keep_high_zoom.sum() > 1, "Error: Nothing is kept at high zoom"
+
+        # Then we assign buckets only to tracts that do not get "kept" at high zoom
+        state_tracts = initial_state_tracts[~keep_high_zoom].copy()
+        state_tracts[f"{self.TARGET_SCORE_RENAME_TO}_bucket"] = np.arange(
+            len(state_tracts)
+        )
+        # assign tracts to buckets by score
+        state_tracts = state_tracts.sort_values(
+            self.TARGET_SCORE_RENAME_TO, ascending=True
+        )
+        score_bucket = []
         bucket_size = math.ceil(
             len(state_tracts.index) / self.NUMBER_OF_BUCKETS
         )
-        for i in range(len(state_tracts.index)):
-            SCORE_bucket.extend([math.floor(i / bucket_size)])
-        state_tracts[f"{self.TARGET_SCORE_RENAME_TO}_bucket"] = SCORE_bucket
-        return state_tracts
-
-    def _aggregate_buckets(self, state_tracts: gpd.GeoDataFrame, agg_func: str):
-        # dissolve tracts by bucket
-        state_attr = state_tracts[
-            [
-                self.TARGET_SCORE_RENAME_TO,
-                f"{self.TARGET_SCORE_RENAME_TO}_bucket",
-                self.GEOMETRY_FIELD_NAME,
-            ]
-        ].reset_index(drop=True)
-        state_dissolve = state_attr.dissolve(
+        # This just increases the number of buckets so they are more
+        # homogeneous. It's not actually necessary :shrug:
+        while (
+            state_tracts[self.TARGET_SCORE_RENAME_TO].sum() % bucket_size
+            > self.HOMOGENEITY_THRESHOLD
+        ):
+            self.NUMBER_OF_BUCKETS += 1
+            bucket_size = math.ceil(
+                len(state_tracts.index) / self.NUMBER_OF_BUCKETS
+            )
+
+        logger.info(
+            f"The number of buckets has increased to {self.NUMBER_OF_BUCKETS}"
+        )
+        for i in range(len(state_tracts.index)):
+            score_bucket.extend([math.floor(i / bucket_size)])
+        state_tracts[f"{self.TARGET_SCORE_RENAME_TO}_bucket"] = score_bucket
+
+        return state_tracts, initial_state_tracts[keep_high_zoom]
+
+    def _aggregate_buckets(
+        self, state_tracts: gpd.GeoDataFrame, agg_func: str
+    ) -> gpd.GeoDataFrame:
+        keep_cols = [
+            self.TARGET_SCORE_RENAME_TO,
+            f"{self.TARGET_SCORE_RENAME_TO}_bucket",
+            self.GEOMETRY_FIELD_NAME,
+        ]
+
+        # We dissolve all other tracts by their score bucket
+        state_dissolve = state_tracts[keep_cols].dissolve(
             by=f"{self.TARGET_SCORE_RENAME_TO}_bucket", aggfunc=agg_func
         )
         return state_dissolve
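A note on the bucketing above: stripped of the high/low-zoom bookkeeping, `_create_buckets_from_tracts` sorts tracts by score, deals out contiguous bucket ids, and `_aggregate_buckets` later dissolves each bucket into one geometry. A self-contained sketch under assumed names (a toy "Score" column and `NUM_BUCKETS` standing in for the class constants):

```python
# Standalone sketch of the bucket-then-dissolve aggregation; the toy frame,
# the "Score" column, and NUM_BUCKETS are illustrative assumptions.
import math

import geopandas as gpd
from shapely.geometry import box

NUM_BUCKETS = 2
tracts = gpd.GeoDataFrame(
    {
        "Score": [0.1, 0.2, 0.8, 0.9],
        "geometry": [box(i, 0, i + 1, 1) for i in range(4)],
    },
    crs="EPSG:4326",
)

# Sort by score, then deal out bucket ids in contiguous runs of
# ceil(n / NUM_BUCKETS) rows, mirroring the loop in the diff.
tracts = tracts.sort_values("Score", ascending=True)
bucket_size = math.ceil(len(tracts.index) / NUM_BUCKETS)
tracts["Score_bucket"] = [
    math.floor(i / bucket_size) for i in range(len(tracts.index))
]

# Dissolving by bucket unions each bucket's geometry and averages its score.
dissolved = tracts.dissolve(by="Score_bucket", aggfunc="mean")
```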
@@ -196,6 +231,7 @@ class GeoScoreETL(ExtractTransformLoad):
     def _breakup_multipolygons(
         self, state_bucketed_df: gpd.GeoDataFrame, num_buckets: int
     ) -> gpd.GeoDataFrame:
+
         compressed = []
         for i in range(num_buckets):
             for j in range(
@@ -209,6 +245,20 @@ class GeoScoreETL(ExtractTransformLoad):
                 )
         return compressed
 
+    def _join_high_and_low_zoom_frames(
+        self, compressed: list, keep_high_zoom_df: gpd.GeoDataFrame
+    ) -> gpd.GeoDataFrame:
+        keep_columns = [
+            self.TARGET_SCORE_RENAME_TO,
+            self.GEOMETRY_FIELD_NAME,
+        ]
+        compressed_geodf = gpd.GeoDataFrame(
+            compressed,
+            columns=keep_columns,
+            crs="EPSG:4326",
+        )
+        return pd.concat([compressed_geodf, keep_high_zoom_df[keep_columns]])
+
     def load(self) -> None:
         # Create separate threads to run each write to disk.
         def write_high_to_file():
@@ -243,6 +293,7 @@ class GeoScoreETL(ExtractTransformLoad):
             codebook[new_col] = reversed_tiles.get(column, column)
             if new_col != column:
                 renaming_map[column] = new_col
+
         pd.Series(codebook).reset_index().rename(
             # kept as strings because no downstream impacts
             columns={0: "column", "index": "meaning"}
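A note on the new `_join_high_and_low_zoom_frames`: it rebuilds a GeoDataFrame from the compressed bucket rows and concatenates the high-zoom (small-state) tracts back in, so the low-zoom file mixes aggregated and unaggregated shapes. Roughly, with illustrative stand-in data:

```python
# Rough sketch of _join_high_and_low_zoom_frames: the low-zoom bucket rows
# (score, geometry) are concatenated with the states kept at full tract
# detail. The data and the "Score" column name are illustrative.
import geopandas as gpd
import pandas as pd
from shapely.geometry import box

keep_columns = ["Score", "geometry"]

# Stand-in for the list built by _breakup_multipolygons.
compressed = [[0.5, box(0, 0, 1, 1)], [0.9, box(1, 0, 2, 1)]]
compressed_geodf = gpd.GeoDataFrame(
    compressed, columns=keep_columns, crs="EPSG:4326"
)

# Stand-in for the small-state tracts that skipped bucketing.
keep_high_zoom_df = gpd.GeoDataFrame(
    {"Score": [0.7], "geometry": [box(2, 0, 3, 1)]}, crs="EPSG:4326"
)

# pd.concat of GeoDataFrames keeps the result a GeoDataFrame, so the
# low-zoom output mixes aggregated buckets with unaggregated tracts.
low_zoom = pd.concat([compressed_geodf, keep_high_zoom_df[keep_columns]])
```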
category = "main" optional = false @@ -1737,7 +1737,8 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "2f4eed83e5b153dafda74e7fd54b718ede6f4e960fb523c2d733643be76762a4" +content-hash = "f057f4718a9236c76aa48f9d67d3d3d76d9e9099e148bffac0abe92714ac0591" + [metadata.files] appnope = [ @@ -2595,11 +2596,11 @@ pywin32 = [ {file = "pywin32-303-cp39-cp39-win_amd64.whl", hash = "sha256:79cbb862c11b9af19bcb682891c1b91942ec2ff7de8151e2aea2e175899cda34"}, ] pywinpty = [ - {file = "pywinpty-2.0.2-cp310-none-win_amd64.whl", hash = "sha256:4b421379b407bf2f52a64a4c58f61deffe623b5add02d871acb290b771bb6227"}, - {file = "pywinpty-2.0.2-cp37-none-win_amd64.whl", hash = "sha256:238b75fc456a6bc558761a89c9e6b3c8f2f54d79db03ae28997a68313c24b2ca"}, - {file = "pywinpty-2.0.2-cp38-none-win_amd64.whl", hash = "sha256:344858a0b956fdc64a547d5e1980b0257b47f5433ed7cb89bf7b6268cb280c6c"}, - {file = "pywinpty-2.0.2-cp39-none-win_amd64.whl", hash = "sha256:a4a066eaf2e30944d3028d946883ceb7883a499b53c4b89ca2d54bd7a4210550"}, - {file = "pywinpty-2.0.2.tar.gz", hash = "sha256:20ec117183f79642eff555ce0dd1823f942618d65813fb6122d14b6e34b5d05a"}, + {file = "pywinpty-2.0.4-cp310-none-win_amd64.whl", hash = "sha256:b13b6736e41437492f2e322294b7ed5b1ac71586f72de998515be2e9b3077b21"}, + {file = "pywinpty-2.0.4-cp37-none-win_amd64.whl", hash = "sha256:a024231bd077c17cc2d27b63a60d0717745f37e5ef6c1f688cc20a438e761e59"}, + {file = "pywinpty-2.0.4-cp38-none-win_amd64.whl", hash = "sha256:50209e09fbc96dc6a7d9d5300c87528e2987c55f0761bab0a233e304319429cc"}, + {file = "pywinpty-2.0.4-cp39-none-win_amd64.whl", hash = "sha256:1eab7442f4a42dfd2a8f5e0b195e018884a58331e8c0289f041b03d3fd875d7f"}, + {file = "pywinpty-2.0.4.tar.gz", hash = "sha256:a5002c74afc1ddcc85fca25de58ce9cef1a22957582981418c81aaee218675ad"}, ] pyyaml = [ {file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"}, @@ -2746,8 +2747,8 @@ six = [ {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, ] terminado = [ - {file = "terminado-0.13.1-py3-none-any.whl", hash = "sha256:f446b522b50a7aa68b5def0a02893978fb48cb82298b0ebdae13003c6ee6f198"}, - {file = "terminado-0.13.1.tar.gz", hash = "sha256:5b82b5c6e991f0705a76f961f43262a7fb1e55b093c16dca83f16384a7f39b7b"}, + {file = "terminado-0.13.2-py3-none-any.whl", hash = "sha256:d61f112f3beb7271d953d3934f056af185f6be0750303581fa1c511379a8a5d0"}, + {file = "terminado-0.13.2.tar.gz", hash = "sha256:e6147a7ea31d150f9df4a26cedde3dbb2e011be269f89ff0267ae4157f3ae426"}, ] testpath = [ {file = "testpath-0.6.0-py3-none-any.whl", hash = "sha256:8ada9f80a2ac6fb0391aa7cdb1a7d11cfa8429f693eda83f74dde570fe6fa639"},