mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-07-29 07:41:17 -07:00
Add FUDS ETL (#1817)
* Add spatial join method (#1871) Since we'll need to figure out the tracts for a large number of points in future tickets, add a utility to handle grabbing the tract geometries and adding tract data to a point dataset. * Add FUDS, also jupyter lab (#1871) * Add YAML configs for FUDS (#1871) * Allow input geoid to be optional (#1871) * Add FUDS ETL, tests, test-datae noteobook (#1871) This adds the ETL class for Formerly Used Defense Sites (FUDS). This is different from most other ETLs since these FUDS are not provided by tract, but instead by geographic point, so we need to assign FUDS to tracts and then do calculations from there. * Floats -> Ints, as I intended (#1871) * Floats -> Ints, as I intended (#1871) * Formatting fixes (#1871) * Add test false positive GEOIDs (#1871) * Add gdal binaries (#1871) * Refactor pandas code to be more idiomatic (#1871) Per Emma, the more pandas-y way of doing my counts is using np.where to add the values i need, then groupby and size. It is definitely more compact, and also I think more correct! * Update configs per Emma suggestions (#1871) * Type fixed! (#1871) * Remove spurious import from vscode (#1871) * Snapshot update after changing col name (#1871) * Move up GDAL (#1871) * Adjust geojson strategy (#1871) * Try running census separately first (#1871) * Fix import order (#1871) * Cleanup cache strategy (#1871) * Download census data from S3 instead of re-calculating (#1871) * Clarify pandas code per Emma (#1871)
This commit is contained in:
parent
13e79087d1
commit
d5fbb802e8
22 changed files with 2534 additions and 416 deletions
|
@ -127,9 +127,10 @@ class ExtractTransformLoad:
|
|||
sys.exit()
|
||||
|
||||
# set some of the basic fields
|
||||
cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
|
||||
"input_geoid_tract_field_name"
|
||||
]
|
||||
if "input_geoid_tract_field_name" in dataset_config:
|
||||
cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
|
||||
"input_geoid_tract_field_name"
|
||||
]
|
||||
|
||||
# get the columns to write on the CSV
|
||||
# and set the constants
|
||||
|
|
|
@ -130,6 +130,11 @@ DATASET_LIST = [
|
|||
"module_dir": "census_acs_2010",
|
||||
"class_name": "CensusACS2010ETL",
|
||||
},
|
||||
{
|
||||
"name": "us_army_fuds",
|
||||
"module_dir": "us_army_fuds",
|
||||
"class_name": "USArmyFUDS",
|
||||
},
|
||||
]
|
||||
|
||||
CENSUS_INFO = {
|
||||
|
|
|
@ -117,6 +117,34 @@ datasets:
|
|||
field_type: float
|
||||
include_in_downloadable_files: true
|
||||
include_in_tiles: true
|
||||
- long_name: "Formerly Used Defense Sites"
|
||||
short_name: "FUDS"
|
||||
module_name: "us_army_fuds"
|
||||
load_fields:
|
||||
- short_name: "fuds_count"
|
||||
df_field_name: "ELIGIBLE_FUDS_COUNT_FIELD_NAME"
|
||||
long_name: "Count of eligible Formerly Used Defense Site (FUDS) properties centroids"
|
||||
description_short:
|
||||
"The number of FUDS marked as Eligible and Has Project in the tract."
|
||||
field_type: int64
|
||||
include_in_tiles: false
|
||||
include_in_downloadable_files: false
|
||||
- short_name: "not_fuds_ct"
|
||||
df_field_name: "INELIGIBLE_FUDS_COUNT_FIELD_NAME"
|
||||
long_name: "Count of ineligible Formerly Used Defense Site (FUDS) properties centroids"
|
||||
description_short:
|
||||
"The number of FUDS marked as Ineligible or Project in the tract."
|
||||
field_type: int64
|
||||
include_in_tiles: false
|
||||
include_in_downloadable_files: false
|
||||
- short_name: "has_fuds"
|
||||
df_field_name: "ELIGIBLE_FUDS_BINARY_FIELD_NAME"
|
||||
long_name: "Is there at least one Formerly Used Defense Site (FUDS) in the tract?"
|
||||
description_short:
|
||||
"Whether the tract has a FUDS"
|
||||
field_type: bool
|
||||
include_in_tiles: false
|
||||
include_in_downloadable_files: false
|
||||
- long_name: "Example ETL"
|
||||
short_name: "Example"
|
||||
module_name: "example_dataset"
|
||||
|
@ -128,4 +156,3 @@ datasets:
|
|||
field_type: float
|
||||
include_in_tiles: true
|
||||
include_in_downloadable_files: true
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ class DatasetsConfig:
|
|||
long_name: str
|
||||
short_name: str
|
||||
module_name: str
|
||||
input_geoid_tract_field_name: str
|
||||
load_fields: List[LoadField]
|
||||
input_geoid_tract_field_name: Optional[str] = None
|
||||
|
||||
datasets: List[Dataset]
|
||||
|
|
|
@ -20,19 +20,20 @@ class GeoFileType(Enum):
|
|||
|
||||
|
||||
class CensusETL(ExtractTransformLoad):
|
||||
SHP_BASE_PATH = ExtractTransformLoad.DATA_PATH / "census" / "shp"
|
||||
GEOJSON_BASE_PATH = ExtractTransformLoad.DATA_PATH / "census" / "geojson"
|
||||
CSV_BASE_PATH = ExtractTransformLoad.DATA_PATH / "census" / "csv"
|
||||
GEOJSON_PATH = ExtractTransformLoad.DATA_PATH / "census" / "geojson"
|
||||
NATIONAL_TRACT_CSV_PATH = CSV_BASE_PATH / "us.csv"
|
||||
NATIONAL_TRACT_JSON_PATH = GEOJSON_BASE_PATH / "us.json"
|
||||
GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
|
||||
|
||||
def __init__(self):
|
||||
self.SHP_BASE_PATH = self.DATA_PATH / "census" / "shp"
|
||||
self.GEOJSON_BASE_PATH = self.DATA_PATH / "census" / "geojson"
|
||||
self.CSV_BASE_PATH = self.DATA_PATH / "census" / "csv"
|
||||
# the fips_states_2010.csv is generated from data here
|
||||
# https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html
|
||||
self.STATE_FIPS_CODES = get_state_fips_codes(self.DATA_PATH)
|
||||
self.GEOJSON_PATH = self.DATA_PATH / "census" / "geojson"
|
||||
self.TRACT_PER_STATE: dict = {} # in-memory dict per state
|
||||
self.TRACT_NATIONAL: list = [] # in-memory global list
|
||||
self.NATIONAL_TRACT_CSV_PATH = self.CSV_BASE_PATH / "us.csv"
|
||||
self.NATIONAL_TRACT_JSON_PATH = self.GEOJSON_BASE_PATH / "us.json"
|
||||
self.GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
|
||||
|
||||
def _path_for_fips_file(
|
||||
self, fips_code: str, file_type: GeoFileType
|
||||
|
|
62
data/data-pipeline/data_pipeline/etl/sources/geo_utils.py
Normal file
62
data/data-pipeline/data_pipeline/etl/sources/geo_utils.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
"""Utililities for turning geographies into tracts, using census data"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from functools import lru_cache
|
||||
import geopandas as gpd
|
||||
from data_pipeline.utils import get_module_logger
|
||||
from .census.etl import CensusETL
|
||||
|
||||
logger = get_module_logger(__name__)
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def get_tract_geojson(
|
||||
_tract_data_path: Optional[Path] = None,
|
||||
) -> gpd.GeoDataFrame:
|
||||
logger.info("Loading tract geometry data from census ETL")
|
||||
GEOJSON_PATH = _tract_data_path
|
||||
if GEOJSON_PATH is None:
|
||||
GEOJSON_PATH = CensusETL.NATIONAL_TRACT_JSON_PATH
|
||||
if not GEOJSON_PATH.exists():
|
||||
logger.debug("Census data has not been computed, running")
|
||||
census_etl = CensusETL()
|
||||
census_etl.extract()
|
||||
census_etl.transform()
|
||||
census_etl.load()
|
||||
else:
|
||||
logger.debug("Loading existing tract geojson")
|
||||
tract_data = gpd.read_file(GEOJSON_PATH, include_fields=["GEOID10"])
|
||||
tract_data.rename(columns={"GEOID10": "GEOID10_TRACT"}, inplace=True)
|
||||
return tract_data
|
||||
|
||||
|
||||
def add_tracts_for_geometries(
|
||||
df: gpd.GeoDataFrame, _tract_data_path: Optional[Path] = None
|
||||
) -> gpd.GeoDataFrame:
|
||||
"""Adds tract-geoids to dataframe df that contains spatial geometries
|
||||
|
||||
Depends on CensusETL for the geodata to do its conversion
|
||||
|
||||
Args:
|
||||
df (GeoDataFrame): a geopandas GeoDataFrame with a point geometry column
|
||||
_tract_data_path (Path): an override to directly pass a GEOJSON file of
|
||||
tracts->Geometries, to simplify testing.
|
||||
|
||||
Returns:
|
||||
GeoDataFrame: the above dataframe, with an additional GEOID10_TRACT column that
|
||||
maps the points in DF to census tracts and a geometry column for later
|
||||
spatial analysis
|
||||
"""
|
||||
logger.debug("Appending tract data to dataframe")
|
||||
tract_data = get_tract_geojson(_tract_data_path)
|
||||
assert (
|
||||
tract_data.crs == df.crs
|
||||
), f"Dataframe must be projected to {tract_data.crs}"
|
||||
df = gpd.sjoin(
|
||||
df,
|
||||
tract_data[["GEOID10_TRACT", "geometry"]],
|
||||
how="inner",
|
||||
op="intersects",
|
||||
)
|
||||
return df
|
|
@ -0,0 +1,98 @@
|
|||
from pathlib import Path
|
||||
import geopandas as gpd
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
|
||||
from data_pipeline.utils import get_module_logger, download_file_from_url
|
||||
from data_pipeline.etl.sources.geo_utils import add_tracts_for_geometries
|
||||
|
||||
logger = get_module_logger(__name__)
|
||||
|
||||
|
||||
class USArmyFUDS(ExtractTransformLoad):
|
||||
"""The Formerly Used Defense Sites (FUDS)"""
|
||||
|
||||
NAME: str = "us_army_fuds"
|
||||
|
||||
ELIGIBLE_FUDS_COUNT_FIELD_NAME: str
|
||||
INELIGIBLE_FUDS_COUNT_FIELD_NAME: str
|
||||
ELIGIBLE_FUDS_BINARY_FIELD_NAME: str
|
||||
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
|
||||
|
||||
def __init__(self):
|
||||
self.FILE_URL: str = (
|
||||
"https://opendata.arcgis.com/api/v3/datasets/"
|
||||
"3f8354667d5b4b1b8ad7a6e00c3cf3b1_1/downloads/"
|
||||
"data?format=geojson&spatialRefId=4326&where=1%3D1"
|
||||
)
|
||||
|
||||
self.OUTPUT_PATH: Path = self.DATA_PATH / "dataset" / "us_army_fuds"
|
||||
|
||||
# Constants for output
|
||||
self.COLUMNS_TO_KEEP = [
|
||||
self.GEOID_TRACT_FIELD_NAME,
|
||||
self.ELIGIBLE_FUDS_COUNT_FIELD_NAME,
|
||||
self.INELIGIBLE_FUDS_COUNT_FIELD_NAME,
|
||||
self.ELIGIBLE_FUDS_BINARY_FIELD_NAME,
|
||||
]
|
||||
self.DOWNLOAD_FILE_NAME = self.get_tmp_path() / "fuds.geojson"
|
||||
|
||||
self.raw_df: gpd.GeoDataFrame
|
||||
self.output_df: pd.DataFrame
|
||||
|
||||
def extract(self) -> None:
|
||||
logger.info("Starting FUDS data download.")
|
||||
|
||||
download_file_from_url(
|
||||
file_url=self.FILE_URL,
|
||||
download_file_name=self.DOWNLOAD_FILE_NAME,
|
||||
verify=True,
|
||||
)
|
||||
|
||||
def transform(self) -> None:
|
||||
logger.info("Starting FUDS transform.")
|
||||
# before we try to do any transformation, get the tract data
|
||||
# so it's loaded and the census ETL is out of scope
|
||||
|
||||
logger.info("Loading FUDs data as GeoDataFrame for transform")
|
||||
raw_df = gpd.read_file(
|
||||
filename=self.DOWNLOAD_FILE_NAME,
|
||||
low_memory=False,
|
||||
)
|
||||
|
||||
# Note that the length of raw_df will not be exactly the same
|
||||
# because same bases lack coordinated or have coordinates in
|
||||
# Mexico or in the ocean. See the following dataframe:
|
||||
# raw_df[~raw_df.OBJECTID.isin(df_with_tracts.OBJECTID)][
|
||||
# ['OBJECTID', 'CLOSESTCITY', 'COUNTY', 'ELIGIBILITY',
|
||||
# 'STATE', 'LATITUDE', "LONGITUDE"]]
|
||||
logger.debug("Adding tracts to FUDS data")
|
||||
df_with_tracts = add_tracts_for_geometries(raw_df)
|
||||
self.output_df = pd.DataFrame()
|
||||
|
||||
# this will create a boolean series which you can do actually sans np.where
|
||||
df_with_tracts["tmp_fuds"] = (
|
||||
df_with_tracts.ELIGIBILITY == "Eligible"
|
||||
) & (df_with_tracts.HASPROJECTS == "Yes")
|
||||
|
||||
self.output_df[
|
||||
self.ELIGIBLE_FUDS_COUNT_FIELD_NAME
|
||||
] = df_with_tracts.groupby(self.GEOID_TRACT_FIELD_NAME)[
|
||||
"tmp_fuds"
|
||||
].sum()
|
||||
|
||||
self.output_df[self.INELIGIBLE_FUDS_COUNT_FIELD_NAME] = (
|
||||
df_with_tracts[~df_with_tracts.tmp_fuds]
|
||||
.groupby(self.GEOID_TRACT_FIELD_NAME)
|
||||
.size()
|
||||
)
|
||||
self.output_df = (
|
||||
self.output_df.fillna(0).astype("int64").sort_index().reset_index()
|
||||
)
|
||||
|
||||
self.output_df[self.ELIGIBLE_FUDS_BINARY_FIELD_NAME] = np.where(
|
||||
self.output_df[self.ELIGIBLE_FUDS_COUNT_FIELD_NAME] > 0.0,
|
||||
True,
|
||||
False,
|
||||
)
|
Loading…
Add table
Add a link
Reference in a new issue