Data Pipeline performance improvements for Census GeoJSON and Score file

Carlos Felix 2025-01-13 09:28:14 -05:00 committed by Carlos Felix
commit c32bd1f363
37 changed files with 1305 additions and 1413 deletions

View file

@ -155,7 +155,13 @@ DATASET_LIST = [
"class_name": "HistoricRedliningETL",
"is_memory_intensive": False,
},
# This has to come after us.json exists
{
"name": "tribal",
"module_dir": "tribal",
"class_name": "TribalETL",
"is_memory_intensive": False,
},
# This has to come after us_geo.parquet exists
{
"name": "census_acs",
"module_dir": "census_acs",
@ -196,10 +202,3 @@ CENSUS_INFO = {
"class_name": "CensusETL",
"is_memory_intensive": False,
}
TRIBAL_INFO = {
"name": "tribal",
"module_dir": "tribal",
"class_name": "TribalETL",
"is_memory_intensive": False,
}
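
Note: the tribal dataset entry now lives inside DATASET_LIST (ordered so it runs after the national GeoParquet exists) instead of the separate TRIBAL_INFO constant. A minimal sketch of how the runner turns one of these entries into an ETL instance, assuming the source modules live under data_pipeline.etl.sources as the module_dir/class_name keys suggest:

import importlib

def get_etl_instance(dataset: dict):
    # Sketch only: the exact module path is an assumption based on the
    # "module_dir" and "class_name" keys shown above.
    etl_module = importlib.import_module(
        f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
    )
    etl_class = getattr(etl_module, dataset["class_name"])
    return etl_class()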

View file

@ -1,5 +1,6 @@
import concurrent.futures
import importlib
import time
import typing
import os
@ -27,9 +28,7 @@ def _get_datasets_to_run(dataset_to_run: str) -> typing.List[dict]:
None
"""
dataset_list = constants.DATASET_LIST
etls_to_search = (
dataset_list + [constants.CENSUS_INFO] + [constants.TRIBAL_INFO]
)
etls_to_search = dataset_list + [constants.CENSUS_INFO]
if dataset_to_run:
dataset_element = next(
@ -59,6 +58,8 @@ def _get_dataset(dataset: dict) -> ExtractTransformLoad:
def _run_one_dataset(dataset: dict, use_cache: bool = False) -> None:
"""Runs one etl process."""
start_time = time.time()
logger.info(f"Running ETL for {dataset['name']}")
etl_instance = _get_dataset(dataset)
@ -83,6 +84,9 @@ def _run_one_dataset(dataset: dict, use_cache: bool = False) -> None:
etl_instance.cleanup()
logger.info(f"Finished ETL for dataset {dataset['name']}")
logger.debug(
f"Execution time for ETL for dataset {dataset['name']} was {time.time() - start_time}s"
)
def etl_runner(
@ -197,10 +201,14 @@ def score_generate() -> None:
"""
# Score Gen
start_time = time.time()
score_gen = ScoreETL()
score_gen.extract()
score_gen.transform()
score_gen.load()
logger.debug(
f"Execution time for Score Generation was {time.time() - start_time}s"
)
def score_post(data_source: str = "local") -> None:
@ -216,11 +224,15 @@ def score_post(data_source: str = "local") -> None:
None
"""
# Post Score Processing
start_time = time.time()
score_post = PostScoreETL(data_source=data_source)
score_post.extract()
score_post.transform()
score_post.load()
score_post.cleanup()
logger.debug(
f"Execution time for Score Post was {time.time() - start_time}s"
)
def score_geo(data_source: str = "local") -> None:
@ -237,10 +249,14 @@ def score_geo(data_source: str = "local") -> None:
"""
# Score Geo
start_time = time.time()
score_geo = GeoScoreETL(data_source=data_source)
score_geo.extract()
score_geo.transform()
score_geo.load()
logger.debug(
f"Execution time for Score Geo was {time.time() - start_time}s"
)
def _find_dataset_index(dataset_list, key, value):
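
The new start_time / logger.debug pairs above all repeat the same pattern; a minimal reusable version (a hypothetical helper, not part of this commit) could look like:

import time
from contextlib import contextmanager

@contextmanager
def log_execution_time(logger, label: str):
    # Mirrors the timing added to _run_one_dataset, score_generate,
    # score_post and score_geo above.
    start_time = time.time()
    try:
        yield
    finally:
        logger.debug(
            f"Execution time for {label} was {time.time() - start_time}s"
        )

Usage would be "with log_execution_time(logger, 'Score Generation'): ..." around each stage.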

View file

@ -24,7 +24,7 @@ DATA_CENSUS_DIR = DATA_PATH / "census"
DATA_CENSUS_CSV_DIR = DATA_CENSUS_DIR / "csv"
DATA_CENSUS_CSV_FILE_PATH = DATA_CENSUS_CSV_DIR / "us.csv"
DATA_CENSUS_CSV_STATE_FILE_PATH = DATA_CENSUS_CSV_DIR / "fips_states_2010.csv"
DATA_CENSUS_GEOJSON_FILE_PATH = DATA_CENSUS_DIR / "geojson" / "us.json"
DATA_CENSUS_GEOJSON_FILE_PATH = DATA_CENSUS_DIR / "geojson" / "us_geo.parquet"
# Score paths
DATA_SCORE_DIR = DATA_PATH / "score"
@ -32,7 +32,7 @@ DATA_SCORE_DIR = DATA_PATH / "score"
## Score CSV Paths
DATA_SCORE_CSV_DIR = DATA_SCORE_DIR / "csv"
DATA_SCORE_CSV_FULL_DIR = DATA_SCORE_CSV_DIR / "full"
DATA_SCORE_CSV_FULL_FILE_PATH = DATA_SCORE_CSV_FULL_DIR / "usa.csv"
DATA_SCORE_CSV_FULL_FILE_PATH = DATA_SCORE_CSV_FULL_DIR / "usa_score.parquet"
FULL_SCORE_CSV_FULL_PLUS_COUNTIES_FILE_PATH = (
DATA_SCORE_CSV_FULL_DIR / "usa_counties.csv"
)
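
DATA_SCORE_CSV_FULL_FILE_PATH keeps its constant name but now points at a Parquet file, so downstream readers need a Parquet engine (pyarrow or fastparquet) instead of the CSV reader. A sketch of consuming it, with the import path assumed:

import pandas as pd
# Import path is an assumption; use wherever these constants are defined.
from data_pipeline.etl.score import constants

score_df = pd.read_parquet(constants.DATA_SCORE_CSV_FULL_FILE_PATH)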

View file

@ -727,4 +727,4 @@ class ScoreETL(ExtractTransformLoad):
def load(self) -> None:
constants.DATA_SCORE_CSV_FULL_DIR.mkdir(parents=True, exist_ok=True)
self.df.to_csv(constants.DATA_SCORE_CSV_FULL_FILE_PATH, index=False)
self.df.to_parquet(constants.DATA_SCORE_CSV_FULL_FILE_PATH, index=False)
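
One reason for the switch: Parquet preserves column dtypes across the write/read round trip, so readers no longer need the dtype=/low_memory= workarounds used for the CSV. A small self-contained sketch with illustrative column names:

import pandas as pd

df = pd.DataFrame(
    {"GEOID10_TRACT": ["01001020100"], "Total population": [1500]}
)
df.to_parquet("usa_score.parquet", index=False)
round_tripped = pd.read_parquet("usa_score.parquet")
# The tract ID stays a string; a plain CSV round trip would parse it as an
# integer and drop the leading zero.
assert round_tripped["GEOID10_TRACT"].iloc[0] == "01001020100"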

View file

@ -37,9 +37,7 @@ class GeoScoreETL(ExtractTransformLoad):
self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
self.TILE_SCORE_CSV = self.SCORE_CSV_PATH / "tiles" / "usa.csv"
self.CENSUS_USA_GEOJSON = (
self.DATA_PATH / "census" / "geojson" / "us.json"
)
self.CENSUS_USA_GEOJSON = constants.DATA_CENSUS_GEOJSON_FILE_PATH
# Import the shortened name for Score N to be used on tiles.
# We should no longer be using PFS
@ -87,16 +85,14 @@ class GeoScoreETL(ExtractTransformLoad):
score_data_source=self.DATA_SOURCE,
)
logger.info("Reading US GeoJSON (~6 minutes)")
full_geojson_usa_df = gpd.read_file(
logger.info("Reading US GeoJSON")
full_geojson_usa_df = gpd.read_parquet(
self.CENSUS_USA_GEOJSON,
dtype={self.GEOID_FIELD_NAME: "string"},
usecols=[
columns=[
self.GEOID_FIELD_NAME,
self.GEOMETRY_FIELD_NAME,
self.LAND_FIELD_NAME,
],
low_memory=False,
)
# We only want to keep tracts to visualize that have non-0 land
@ -104,7 +100,7 @@ class GeoScoreETL(ExtractTransformLoad):
full_geojson_usa_df[self.LAND_FIELD_NAME] > 0
]
logger.info("Reading score CSV")
logger.info("Reading tile score CSV")
self.score_usa_df = pd.read_csv(
self.TILE_SCORE_CSV,
dtype={
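
With GeoParquet, geopandas can read just the needed columns via columns= (the geometry column must be included), replacing the dtype=/usecols=/low_memory= arguments removed above. A sketch with assumed path and illustrative column names:

import geopandas as gpd

usa_tracts = gpd.read_parquet(
    "data/census/geojson/us_geo.parquet",        # path assumed
    columns=["GEOID10", "geometry", "ALAND10"],  # ID, geometry, land area (illustrative)
)
# Keep only tracts with non-zero land, as in the step above.
usa_tracts = usa_tracts[usa_tracts["ALAND10"] > 0]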

View file

@ -94,12 +94,8 @@ class PostScoreETL(ExtractTransformLoad):
)
def _extract_score(self, score_path: Path) -> pd.DataFrame:
logger.debug("Reading Score CSV")
df = pd.read_csv(
score_path,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)
logger.debug("Reading Score")
df = pd.read_parquet(score_path)
# Convert total population to an int
df["Total population"] = df["Total population"].astype(
@ -116,8 +112,7 @@ class PostScoreETL(ExtractTransformLoad):
gpd.GeoDataFrame: the census geo json data
"""
logger.debug("Reading Census GeoJSON")
with open(geo_path, "r", encoding="utf-8") as file:
data = gpd.read_file(file)
data = gpd.read_parquet(geo_path)
return data
def extract(self, use_cached_data_sources: bool = False) -> None:

View file

@ -70,7 +70,7 @@ def state_data_initial(sample_data_dir):
@pytest.fixture()
def score_data_initial(sample_data_dir):
return sample_data_dir / "score_data_initial.csv"
return sample_data_dir / "score_data_initial.parquet"
@pytest.fixture()
@ -104,8 +104,8 @@ def states_transformed_expected():
@pytest.fixture()
def score_transformed_expected():
return pd.read_pickle(
pytest.SNAPSHOT_DIR / "score_transformed_expected.pkl"
return pd.read_parquet(
pytest.SNAPSHOT_DIR / "score_transformed_expected.parquet"
)
@ -122,7 +122,7 @@ def national_tract_df():
@pytest.fixture()
def score_data_expected():
return pd.read_pickle(pytest.SNAPSHOT_DIR / "score_data_expected.pkl")
return pd.read_parquet(pytest.SNAPSHOT_DIR / "score_data_expected.parquet")
@pytest.fixture()
@ -144,8 +144,8 @@ def create_tile_data_expected():
@pytest.fixture()
def downloadable_data_expected():
return pd.read_pickle(
pytest.SNAPSHOT_DIR / "downloadable_data_expected.pkl"
return pd.read_parquet(
pytest.SNAPSHOT_DIR / "downloadable_data_expected.parquet"
)
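
The snapshot fixtures move from pickle to Parquet along with the pipeline outputs. A hypothetical one-off migration of the existing snapshot files (not part of this commit, directory assumed) would be:

import pandas as pd
from pathlib import Path

snapshot_dir = Path("data_pipeline/etl/score/tests/snapshots")  # assumed location
for pkl in snapshot_dir.glob("*.pkl"):
    # Re-save each pickled DataFrame next to itself as Parquet.
    pd.read_pickle(pkl).to_parquet(pkl.with_suffix(".parquet"), index=False)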

File diff suppressed because one or more lines are too long

View file

@ -33,8 +33,7 @@ def test_extract_states(etl, state_data_initial):
def test_extract_score(etl, score_data_initial):
extracted = etl._extract_score(score_data_initial)
string_cols = ["GEOID10_TRACT"]
assert all(ptypes.is_string_dtype(extracted[col]) for col in string_cols)
assert len(extracted) > 0
# Transform Tests
@ -107,6 +106,7 @@ def test_create_downloadable_data(
pdt.assert_frame_equal(
output_downloadable_df_actual,
downloadable_data_expected,
check_dtype=False,
)

View file

@ -1,10 +1,9 @@
import csv
import json
import subprocess
from enum import Enum
from pathlib import Path
import geopandas as gpd
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
from data_pipeline.utils import get_module_logger
@ -26,8 +25,8 @@ class CensusETL(ExtractTransformLoad):
CSV_BASE_PATH = ExtractTransformLoad.DATA_PATH / "census" / "csv"
GEOJSON_PATH = ExtractTransformLoad.DATA_PATH / "census" / "geojson"
NATIONAL_TRACT_CSV_PATH = CSV_BASE_PATH / "us.csv"
NATIONAL_TRACT_JSON_PATH = GEOJSON_BASE_PATH / "us.json"
GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
NATIONAL_TRACT_JSON_PATH = GEOJSON_BASE_PATH / "us_geo.parquet"
GEOID_TRACT_FIELD_NAME: str = "GEOID10"
def __init__(self):
@ -59,7 +58,7 @@ class CensusETL(ExtractTransformLoad):
/ f"tl_2010_{fips_code}_tract10.shp"
)
elif file_type == GeoFileType.GEOJSON:
file_path = Path(self.GEOJSON_BASE_PATH / f"{fips_code}.json")
file_path = Path(self.GEOJSON_BASE_PATH / f"{fips_code}.parquet")
elif file_type == GeoFileType.CSV:
file_path = Path(self.CSV_BASE_PATH / f"{fips_code}.csv")
return file_path
@ -93,14 +92,8 @@ class CensusETL(ExtractTransformLoad):
)
if not geojson_file_path.is_file():
cmd = [
"ogr2ogr",
"-f",
"GeoJSON",
str(geojson_file_path),
str(shp_file_path),
]
subprocess.run(cmd, check=True)
gdf = gpd.read_file(shp_file_path)
gdf.to_parquet(geojson_file_path)
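
The per-state ogr2ogr subprocess is replaced by an in-process read and write. A minimal sketch of the same conversion (geopandas needs pyogrio or fiona to read shapefiles and pyarrow to write Parquet):

import geopandas as gpd
from pathlib import Path

def convert_state_shapefile(shp_file_path: Path, parquet_file_path: Path) -> None:
    # Skip states that were already converted, as the guard above does.
    if not parquet_file_path.is_file():
        gdf = gpd.read_file(shp_file_path)
        gdf.to_parquet(parquet_file_path)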
def _generate_tract_table(self) -> None:
"""Generate Tract CSV table for pandas, load in memory
@ -110,20 +103,15 @@ class CensusETL(ExtractTransformLoad):
"""
logger.debug("Transforming tracts")
for file in self.GEOJSON_BASE_PATH.iterdir():
if file.suffix == ".json":
logger.debug(f"Adding GEOID10 for file {file.name}")
with open(self.GEOJSON_BASE_PATH / file, encoding="utf-8") as f:
geojson = json.load(f)
for feature in geojson["features"]:
tractid10 = feature["properties"]["GEOID10"]
self.TRACT_NATIONAL.append(str(tractid10))
tractid10_state_id = tractid10[:2]
if not self.TRACT_PER_STATE.get(tractid10_state_id):
self.TRACT_PER_STATE[tractid10_state_id] = []
self.TRACT_PER_STATE[tractid10_state_id].append(
tractid10
)
files = list(self.GEOJSON_BASE_PATH.glob("[0-9]*.parquet"))
files.sort()
for file in files:
logger.debug(f"Adding GEOID10 for file {file.name}")
state_df = gpd.read_parquet(file)
tract_list = state_df["GEOID10"].to_list()
self.TRACT_NATIONAL.extend(tract_list)
tractid10_state_id = state_df["STATEFP10"][0]
self.TRACT_PER_STATE[tractid10_state_id] = tract_list
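
The rewritten loop reads each state Parquet file directly instead of parsing GeoJSON feature by feature, and the "[0-9]*.parquet" glob matches only the numeric state FIPS files, so the national us_geo.parquet in the same directory is never re-read here. Illustratively, it builds structures like:

# Illustrative contents only (two fake tracts in state 01).
TRACT_NATIONAL = ["01001020100", "01001020200"]
TRACT_PER_STATE = {"01": ["01001020100", "01001020200"]}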
def transform(self) -> None:
"""Download all census shape files from the Census FTP and extract the geojson
@ -210,18 +198,24 @@ class CensusETL(ExtractTransformLoad):
usa_df = gpd.GeoDataFrame()
for file_name in self.GEOJSON_BASE_PATH.rglob("*.json"):
# Read state only files and append them into a MEGA US GPD
files = list(self.GEOJSON_BASE_PATH.glob("[0-9]*.parquet"))
files.sort()
for file_name in files:
logger.debug(f"Adding national GeoJSON file {file_name.name}")
state_gdf = gpd.read_file(file_name)
usa_df = usa_df.append(state_gdf)
state_gdf = gpd.read_parquet(file_name)
usa_df = pd.concat([usa_df, state_gdf], ignore_index=True)
assert len(usa_df.columns) > 0
logger.debug("Converting to CRS")
usa_df = usa_df.to_crs(
"+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"
)
usa_df = usa_df.to_crs("EPSG:4326")
logger.debug("Saving national GeoJSON file")
usa_df.to_file(self.NATIONAL_TRACT_JSON_PATH, driver="GeoJSON")
# Convert tract ID to a string
usa_df[self.GEOID_TRACT_FIELD_NAME] = usa_df[
self.GEOID_TRACT_FIELD_NAME
].astype(str, errors="ignore")
usa_df.to_parquet(self.NATIONAL_TRACT_JSON_PATH)
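
pd.concat replaces the GeoDataFrame.append call removed above (DataFrame.append was dropped in pandas 2.0), and "EPSG:4326" names the same WGS84 longitude/latitude CRS the old proj4 string spelled out. A condensed sketch of the national aggregation, with the directory layout assumed:

import geopandas as gpd
import pandas as pd
from pathlib import Path

geojson_dir = Path("data/census/geojson")  # assumed layout
state_frames = [
    gpd.read_parquet(p) for p in sorted(geojson_dir.glob("[0-9]*.parquet"))
]
# Concatenating GeoDataFrames keeps the geometry column and CRS.
usa_df = pd.concat(state_frames, ignore_index=True).to_crs("EPSG:4326")
usa_df.to_parquet(geojson_dir / "us_geo.parquet")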
def load(self) -> None:
"""Create state CSVs, National CSV, and National GeoJSON

View file

@ -104,7 +104,7 @@ def check_census_data_source(
)
else:
# check if census data is found locally
if not os.path.isfile(census_data_path / "geojson" / "us.json"):
if not os.path.isfile(census_data_path / "geojson" / "us_geo.parquet"):
logger.error(
"No local census data found. Please use '-s aws` to fetch from AWS"
)

View file

@ -507,7 +507,7 @@ class CensusACSETL(ExtractTransformLoad):
# geojson file for all of the US, this will read it off of S3
logger.debug("Reading in geojson for the country")
if not os.path.exists(
self.DATA_PATH / "census" / "geojson" / "us.json"
self.DATA_PATH / "census" / "geojson" / "us_geo.parquet"
):
logger.debug("Fetching Census data from AWS S3")
unzip_file_from_url(
@ -515,9 +515,8 @@ class CensusACSETL(ExtractTransformLoad):
self.DATA_PATH / "tmp",
self.DATA_PATH,
)
self.geo_df = gpd.read_file(
self.DATA_PATH / "census" / "geojson" / "us.json",
self.geo_df = gpd.read_parquet(
self.DATA_PATH / "census" / "geojson" / "us_geo.parquet",
)
def transform(self) -> None:

View file

@ -33,7 +33,7 @@ class CensusDecennialETL(ExtractTransformLoad):
/ f"census_decennial_{DECENNIAL_YEAR}"
)
CENSUS_GEOJSON_PATH = (
ExtractTransformLoad.DATA_PATH / "census" / "geojson" / "us.json"
ExtractTransformLoad.DATA_PATH / "census" / "geojson" / "us_geo.parquet"
)
def __get_api_url(
@ -148,7 +148,7 @@ class CensusDecennialETL(ExtractTransformLoad):
"""Impute income for both income measures."""
# Merges Census geojson to impute values from.
logger.debug(f"Reading GeoJSON from {geojson_path}")
geo_df = gpd.read_file(geojson_path)
geo_df = gpd.read_parquet(geojson_path)
self.df_all = CensusACSETL.merge_geojson(
df=self.df_all,
usa_geo_df=geo_df,

View file

@ -26,10 +26,7 @@ def get_tract_geojson(
census_etl.extract()
census_etl.transform()
census_etl.load()
tract_data = gpd.read_file(
GEOJSON_PATH,
include_fields=["GEOID10"],
)
tract_data = gpd.read_parquet(GEOJSON_PATH)
tract_data = tract_data.rename(
columns={"GEOID10": "GEOID10_TRACT"}, errors="raise"
)
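
read_parquet drops the include_fields= argument the GeoJSON reader used; if only the tract ID and geometry are needed, columns= can still restrict the read (the geometry column must be listed). A sketch, with the path and column names assumed:

import geopandas as gpd
from pathlib import Path

GEOJSON_PATH = Path("data/census/geojson/us_geo.parquet")  # assumed, as above
tract_data = gpd.read_parquet(GEOJSON_PATH, columns=["GEOID10", "geometry"])
tract_data = tract_data.rename(columns={"GEOID10": "GEOID10_TRACT"}, errors="raise")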