mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-24 18:44:20 -08:00
* Add spatial join method (#1871) Since we'll need to figure out the tracts for a large number of points in future tickets, add a utility to handle grabbing the tract geometries and adding tract data to a point dataset. * Add FUDS, also jupyter lab (#1871) * Add YAML configs for FUDS (#1871) * Allow input geoid to be optional (#1871) * Add FUDS ETL, tests, test-datae noteobook (#1871) This adds the ETL class for Formerly Used Defense Sites (FUDS). This is different from most other ETLs since these FUDS are not provided by tract, but instead by geographic point, so we need to assign FUDS to tracts and then do calculations from there. * Floats -> Ints, as I intended (#1871) * Floats -> Ints, as I intended (#1871) * Formatting fixes (#1871) * Add test false positive GEOIDs (#1871) * Add gdal binaries (#1871) * Refactor pandas code to be more idiomatic (#1871) Per Emma, the more pandas-y way of doing my counts is using np.where to add the values i need, then groupby and size. It is definitely more compact, and also I think more correct! * Update configs per Emma suggestions (#1871) * Type fixed! (#1871) * Remove spurious import from vscode (#1871) * Snapshot update after changing col name (#1871) * Move up GDAL (#1871) * Adjust geojson strategy (#1871) * Try running census separately first (#1871) * Fix import order (#1871) * Cleanup cache strategy (#1871) * Download census data from S3 instead of re-calculating (#1871) * Clarify pandas code per Emma (#1871)
241 lines
8.2 KiB
Python
241 lines
8.2 KiB
Python
import csv
|
|
import json
|
|
import subprocess
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
import geopandas as gpd
|
|
|
|
from data_pipeline.etl.base import ExtractTransformLoad
|
|
from data_pipeline.utils import get_module_logger, unzip_file_from_url
|
|
|
|
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
|
|
|
|
logger = get_module_logger(__name__)
|
|
|
|
|
|
class GeoFileType(Enum):
|
|
SHP = 1
|
|
GEOJSON = 2
|
|
CSV = 3
|
|
|
|
|
|
class CensusETL(ExtractTransformLoad):
|
|
SHP_BASE_PATH = ExtractTransformLoad.DATA_PATH / "census" / "shp"
|
|
GEOJSON_BASE_PATH = ExtractTransformLoad.DATA_PATH / "census" / "geojson"
|
|
CSV_BASE_PATH = ExtractTransformLoad.DATA_PATH / "census" / "csv"
|
|
GEOJSON_PATH = ExtractTransformLoad.DATA_PATH / "census" / "geojson"
|
|
NATIONAL_TRACT_CSV_PATH = CSV_BASE_PATH / "us.csv"
|
|
NATIONAL_TRACT_JSON_PATH = GEOJSON_BASE_PATH / "us.json"
|
|
GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
|
|
|
|
def __init__(self):
|
|
# the fips_states_2010.csv is generated from data here
|
|
# https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html
|
|
self.STATE_FIPS_CODES = get_state_fips_codes(self.DATA_PATH)
|
|
self.TRACT_PER_STATE: dict = {} # in-memory dict per state
|
|
self.TRACT_NATIONAL: list = [] # in-memory global list
|
|
|
|
def _path_for_fips_file(
|
|
self, fips_code: str, file_type: GeoFileType
|
|
) -> Path:
|
|
"""Get paths for associated geospatial files for the provided FIPS code
|
|
|
|
Args:
|
|
fips_code (str): the FIPS code for the region of interest
|
|
file_type (GeoFileType): the geo file type of interest
|
|
|
|
Returns:
|
|
Path on disk to the file_type file corresponding to this FIPS
|
|
"""
|
|
file_path: Path
|
|
if file_type == GeoFileType.SHP:
|
|
file_path = Path(
|
|
self.SHP_BASE_PATH
|
|
/ fips_code
|
|
/ f"tl_2010_{fips_code}_tract10.shp"
|
|
)
|
|
elif file_type == GeoFileType.GEOJSON:
|
|
file_path = Path(self.GEOJSON_BASE_PATH / f"{fips_code}.json")
|
|
elif file_type == GeoFileType.CSV:
|
|
file_path = Path(self.CSV_BASE_PATH / f"{fips_code}.csv")
|
|
return file_path
|
|
|
|
def _extract_shp(self, fips_code: str) -> None:
|
|
"""Download the SHP file for the provided FIPS code
|
|
|
|
Args:
|
|
fips_code (str): the FIPS code for the region of interest
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
shp_file_path = self._path_for_fips_file(fips_code, GeoFileType.SHP)
|
|
logger.info(f"Checking if {fips_code} shp file exists")
|
|
|
|
# check if file exists
|
|
if not shp_file_path.is_file():
|
|
logger.info(
|
|
f"{fips_code} shp file does not exist. Downloading and extracting shape file"
|
|
)
|
|
|
|
tract_state_url = f"https://www2.census.gov/geo/tiger/TIGER2010/TRACT/2010/tl_2010_{fips_code}_tract10.zip"
|
|
unzip_file_from_url(
|
|
tract_state_url,
|
|
self.TMP_PATH,
|
|
self.DATA_PATH / "census" / "shp" / fips_code,
|
|
)
|
|
|
|
def extract(self) -> None:
|
|
logger.info("Downloading Census Data")
|
|
for fips_code in self.STATE_FIPS_CODES:
|
|
self._extract_shp(fips_code)
|
|
|
|
def _transform_to_geojson(self, fips_code: str) -> None:
|
|
"""Convert the downloaded SHP file for the associated FIPS to geojson
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
shp_file_path = self._path_for_fips_file(fips_code, GeoFileType.SHP)
|
|
geojson_file_path = self._path_for_fips_file(
|
|
fips_code, GeoFileType.GEOJSON
|
|
)
|
|
logger.info(f"Checking if {fips_code} geoJSON file exists ")
|
|
if not geojson_file_path.is_file():
|
|
logger.info(
|
|
f"GeoJSON file {fips_code} does not exist. Converting shp to geoJSON"
|
|
)
|
|
cmd = [
|
|
"ogr2ogr",
|
|
"-f",
|
|
"GeoJSON",
|
|
str(geojson_file_path),
|
|
str(shp_file_path),
|
|
]
|
|
subprocess.run(cmd, check=True)
|
|
|
|
def _generate_tract_table(self) -> None:
|
|
"""Generate Tract CSV table for pandas, load in memory
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
for file in self.GEOJSON_BASE_PATH.iterdir():
|
|
if file.suffix == ".json":
|
|
logger.info(f"Ingesting geoid10 for file {file}")
|
|
with open(self.GEOJSON_BASE_PATH / file, encoding="utf-8") as f:
|
|
geojson = json.load(f)
|
|
for feature in geojson["features"]:
|
|
tractid10 = feature["properties"]["GEOID10"]
|
|
self.TRACT_NATIONAL.append(str(tractid10))
|
|
tractid10_state_id = tractid10[:2]
|
|
if not self.TRACT_PER_STATE.get(tractid10_state_id):
|
|
self.TRACT_PER_STATE[tractid10_state_id] = []
|
|
self.TRACT_PER_STATE[tractid10_state_id].append(
|
|
tractid10
|
|
)
|
|
|
|
def transform(self) -> None:
|
|
"""Download all census shape files from the Census FTP and extract the geojson
|
|
to generate national and by state Census tract CSVs and GeoJSONs
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
logger.info("Transforming Census Data")
|
|
for fips_code in self.STATE_FIPS_CODES:
|
|
self._transform_to_geojson(fips_code)
|
|
self._generate_tract_table()
|
|
|
|
def _load_into_state_csvs(self, fips_code: str) -> None:
|
|
"""Load state CSVS into individual CSV files
|
|
|
|
Args:
|
|
fips_code (str): the FIPS code for the region of interest
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
## write to individual state csv
|
|
tractid10_list = self.TRACT_PER_STATE[fips_code]
|
|
csv_path = self._path_for_fips_file(fips_code, GeoFileType.CSV)
|
|
with open(
|
|
csv_path, mode="w", newline="", encoding="utf-8"
|
|
) as cbg_csv_file:
|
|
tract_csv_file_writer = csv.writer(
|
|
cbg_csv_file,
|
|
delimiter=",",
|
|
quotechar='"',
|
|
quoting=csv.QUOTE_MINIMAL,
|
|
)
|
|
|
|
for tractid10 in tractid10_list:
|
|
tract_csv_file_writer.writerow(
|
|
[
|
|
tractid10,
|
|
]
|
|
)
|
|
|
|
def _load_national_csv(self):
|
|
"""Write national-level csv combining
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
logger.info("Writing national us.csv file")
|
|
|
|
if not self.NATIONAL_TRACT_CSV_PATH.is_file():
|
|
logger.info(f"Creating {self.NATIONAL_TRACT_CSV_PATH}")
|
|
with open(
|
|
self.NATIONAL_TRACT_CSV_PATH,
|
|
mode="w",
|
|
newline="",
|
|
encoding="utf-8",
|
|
) as cbg_csv_file:
|
|
cbg_csv_file_writer = csv.writer(
|
|
cbg_csv_file,
|
|
delimiter=",",
|
|
quotechar='"',
|
|
quoting=csv.QUOTE_MINIMAL,
|
|
)
|
|
for geoid10 in self.TRACT_NATIONAL:
|
|
cbg_csv_file_writer.writerow(
|
|
[
|
|
geoid10,
|
|
]
|
|
)
|
|
|
|
def _load_national_geojson(self):
|
|
"""Create national geojson
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
logger.info("Generating national geojson file")
|
|
|
|
usa_df = gpd.GeoDataFrame()
|
|
|
|
for file_name in self.GEOJSON_BASE_PATH.rglob("*.json"):
|
|
logger.info(f"Ingesting {file_name}")
|
|
state_gdf = gpd.read_file(file_name)
|
|
usa_df = usa_df.append(state_gdf)
|
|
|
|
usa_df = usa_df.to_crs(
|
|
"+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"
|
|
)
|
|
logger.info("Writing national geojson file")
|
|
usa_df.to_file(self.NATIONAL_TRACT_JSON_PATH, driver="GeoJSON")
|
|
|
|
logger.info("Census tract downloading complete")
|
|
|
|
def load(self) -> None:
|
|
"""Create state CSVs, National CSV, and National GeoJSON
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
logger.info("Saving Census CSV")
|
|
for fips_code in self.TRACT_PER_STATE:
|
|
self._load_into_state_csvs(fips_code)
|
|
self._load_national_csv()
|
|
self._load_national_geojson()
|