Mirror of https://github.com/DOI-DO/j40-cejst-2.git, synced 2025-07-28 08:01:17 -07:00
WY tracts are not showing up until zoom >7 (#1342)
To fix an issue where states with few census tracts appear to have no DACs, the low-zoom data for any state with fewer than a threshold number of tracts is now simply that state's high-zoom (tract-level) data. As a result, WY now shows DACs even at low zoom. Yay!
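As a rough illustration of that approach (a minimal sketch, not the repo's exact code: the `GEOID10` column name, the helper name, and the 150-tract cutoff are illustrative placeholders):

import geopandas as gpd

# Illustrative cutoff: states with at most this many tracts keep full tract detail.
HIGH_LOW_ZOOM_CENSUS_TRACT_THRESHOLD = 150

def split_tracts_for_zoom(tracts: gpd.GeoDataFrame, geoid_col: str = "GEOID10"):
    """Split tracts into (keep_high_zoom, to_aggregate) for the low-zoom tileset.

    The state is the first two digits of the tract GEOID; states with few
    tracts (e.g. Wyoming) are kept at full tract detail so they do not
    disappear at low zoom levels.
    """
    state = tracts[geoid_col].str[:2]
    keep_high_zoom = state.map(state.value_counts()) <= HIGH_LOW_ZOOM_CENSUS_TRACT_THRESHOLD
    return tracts[keep_high_zoom], tracts[~keep_high_zoom]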
parent 88d50748eb
commit 917b84dc2e
2 changed files with 112 additions and 60 deletions
@@ -1,6 +1,6 @@
import concurrent.futures
import math

import numpy as np
import pandas as pd
import geopandas as gpd
@@ -52,7 +52,11 @@ class GeoScoreETL(ExtractTransformLoad):
        ]
        self.GEOMETRY_FIELD_NAME = "geometry"

        # We will adjust this upwards while there is some fractional value
        # in the score. This is a starting value.
        self.NUMBER_OF_BUCKETS = 10
        self.HOMOGENEITY_THRESHOLD = 200
        self.HIGH_LOW_ZOOM_CENSUS_TRACT_THRESHOLD = 150

        self.geojson_usa_df: gpd.GeoDataFrame
        self.score_usa_df: pd.DataFrame
@@ -96,12 +100,16 @@ class GeoScoreETL(ExtractTransformLoad):

        logger.info("Pruning Census GeoJSON")
        fields = [self.GEOID_FIELD_NAME, self.GEOMETRY_FIELD_NAME]
        self.geojson_usa_df = self.geojson_usa_df[fields]

        # TODO update this join
        logger.info("Merging and compressing score CSV with USA GeoJSON")
        self.geojson_score_usa_high = self.score_usa_df.merge(
            self.geojson_usa_df, on=self.GEOID_FIELD_NAME, how="left"
        self.geojson_score_usa_high = self.score_usa_df.set_index(
            self.GEOID_FIELD_NAME
        ).merge(
            self.geojson_usa_df[fields].set_index(self.GEOID_FIELD_NAME),
            left_index=True,
            right_index=True,
            how="left",
        )

        self.geojson_score_usa_high = gpd.GeoDataFrame(
@@ -110,44 +118,41 @@ class GeoScoreETL(ExtractTransformLoad):

        usa_simplified = self.geojson_score_usa_high[
            [
                self.GEOID_FIELD_NAME,
                self.TARGET_SCORE_SHORT_FIELD,
                self.GEOMETRY_FIELD_NAME,
            ]
        ].reset_index(drop=True)
        ].reset_index()

        usa_simplified.rename(
            columns={
                self.TARGET_SCORE_SHORT_FIELD: self.TARGET_SCORE_RENAME_TO
            },
            inplace=True,
        usa_tracts = usa_simplified.rename(
            columns={self.TARGET_SCORE_SHORT_FIELD: self.TARGET_SCORE_RENAME_TO}
        )

        logger.info("Aggregating into tracts (~5 minutes)")
        usa_tracts = self._aggregate_to_tracts(usa_simplified)

        logger.info("Converting to geojson into tracts")
        usa_tracts = gpd.GeoDataFrame(
            usa_tracts,
            columns=[self.TARGET_SCORE_RENAME_TO, self.GEOMETRY_FIELD_NAME],
            columns=[
                self.TARGET_SCORE_RENAME_TO,
                self.GEOMETRY_FIELD_NAME,
                self.GEOID_FIELD_NAME,
            ],
            crs="EPSG:4326",
        )

        logger.info("Creating buckets from tracts")
        usa_bucketed = self._create_buckets_from_tracts(
        usa_bucketed, keep_high_zoom_df = self._create_buckets_from_tracts(
            usa_tracts, self.NUMBER_OF_BUCKETS
        )

        logger.info("Aggregating buckets")
        usa_aggregated = self._aggregate_buckets(usa_bucketed, agg_func="mean")

        logger.info("Breaking up polygons")
        compressed = self._breakup_multipolygons(
            usa_aggregated, self.NUMBER_OF_BUCKETS
        )

        self.geojson_score_usa_low = gpd.GeoDataFrame(
            compressed,
            columns=[self.TARGET_SCORE_RENAME_TO, self.GEOMETRY_FIELD_NAME],
            crs="EPSG:4326",
        self.geojson_score_usa_low = self._join_high_and_low_zoom_frames(
            compressed, keep_high_zoom_df
        )

        # round to 2 decimals
@@ -155,40 +160,70 @@ class GeoScoreETL(ExtractTransformLoad):
            {self.TARGET_SCORE_RENAME_TO: 2}
        )

    def _aggregate_to_tracts(
        self, block_group_df: gpd.GeoDataFrame
    ) -> gpd.GeoDataFrame:
        # The tract identifier is the first 11 digits of the GEOID
        block_group_df["tract"] = block_group_df.apply(
            lambda row: row[self.GEOID_FIELD_NAME][0:11], axis=1
        )
        state_tracts = block_group_df.dissolve(by="tract", aggfunc="mean")
        return state_tracts

    def _create_buckets_from_tracts(
        self, state_tracts: gpd.GeoDataFrame, num_buckets: int
    ) -> gpd.GeoDataFrame:
        # assign tracts to buckets by D_SCORE
        state_tracts.sort_values(self.TARGET_SCORE_RENAME_TO, inplace=True)
        SCORE_bucket = []
        self, initial_state_tracts: gpd.GeoDataFrame, num_buckets: int
    ):
        # First, we remove any states that have under the threshold of census tracts
        # from being aggregated (right now, this just removes Wyoming)
        highzoom_state_tracts = initial_state_tracts.reset_index()
        highzoom_state_tracts["state"] = highzoom_state_tracts[
            self.GEOID_FIELD_NAME
        ].str[:2]
        keep_high_zoom = highzoom_state_tracts.groupby("state")[
            self.GEOID_FIELD_NAME
        ].transform(
            lambda x: x.count() <= self.HIGH_LOW_ZOOM_CENSUS_TRACT_THRESHOLD
        )
        assert (
            keep_high_zoom.sum() != initial_state_tracts.shape[0]
        ), "Error: Cutoff is too high, nothing is aggregated"
        assert keep_high_zoom.sum() > 1, "Error: Nothing is kept at high zoom"

        # Then we assign buckets only to tracts that do not get "kept" at high zoom
        state_tracts = initial_state_tracts[~keep_high_zoom].copy()
        state_tracts[f"{self.TARGET_SCORE_RENAME_TO}_bucket"] = np.arange(
            len(state_tracts)
        )
        # assign tracts to buckets by score
        state_tracts = state_tracts.sort_values(
            self.TARGET_SCORE_RENAME_TO, ascending=True
        )
        score_bucket = []
        bucket_size = math.ceil(
            len(state_tracts.index) / self.NUMBER_OF_BUCKETS
        )
        for i in range(len(state_tracts.index)):
            SCORE_bucket.extend([math.floor(i / bucket_size)])
        state_tracts[f"{self.TARGET_SCORE_RENAME_TO}_bucket"] = SCORE_bucket
        return state_tracts

    def _aggregate_buckets(self, state_tracts: gpd.GeoDataFrame, agg_func: str):
        # dissolve tracts by bucket
        state_attr = state_tracts[
            [
                self.TARGET_SCORE_RENAME_TO,
                f"{self.TARGET_SCORE_RENAME_TO}_bucket",
                self.GEOMETRY_FIELD_NAME,
            ]
        ].reset_index(drop=True)
        state_dissolve = state_attr.dissolve(
        # This just increases the number of buckets so they are more
        # homogeneous. It's not actually necessary :shrug:
        while (
            state_tracts[self.TARGET_SCORE_RENAME_TO].sum() % bucket_size
            > self.HOMOGENEITY_THRESHOLD
        ):
            self.NUMBER_OF_BUCKETS += 1
            bucket_size = math.ceil(
                len(state_tracts.index) / self.NUMBER_OF_BUCKETS
            )

            logger.info(
                f"The number of buckets has increased to {self.NUMBER_OF_BUCKETS}"
            )
        for i in range(len(state_tracts.index)):
            score_bucket.extend([math.floor(i / bucket_size)])
        state_tracts[f"{self.TARGET_SCORE_RENAME_TO}_bucket"] = score_bucket

        return state_tracts, initial_state_tracts[keep_high_zoom]

    def _aggregate_buckets(
        self, state_tracts: gpd.GeoDataFrame, agg_func: str
    ) -> gpd.GeoDataFrame:
        keep_cols = [
            self.TARGET_SCORE_RENAME_TO,
            f"{self.TARGET_SCORE_RENAME_TO}_bucket",
            self.GEOMETRY_FIELD_NAME,
        ]

        # We dissolve all other tracts by their score bucket
        state_dissolve = state_tracts[keep_cols].dissolve(
            by=f"{self.TARGET_SCORE_RENAME_TO}_bucket", aggfunc=agg_func
        )
        return state_dissolve
@@ -196,6 +231,7 @@ class GeoScoreETL(ExtractTransformLoad):
    def _breakup_multipolygons(
        self, state_bucketed_df: gpd.GeoDataFrame, num_buckets: int
    ) -> gpd.GeoDataFrame:

        compressed = []
        for i in range(num_buckets):
            for j in range(
@@ -209,6 +245,20 @@ class GeoScoreETL(ExtractTransformLoad):
                )
        return compressed

    def _join_high_and_low_zoom_frames(
        self, compressed: list, keep_high_zoom_df: gpd.GeoDataFrame
    ) -> gpd.GeoDataFrame:
        keep_columns = [
            self.TARGET_SCORE_RENAME_TO,
            self.GEOMETRY_FIELD_NAME,
        ]
        compressed_geodf = gpd.GeoDataFrame(
            compressed,
            columns=keep_columns,
            crs="EPSG:4326",
        )
        return pd.concat([compressed_geodf, keep_high_zoom_df[keep_columns]])

    def load(self) -> None:
        # Create separate threads to run each write to disk.
        def write_high_to_file():
@@ -243,6 +293,7 @@ class GeoScoreETL(ExtractTransformLoad):
            codebook[new_col] = reversed_tiles.get(column, column)
            if new_col != column:
                renaming_map[column] = new_col

        pd.Series(codebook).reset_index().rename(
            # kept as strings because no downstream impacts
            columns={0: "column", "index": "meaning"}
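For reference, a minimal sketch of the low-zoom compression pattern the diff relies on: sort tracts by score, assign them to buckets, dissolve each bucket into one geometry, then append the small states kept at full tract detail. The `score` column name and bucket count are illustrative placeholders, and the sketch skips the multipolygon breakup and the bucket-homogeneity adjustment for brevity.

import math

import geopandas as gpd
import pandas as pd

def compress_for_low_zoom(
    tracts: gpd.GeoDataFrame,
    keep_high_zoom: gpd.GeoDataFrame,
    score_col: str = "score",  # illustrative column name
    num_buckets: int = 10,
) -> gpd.GeoDataFrame:
    # Sort tracts by score and assign each one to a bucket of roughly equal size.
    tracts = tracts.sort_values(score_col)
    bucket_size = math.ceil(len(tracts) / num_buckets)
    tracts["bucket"] = [i // bucket_size for i in range(len(tracts))]

    # Dissolve each bucket into a single (multi)polygon carrying the mean score.
    dissolved = tracts[[score_col, "bucket", "geometry"]].dissolve(
        by="bucket", aggfunc="mean"
    )

    # Append the small states that were kept at full tract detail.
    keep_columns = [score_col, "geometry"]
    return pd.concat([dissolved[keep_columns], keep_high_zoom[keep_columns]])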