mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-07-31 09:11:19 -07:00
Generate Geo-aware scores for all zoom levels (#391)
* generate Geo-aware scores for all zoom levels * usa high progress * testing dissolve * checkpoint * changing type * removing breakpoint * validation notebooks * quick update * score validation * fixes for county merge * code completed
This commit is contained in:
parent
446c8d1f68
commit
b404fdcc43
14 changed files with 3023 additions and 270 deletions
|
@ -2,6 +2,7 @@ import importlib
|
|||
|
||||
from etl.score.etl_score import ScoreETL
|
||||
from etl.score.etl_score_post import PostScoreETL
|
||||
from etl.score.etl_score_geo import GeoScoreETL
|
||||
|
||||
|
||||
def etl_runner(dataset_to_run: str = None) -> None:
|
||||
|
@ -112,6 +113,23 @@ def score_generate() -> None:
|
|||
score_post.cleanup()
|
||||
|
||||
|
||||
def score_geo() -> None:
|
||||
"""Generates the geojson files with score data baked in
|
||||
|
||||
Args:
|
||||
None
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
|
||||
# Score Geo
|
||||
score_geo = GeoScoreETL()
|
||||
score_geo.extract()
|
||||
score_geo.transform()
|
||||
score_geo.load()
|
||||
|
||||
|
||||
def _find_dataset_index(dataset_list, key, value):
|
||||
for i, element in enumerate(dataset_list):
|
||||
if element[key] == value:
|
||||
|
|
168
data/data-pipeline/etl/score/etl_score_geo.py
Normal file
168
data/data-pipeline/etl/score/etl_score_geo.py
Normal file
|
@ -0,0 +1,168 @@
|
|||
import pandas as pd
|
||||
import geopandas as gpd
|
||||
import math
|
||||
|
||||
from etl.base import ExtractTransformLoad
|
||||
from utils import get_module_logger
|
||||
|
||||
logger = get_module_logger(__name__)
|
||||
|
||||
|
||||
class GeoScoreETL(ExtractTransformLoad):
|
||||
"""
|
||||
A class used to generate per state and national GeoJson files with the score baked in
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.SCORE_GEOJSON_PATH = self.DATA_PATH / "score" / "geojson"
|
||||
self.SCORE_LOW_GEOJSON = self.SCORE_GEOJSON_PATH / "usa-low.json"
|
||||
self.SCORE_HIGH_GEOJSON = self.SCORE_GEOJSON_PATH / "usa-high.json"
|
||||
|
||||
self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
|
||||
self.TILE_SCORE_CSV = self.SCORE_CSV_PATH / "tiles" / "usa.csv"
|
||||
|
||||
self.CENSUS_USA_GEOJSON = (
|
||||
self.DATA_PATH / "census" / "geojson" / "us.json"
|
||||
)
|
||||
|
||||
self.TARGET_SCORE_NAME = "Score E (percentile)"
|
||||
self.TARGET_SCORE_RENAME_TO = "E_SCORE"
|
||||
|
||||
self.NUMBER_OF_BUCKETS = 10
|
||||
|
||||
self.geojson_usa_df: gpd.GeoDataFrame
|
||||
self.score_usa_df: pd.DataFrame
|
||||
self.geojson_score_usa_high: gpd.GeoDataFrame
|
||||
self.geojson_score_usa_low: gpd.GeoDataFrame
|
||||
|
||||
def extract(self) -> None:
|
||||
logger.info(f"Reading US GeoJSON (~6 minutes)")
|
||||
self.geojson_usa_df = gpd.read_file(
|
||||
self.CENSUS_USA_GEOJSON,
|
||||
dtype={"GEOID10": "string"},
|
||||
usecols=["GEOID10", "geometry"],
|
||||
low_memory=False,
|
||||
)
|
||||
self.geojson_usa_df.head()
|
||||
|
||||
logger.info(f"Reading score CSV")
|
||||
self.score_usa_df = pd.read_csv(
|
||||
self.TILE_SCORE_CSV,
|
||||
dtype={"GEOID10": "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
|
||||
def transform(self) -> None:
|
||||
logger.info(f"Pruning Census GeoJSON")
|
||||
fields = ["GEOID10", "geometry"]
|
||||
self.geojson_usa_df = self.geojson_usa_df[fields]
|
||||
|
||||
logger.info(f"Merging and compressing score CSV with USA GeoJSON")
|
||||
self.geojson_score_usa_high = self.score_usa_df.merge(
|
||||
self.geojson_usa_df, on="GEOID10", how="left"
|
||||
)
|
||||
|
||||
self.geojson_score_usa_high = gpd.GeoDataFrame(
|
||||
self.geojson_score_usa_high, crs="EPSG:4326"
|
||||
)
|
||||
|
||||
usa_simplified = self.geojson_score_usa_high[
|
||||
["GEOID10", self.TARGET_SCORE_NAME, "geometry"]
|
||||
].reset_index(drop=True)
|
||||
|
||||
usa_simplified.rename(
|
||||
columns={self.TARGET_SCORE_NAME: self.TARGET_SCORE_RENAME_TO},
|
||||
inplace=True,
|
||||
)
|
||||
|
||||
logger.info(f"Aggregating into tracts (~5 minutes)")
|
||||
usa_tracts = self._aggregate_to_tracts(usa_simplified)
|
||||
|
||||
usa_tracts = gpd.GeoDataFrame(
|
||||
usa_tracts,
|
||||
columns=[self.TARGET_SCORE_RENAME_TO, "geometry"],
|
||||
crs="EPSG:4326",
|
||||
)
|
||||
|
||||
logger.info(f"Creating buckets from tracts")
|
||||
usa_bucketed = self._create_buckets_from_tracts(
|
||||
usa_tracts, self.NUMBER_OF_BUCKETS
|
||||
)
|
||||
|
||||
logger.info(f"Aggregating buckets")
|
||||
usa_aggregated = self._aggregate_buckets(usa_bucketed, agg_func="mean")
|
||||
|
||||
compressed = self._breakup_multipolygons(
|
||||
usa_aggregated, self.NUMBER_OF_BUCKETS
|
||||
)
|
||||
|
||||
self.geojson_score_usa_low = gpd.GeoDataFrame(
|
||||
compressed,
|
||||
columns=[self.TARGET_SCORE_RENAME_TO, "geometry"],
|
||||
crs="EPSG:4326",
|
||||
)
|
||||
|
||||
def _aggregate_to_tracts(
|
||||
self, block_group_df: gpd.GeoDataFrame
|
||||
) -> gpd.GeoDataFrame:
|
||||
# The tract identifier is the first 11 digits of the GEOID
|
||||
block_group_df["tract"] = block_group_df.apply(
|
||||
lambda row: row["GEOID10"][0:11], axis=1
|
||||
)
|
||||
state_tracts = block_group_df.dissolve(by="tract", aggfunc="mean")
|
||||
return state_tracts
|
||||
|
||||
def _create_buckets_from_tracts(
|
||||
self, state_tracts: gpd.GeoDataFrame, num_buckets: int
|
||||
) -> gpd.GeoDataFrame:
|
||||
# assign tracts to buckets by D_SCORE
|
||||
state_tracts.sort_values(self.TARGET_SCORE_RENAME_TO, inplace=True)
|
||||
SCORE_bucket = []
|
||||
bucket_size = math.ceil(
|
||||
len(state_tracts.index) / self.NUMBER_OF_BUCKETS
|
||||
)
|
||||
for i in range(len(state_tracts.index)):
|
||||
SCORE_bucket.extend([math.floor(i / bucket_size)])
|
||||
state_tracts[f"{self.TARGET_SCORE_RENAME_TO}_bucket"] = SCORE_bucket
|
||||
return state_tracts
|
||||
|
||||
def _aggregate_buckets(self, state_tracts: gpd.GeoDataFrame, agg_func: str):
|
||||
# dissolve tracts by bucket
|
||||
state_attr = state_tracts[
|
||||
[
|
||||
self.TARGET_SCORE_RENAME_TO,
|
||||
f"{self.TARGET_SCORE_RENAME_TO}_bucket",
|
||||
"geometry",
|
||||
]
|
||||
].reset_index(drop=True)
|
||||
state_dissolve = state_attr.dissolve(
|
||||
by=f"{self.TARGET_SCORE_RENAME_TO}_bucket", aggfunc=agg_func
|
||||
)
|
||||
return state_dissolve
|
||||
|
||||
def _breakup_multipolygons(
|
||||
self, state_bucketed_df: gpd.GeoDataFrame, num_buckets: int
|
||||
) -> gpd.GeoDataFrame:
|
||||
compressed = []
|
||||
for i in range(num_buckets):
|
||||
for j in range(len(state_bucketed_df["geometry"][i].geoms)):
|
||||
compressed.append(
|
||||
[
|
||||
state_bucketed_df[self.TARGET_SCORE_RENAME_TO][i],
|
||||
state_bucketed_df["geometry"][i].geoms[j],
|
||||
]
|
||||
)
|
||||
return compressed
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info(f"Writing usa-high (~9 minutes)")
|
||||
self.geojson_score_usa_high.to_file(
|
||||
self.SCORE_HIGH_GEOJSON, driver="GeoJSON"
|
||||
)
|
||||
logger.info(f"Completed writing usa-high")
|
||||
|
||||
logger.info(f"Writing usa-low (~9 minutes)")
|
||||
self.geojson_score_usa_low.to_file(
|
||||
self.SCORE_LOW_GEOJSON, driver="GeoJSON"
|
||||
)
|
||||
logger.info(f"Completed writing usa-low")
|
|
@ -16,10 +16,13 @@ class PostScoreETL(ExtractTransformLoad):
|
|||
self.CENSUS_COUNTIES_ZIP_URL = "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/Gaz_counties_national.zip"
|
||||
self.CENSUS_COUNTIES_TXT = self.TMP_PATH / "Gaz_counties_national.txt"
|
||||
self.CENSUS_COUNTIES_COLS = ["USPS", "GEOID", "NAME"]
|
||||
self.CENSUS_USA_CSV = self.DATA_PATH / "census" / "csv" / "us.csv"
|
||||
self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
|
||||
|
||||
self.STATE_CSV = (
|
||||
self.DATA_PATH / "census" / "csv" / "fips_states_2010.csv"
|
||||
)
|
||||
|
||||
self.FULL_SCORE_CSV = self.SCORE_CSV_PATH / "full" / "usa.csv"
|
||||
self.TILR_SCORE_CSV = self.SCORE_CSV_PATH / "tile" / "usa.csv"
|
||||
|
||||
|
@ -87,17 +90,43 @@ class PostScoreETL(ExtractTransformLoad):
|
|||
# add the tract level column
|
||||
self.score_df["GEOID"] = self.score_df.GEOID10.str[:5]
|
||||
|
||||
# merge state and counties
|
||||
county_state_merged = self.counties_df.join(
|
||||
self.states_df, rsuffix=" Other"
|
||||
# merge state with counties
|
||||
county_state_merged = self.counties_df.merge(
|
||||
self.states_df, on="State Abbreviation", how="left"
|
||||
)
|
||||
del county_state_merged["State Abbreviation Other"]
|
||||
|
||||
# merge county and score
|
||||
self.score_county_state_merged = self.score_df.join(
|
||||
county_state_merged, rsuffix="_OTHER"
|
||||
# merge state + county with score
|
||||
self.score_county_state_merged = self.score_df.merge(
|
||||
county_state_merged, on="GEOID", how="left"
|
||||
)
|
||||
del self.score_county_state_merged["GEOID_OTHER"]
|
||||
|
||||
# check if there are census cbgs without score
|
||||
logger.info(f"Removing CBG rows without score")
|
||||
|
||||
## load cbgs
|
||||
cbg_usa_df = pd.read_csv(
|
||||
self.CENSUS_USA_CSV,
|
||||
names=["GEOID10"],
|
||||
dtype={"GEOID10": "string"},
|
||||
low_memory=False,
|
||||
header=None,
|
||||
)
|
||||
|
||||
# merge census cbgs with score
|
||||
merged_df = cbg_usa_df.merge(
|
||||
self.score_county_state_merged, on="GEOID10", how="left"
|
||||
)
|
||||
|
||||
# list the null score cbgs
|
||||
null_cbg_df = merged_df[merged_df["Score E (percentile)"].isnull()]
|
||||
|
||||
# subsctract data sets
|
||||
removed_df = pd.concat(
|
||||
[merged_df, null_cbg_df, null_cbg_df]
|
||||
).drop_duplicates(keep=False)
|
||||
|
||||
# set the score to the new df
|
||||
self.score_county_state_merged = removed_df
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info(f"Saving Full Score CSV with County Information")
|
||||
|
|
|
@ -9,12 +9,16 @@ logger = get_module_logger(__name__)
|
|||
class CalEnviroScreenETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
self.CALENVIROSCREEN_FTP_URL = "https://justice40-data.s3.amazonaws.com/data-sources/CalEnviroScreen_4.0_2021.zip"
|
||||
self.CALENVIROSCREEN_CSV = self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
|
||||
self.CALENVIROSCREEN_CSV = (
|
||||
self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
|
||||
)
|
||||
self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4"
|
||||
|
||||
# Definining some variable names
|
||||
self.CALENVIROSCREEN_SCORE_FIELD_NAME = "calenviroscreen_score"
|
||||
self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME = "calenviroscreen_percentile"
|
||||
self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME = (
|
||||
"calenviroscreen_percentile"
|
||||
)
|
||||
self.CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD_NAME = (
|
||||
"calenviroscreen_priority_community"
|
||||
)
|
||||
|
|
|
@ -2,6 +2,7 @@ import csv
|
|||
import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
import geopandas as gpd
|
||||
|
||||
from .etl_utils import get_state_fips_codes
|
||||
from utils import unzip_file_from_url, get_module_logger
|
||||
|
@ -11,7 +12,7 @@ logger = get_module_logger(__name__)
|
|||
|
||||
def download_census_csvs(data_path: Path) -> None:
|
||||
"""Download all census shape files from the Census FTP and extract the geojson
|
||||
to generate national and by state Census Block Group CSVs
|
||||
to generate national and by state Census Block Group CSVs and GeoJSONs
|
||||
|
||||
Args:
|
||||
data_path (pathlib.Path): Name of the directory where the files and directories will
|
||||
|
@ -108,4 +109,17 @@ def download_census_csvs(data_path: Path) -> None:
|
|||
]
|
||||
)
|
||||
|
||||
## create national geojson
|
||||
logger.info(f"Generating national geojson file")
|
||||
usa_df = gpd.GeoDataFrame()
|
||||
|
||||
for file_name in geojson_dir_path.rglob("*.json"):
|
||||
logger.info(f"Ingesting {file_name}")
|
||||
state_gdf = gpd.read_file(file_name)
|
||||
usa_df = usa_df.append(state_gdf)
|
||||
|
||||
usa_df = usa_df.to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
|
||||
logger.info(f"Writing national geojson file")
|
||||
usa_df.to_file(geojson_dir_path / "us.json", driver="GeoJSON")
|
||||
|
||||
logger.info("Census block groups downloading complete")
|
||||
|
|
|
@ -106,3 +106,8 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
self.df[columns_to_include].to_csv(
|
||||
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
|
||||
)
|
||||
|
||||
def validate(self) -> None:
|
||||
logger.info(f"Validating Census ACS Data")
|
||||
|
||||
pass
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue