Census ETL should use standard ETL form (#474)

* Fixes #473
Census ETL should use standard ETL form

* linter fixes
This commit is contained in:
Nat Hillard 2021-08-06 11:01:51 -04:00 committed by GitHub
parent ef044fa36b
commit 45a8b1c026
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 178 additions and 84 deletions

View file

@@ -2,7 +2,6 @@ import click
from .config import settings from .config import settings
from .etl.runner import etl_runner, score_generate, score_geo from .etl.runner import etl_runner, score_generate, score_geo
from .etl.sources.census.etl import download_census_csvs
from .etl.sources.census.etl_utils import reset_data_directories as census_reset from .etl.sources.census.etl_utils import reset_data_directories as census_reset
from .tile.generate import generate_tiles from .tile.generate import generate_tiles
from .utils import ( from .utils import (
@@ -59,7 +58,7 @@ def census_data_download():
census_reset(data_path) census_reset(data_path)
logger.info("Downloading census data") logger.info("Downloading census data")
download_census_csvs(data_path) etl_runner("census")
logger.info("Completed downloading census data") logger.info("Completed downloading census data")

View file

@@ -17,6 +17,11 @@ def etl_runner(dataset_to_run: str = None) -> None:
# this list comes from YAMLs # this list comes from YAMLs
dataset_list = [ dataset_list = [
{
"name": "census",
"module_dir": "census",
"class_name": "CensusETL",
},
{ {
"name": "tree_equity_score", "name": "tree_equity_score",
"module_dir": "tree_equity_score", "module_dir": "tree_equity_score",

View file

@@ -1,10 +1,11 @@
import csv import csv
import json import json
import os
from pathlib import Path
import subprocess import subprocess
from enum import Enum
from pathlib import Path
import geopandas as gpd import geopandas as gpd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger, unzip_file_from_url from data_pipeline.utils import get_module_logger, unzip_file_from_url
from .etl_utils import get_state_fips_codes from .etl_utils import get_state_fips_codes
@@ -12,48 +13,91 @@ from .etl_utils import get_state_fips_codes
logger = get_module_logger(__name__) logger = get_module_logger(__name__)
def download_census_csvs(data_path: Path) -> None: class GeoFileType(Enum):
"""Download all census shape files from the Census FTP and extract the geojson SHP = 1
to generate national and by state Census Block Group CSVs and GeoJSONs GEOJSON = 2
CSV = 3
class CensusETL(ExtractTransformLoad):
def __init__(self):
self.SHP_BASE_PATH = self.DATA_PATH / "census" / "shp"
self.GEOJSON_BASE_PATH = self.DATA_PATH / "census" / "geojson"
self.CSV_BASE_PATH = self.DATA_PATH / "census" / "csv"
# the fips_states_2010.csv is generated from data here
# https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html
self.STATE_FIPS_CODES = get_state_fips_codes(self.DATA_PATH)
self.GEOJSON_PATH = self.DATA_PATH / "census" / "geojson"
self.CBG_PER_STATE: dict = {} # in-memory dict per state
self.CBG_NATIONAL: list = [] # in-memory global list
self.NATIONAL_CBG_CSV_PATH = self.CSV_BASE_PATH / "us.csv"
self.NATIONAL_CBG_JSON_PATH = self.GEOJSON_BASE_PATH / "us.json"
def _path_for_fips_file(self, fips_code: str, file_type: GeoFileType) -> Path:
"""Get paths for associated geospatial files for the provided FIPS code
Args: Args:
data_path (pathlib.Path): Name of the directory where the files and directories will fips_code (str): the FIPS code for the region of interest
be created file_type (GeoFileType): the geo file type of interest
Returns:
Path on disk to the file_type file corresponding to this FIPS
"""
file_path : Path
if file_type == GeoFileType.SHP:
file_path = Path(
self.SHP_BASE_PATH / fips_code / f"tl_2010_{fips_code}_bg10.shp"
)
elif file_type == GeoFileType.GEOJSON:
file_path = Path(self.GEOJSON_BASE_PATH / f"{fips_code}.json")
elif file_type == GeoFileType.CSV:
file_path = Path(self.CSV_BASE_PATH / f"{fips_code}.csv")
return file_path
def _extract_shp(self, fips_code: str) -> None:
"""Download the SHP file for the provided FIPS code
Args:
fips_code (str): the FIPS code for the region of interest
Returns: Returns:
None None
""" """
shp_file_path = self._path_for_fips_file(fips_code, GeoFileType.SHP)
logger.info(f"Checking if {fips_code} shp file exists")
# the fips_states_2010.csv is generated from data here
# https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html
state_fips_codes = get_state_fips_codes(data_path)
geojson_dir_path = data_path / "census" / "geojson"
for fips in state_fips_codes:
# check if file exists # check if file exists
shp_file_path: Path = (
data_path / "census" / "shp" / fips / f"tl_2010_{fips}_bg10.shp"
)
geojson_file_path = data_path / "census" / "geojson" / f"{fips}.json"
logger.info(f"Checking if {fips} shp file exists")
if not shp_file_path.is_file(): if not shp_file_path.is_file():
logger.info( logger.info(
f"{fips} shp file does not exist. Downloading and extracting shape file" f"{fips_code} shp file does not exist. Downloading and extracting shape file"
) )
# 2020 tiger data is here: https://www2.census.gov/geo/tiger/TIGER2020/BG/ # 2020 tiger data is here: https://www2.census.gov/geo/tiger/TIGER2020/BG/
# But using 2010 for now # But using 2010 for now
cbg_state_url = f"https://www2.census.gov/geo/tiger/TIGER2010/BG/2010/tl_2010_{fips}_bg10.zip" cbg_state_url = f"https://www2.census.gov/geo/tiger/TIGER2010/BG/2010/tl_2010_{fips_code}_bg10.zip"
unzip_file_from_url( unzip_file_from_url(
cbg_state_url, cbg_state_url,
data_path / "tmp", self.TMP_PATH,
data_path / "census" / "shp" / fips, self.DATA_PATH / "census" / "shp" / fips_code,
) )
logger.info(f"Checking if {fips} geoJSON file exists ")
def extract(self) -> None:
logger.info("Downloading Census Data")
for fips_code in self.STATE_FIPS_CODES:
self._extract_shp(fips_code)
def _transform_to_geojson(self, fips_code: str) -> None:
"""Convert the downloaded SHP file for the associated FIPS to geojson
Returns:
None
"""
shp_file_path = self._path_for_fips_file(fips_code, GeoFileType.SHP)
geojson_file_path = self._path_for_fips_file(fips_code, GeoFileType.GEOJSON)
logger.info(f"Checking if {fips_code} geoJSON file exists ")
if not geojson_file_path.is_file(): if not geojson_file_path.is_file():
logger.info( logger.info(
f"GeoJSON file {fips} does not exist. Converting shp to geoJSON" f"GeoJSON file {fips_code} does not exist. Converting shp to geoJSON"
) )
cmd = [ cmd = [
"ogr2ogr", "ogr2ogr",
@@ -64,30 +108,50 @@ def download_census_csvs(data_path: Path) -> None:
] ]
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True)
# generate CBG CSV table for pandas def _generate_cbg_table(self) -> None:
## load in memory """Generate CBG CSV table for pandas, load in memory
cbg_national = [] # in-memory global list
cbg_per_state: dict = {} # in-memory dict per state Returns:
for file in os.listdir(geojson_dir_path): None
if file.endswith(".json"): """
for file in self.GEOJSON_BASE_PATH.iterdir():
if file.suffix == ".json":
logger.info(f"Ingesting geoid10 for file {file}") logger.info(f"Ingesting geoid10 for file {file}")
with open(geojson_dir_path / file) as f: with open(self.GEOJSON_BASE_PATH / file) as f:
geojson = json.load(f) geojson = json.load(f)
for feature in geojson["features"]: for feature in geojson["features"]:
geoid10 = feature["properties"]["GEOID10"] geoid10 = feature["properties"]["GEOID10"]
cbg_national.append(str(geoid10)) self.CBG_NATIONAL.append(str(geoid10))
geoid10_state_id = geoid10[:2] geoid10_state_id = geoid10[:2]
if not cbg_per_state.get(geoid10_state_id): if not self.CBG_PER_STATE.get(geoid10_state_id):
cbg_per_state[geoid10_state_id] = [] self.CBG_PER_STATE[geoid10_state_id] = []
cbg_per_state[geoid10_state_id].append(geoid10) self.CBG_PER_STATE[geoid10_state_id].append(geoid10)
csv_dir_path = data_path / "census" / "csv" def transform(self) -> None:
"""Download all census shape files from the Census FTP and extract the geojson
to generate national and by state Census Block Group CSVs and GeoJSONs
Returns:
None
"""
logger.info("Transforming Census Data")
for fips_code in self.STATE_FIPS_CODES:
self._transform_to_geojson(fips_code)
self._generate_cbg_table()
def _load_into_state_csvs(self, fips_code: str) -> None:
"""Load state CSVS into individual CSV files
Args:
fips_code (str): the FIPS code for the region of interest
Returns:
None
"""
## write to individual state csv ## write to individual state csv
for state_id in cbg_per_state: geoid10_list = self.CBG_PER_STATE[fips_code]
geoid10_list = cbg_per_state[state_id] csv_path = self._path_for_fips_file(fips_code, GeoFileType.CSV)
with open( with open(csv_path, mode="w", newline="") as cbg_csv_file:
csv_dir_path / f"{state_id}.csv", mode="w", newline=""
) as cbg_csv_file:
cbg_csv_file_writer = csv.writer( cbg_csv_file_writer = csv.writer(
cbg_csv_file, cbg_csv_file,
delimiter=",", delimiter=",",
@@ -102,33 +166,59 @@ def download_census_csvs(data_path: Path) -> None:
] ]
) )
## write US csv def _load_national_csv(self):
"""Write national-level csv combining
Returns:
None
"""
logger.info("Writing national us.csv file") logger.info("Writing national us.csv file")
with open(csv_dir_path / "us.csv", mode="w", newline="") as cbg_csv_file:
if not self.NATIONAL_CBG_CSV_PATH.is_file():
logger.info(f"Creating {self.NATIONAL_CBG_CSV_PATH}")
with open(self.NATIONAL_CBG_CSV_PATH, mode="w", newline="") as cbg_csv_file:
cbg_csv_file_writer = csv.writer( cbg_csv_file_writer = csv.writer(
cbg_csv_file, cbg_csv_file,
delimiter=",", delimiter=",",
quotechar='"', quotechar='"',
quoting=csv.QUOTE_MINIMAL, quoting=csv.QUOTE_MINIMAL,
) )
for geoid10 in cbg_national: for geoid10 in self.CBG_NATIONAL:
cbg_csv_file_writer.writerow( cbg_csv_file_writer.writerow(
[ [
geoid10, geoid10,
] ]
) )
## create national geojson def _load_national_geojson(self):
"""Create national geojson
Returns:
None
"""
logger.info("Generating national geojson file") logger.info("Generating national geojson file")
usa_df = gpd.GeoDataFrame() usa_df = gpd.GeoDataFrame()
for file_name in geojson_dir_path.rglob("*.json"): for file_name in self.GEOJSON_BASE_PATH.rglob("*.json"):
logger.info(f"Ingesting {file_name}") logger.info(f"Ingesting {file_name}")
state_gdf = gpd.read_file(file_name) state_gdf = gpd.read_file(file_name)
usa_df = usa_df.append(state_gdf) usa_df = usa_df.append(state_gdf)
usa_df = usa_df.to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs") usa_df = usa_df.to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
logger.info("Writing national geojson file") logger.info("Writing national geojson file")
usa_df.to_file(geojson_dir_path / "us.json", driver="GeoJSON") usa_df.to_file(self.NATIONAL_CBG_JSON_PATH, driver="GeoJSON")
logger.info("Census block groups downloading complete") logger.info("Census block groups downloading complete")
def load(self) -> None:
"""Create state CSVs, National CSV, and National GeoJSON
Returns:
None
"""
logger.info("Saving Census CSV")
for fips_code in self.CBG_PER_STATE:
self._load_into_state_csvs(fips_code)
self._load_national_csv()
self._load_national_geojson()