diff --git a/data/data-pipeline/data_pipeline/application.py b/data/data-pipeline/data_pipeline/application.py index 19d7ed9a..35936047 100644 --- a/data/data-pipeline/data_pipeline/application.py +++ b/data/data-pipeline/data_pipeline/application.py @@ -2,7 +2,6 @@ import click from .config import settings from .etl.runner import etl_runner, score_generate, score_geo -from .etl.sources.census.etl import download_census_csvs from .etl.sources.census.etl_utils import reset_data_directories as census_reset from .tile.generate import generate_tiles from .utils import ( @@ -59,7 +58,7 @@ def census_data_download(): census_reset(data_path) logger.info("Downloading census data") - download_census_csvs(data_path) + etl_runner("census") logger.info("Completed downloading census data") diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py index 093012de..ac01c3d5 100644 --- a/data/data-pipeline/data_pipeline/etl/runner.py +++ b/data/data-pipeline/data_pipeline/etl/runner.py @@ -17,6 +17,11 @@ def etl_runner(dataset_to_run: str = None) -> None: # this list comes from YAMLs dataset_list = [ + { + "name": "census", + "module_dir": "census", + "class_name": "CensusETL", + }, { "name": "tree_equity_score", "module_dir": "tree_equity_score", diff --git a/data/data-pipeline/data_pipeline/etl/sources/census/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census/etl.py index d48e367d..9b931a1b 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/census/etl.py +++ b/data/data-pipeline/data_pipeline/etl/sources/census/etl.py @@ -1,10 +1,11 @@ import csv import json -import os -from pathlib import Path import subprocess +from enum import Enum +from pathlib import Path import geopandas as gpd +from data_pipeline.etl.base import ExtractTransformLoad from data_pipeline.utils import get_module_logger, unzip_file_from_url from .etl_utils import get_state_fips_codes @@ -12,48 +13,91 @@ from .etl_utils import get_state_fips_codes logger = get_module_logger(__name__) -def download_census_csvs(data_path: Path) -> None: - """Download all census shape files from the Census FTP and extract the geojson - to generate national and by state Census Block Group CSVs and GeoJSONs +class GeoFileType(Enum): + SHP = 1 + GEOJSON = 2 + CSV = 3 - Args: - data_path (pathlib.Path): Name of the directory where the files and directories will - be created - Returns: - None - """ +class CensusETL(ExtractTransformLoad): + def __init__(self): + self.SHP_BASE_PATH = self.DATA_PATH / "census" / "shp" + self.GEOJSON_BASE_PATH = self.DATA_PATH / "census" / "geojson" + self.CSV_BASE_PATH = self.DATA_PATH / "census" / "csv" + # the fips_states_2010.csv is generated from data here + # https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html + self.STATE_FIPS_CODES = get_state_fips_codes(self.DATA_PATH) + self.GEOJSON_PATH = self.DATA_PATH / "census" / "geojson" + self.CBG_PER_STATE: dict = {} # in-memory dict per state + self.CBG_NATIONAL: list = [] # in-memory global list + self.NATIONAL_CBG_CSV_PATH = self.CSV_BASE_PATH / "us.csv" + self.NATIONAL_CBG_JSON_PATH = self.GEOJSON_BASE_PATH / "us.json" - # the fips_states_2010.csv is generated from data here - # https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html - state_fips_codes = get_state_fips_codes(data_path) - geojson_dir_path = data_path / "census" / "geojson" + def _path_for_fips_file(self, fips_code: str, file_type: GeoFileType) -> Path: + """Get paths for associated geospatial files for the provided FIPS code + + Args: + fips_code (str): the FIPS code for the region of interest + file_type (GeoFileType): the geo file type of interest + + Returns: + Path on disk to the file_type file corresponding to this FIPS + """ + file_path : Path + if file_type == GeoFileType.SHP: + file_path = Path( + self.SHP_BASE_PATH / fips_code / f"tl_2010_{fips_code}_bg10.shp" + ) + elif file_type == GeoFileType.GEOJSON: + file_path = Path(self.GEOJSON_BASE_PATH / f"{fips_code}.json") + elif file_type == GeoFileType.CSV: + file_path = Path(self.CSV_BASE_PATH / f"{fips_code}.csv") + return file_path + + def _extract_shp(self, fips_code: str) -> None: + """Download the SHP file for the provided FIPS code + + Args: + fips_code (str): the FIPS code for the region of interest + + Returns: + None + """ + shp_file_path = self._path_for_fips_file(fips_code, GeoFileType.SHP) + logger.info(f"Checking if {fips_code} shp file exists") - for fips in state_fips_codes: # check if file exists - shp_file_path: Path = ( - data_path / "census" / "shp" / fips / f"tl_2010_{fips}_bg10.shp" - ) - geojson_file_path = data_path / "census" / "geojson" / f"{fips}.json" - - logger.info(f"Checking if {fips} shp file exists") if not shp_file_path.is_file(): logger.info( - f"{fips} shp file does not exist. Downloading and extracting shape file" + f"{fips_code} shp file does not exist. Downloading and extracting shape file" ) - # 2020 tiger data is here: https://www2.census.gov/geo/tiger/TIGER2020/BG/ - # But using 2010 for now - cbg_state_url = f"https://www2.census.gov/geo/tiger/TIGER2010/BG/2010/tl_2010_{fips}_bg10.zip" - unzip_file_from_url( - cbg_state_url, - data_path / "tmp", - data_path / "census" / "shp" / fips, - ) - logger.info(f"Checking if {fips} geoJSON file exists ") + # 2020 tiger data is here: https://www2.census.gov/geo/tiger/TIGER2020/BG/ + # But using 2010 for now + cbg_state_url = f"https://www2.census.gov/geo/tiger/TIGER2010/BG/2010/tl_2010_{fips_code}_bg10.zip" + unzip_file_from_url( + cbg_state_url, + self.TMP_PATH, + self.DATA_PATH / "census" / "shp" / fips_code, + ) + + def extract(self) -> None: + logger.info("Downloading Census Data") + for fips_code in self.STATE_FIPS_CODES: + self._extract_shp(fips_code) + + def _transform_to_geojson(self, fips_code: str) -> None: + """Convert the downloaded SHP file for the associated FIPS to geojson + + Returns: + None + """ + shp_file_path = self._path_for_fips_file(fips_code, GeoFileType.SHP) + geojson_file_path = self._path_for_fips_file(fips_code, GeoFileType.GEOJSON) + logger.info(f"Checking if {fips_code} geoJSON file exists ") if not geojson_file_path.is_file(): logger.info( - f"GeoJSON file {fips} does not exist. Converting shp to geoJSON" + f"GeoJSON file {fips_code} does not exist. Converting shp to geoJSON" ) cmd = [ "ogr2ogr", @@ -64,30 +108,50 @@ def download_census_csvs(data_path: Path) -> None: ] subprocess.run(cmd, check=True) - # generate CBG CSV table for pandas - ## load in memory - cbg_national = [] # in-memory global list - cbg_per_state: dict = {} # in-memory dict per state - for file in os.listdir(geojson_dir_path): - if file.endswith(".json"): - logger.info(f"Ingesting geoid10 for file {file}") - with open(geojson_dir_path / file) as f: - geojson = json.load(f) - for feature in geojson["features"]: - geoid10 = feature["properties"]["GEOID10"] - cbg_national.append(str(geoid10)) - geoid10_state_id = geoid10[:2] - if not cbg_per_state.get(geoid10_state_id): - cbg_per_state[geoid10_state_id] = [] - cbg_per_state[geoid10_state_id].append(geoid10) + def _generate_cbg_table(self) -> None: + """Generate CBG CSV table for pandas, load in memory - csv_dir_path = data_path / "census" / "csv" - ## write to individual state csv - for state_id in cbg_per_state: - geoid10_list = cbg_per_state[state_id] - with open( - csv_dir_path / f"{state_id}.csv", mode="w", newline="" - ) as cbg_csv_file: + Returns: + None + """ + for file in self.GEOJSON_BASE_PATH.iterdir(): + if file.suffix == ".json": + logger.info(f"Ingesting geoid10 for file {file}") + with open(self.GEOJSON_BASE_PATH / file) as f: + geojson = json.load(f) + for feature in geojson["features"]: + geoid10 = feature["properties"]["GEOID10"] + self.CBG_NATIONAL.append(str(geoid10)) + geoid10_state_id = geoid10[:2] + if not self.CBG_PER_STATE.get(geoid10_state_id): + self.CBG_PER_STATE[geoid10_state_id] = [] + self.CBG_PER_STATE[geoid10_state_id].append(geoid10) + + def transform(self) -> None: + """Download all census shape files from the Census FTP and extract the geojson + to generate national and by state Census Block Group CSVs and GeoJSONs + + Returns: + None + """ + logger.info("Transforming Census Data") + for fips_code in self.STATE_FIPS_CODES: + self._transform_to_geojson(fips_code) + self._generate_cbg_table() + + def _load_into_state_csvs(self, fips_code: str) -> None: + """Load state CSVS into individual CSV files + + Args: + fips_code (str): the FIPS code for the region of interest + + Returns: + None + """ + ## write to individual state csv + geoid10_list = self.CBG_PER_STATE[fips_code] + csv_path = self._path_for_fips_file(fips_code, GeoFileType.CSV) + with open(csv_path, mode="w", newline="") as cbg_csv_file: cbg_csv_file_writer = csv.writer( cbg_csv_file, delimiter=",", @@ -102,33 +166,59 @@ def download_census_csvs(data_path: Path) -> None: ] ) - ## write US csv - logger.info("Writing national us.csv file") - with open(csv_dir_path / "us.csv", mode="w", newline="") as cbg_csv_file: - cbg_csv_file_writer = csv.writer( - cbg_csv_file, - delimiter=",", - quotechar='"', - quoting=csv.QUOTE_MINIMAL, - ) - for geoid10 in cbg_national: - cbg_csv_file_writer.writerow( - [ - geoid10, - ] - ) + def _load_national_csv(self): + """Write national-level csv combining - ## create national geojson - logger.info("Generating national geojson file") - usa_df = gpd.GeoDataFrame() + Returns: + None + """ + logger.info("Writing national us.csv file") - for file_name in geojson_dir_path.rglob("*.json"): - logger.info(f"Ingesting {file_name}") - state_gdf = gpd.read_file(file_name) - usa_df = usa_df.append(state_gdf) + if not self.NATIONAL_CBG_CSV_PATH.is_file(): + logger.info(f"Creating {self.NATIONAL_CBG_CSV_PATH}") + with open(self.NATIONAL_CBG_CSV_PATH, mode="w", newline="") as cbg_csv_file: + cbg_csv_file_writer = csv.writer( + cbg_csv_file, + delimiter=",", + quotechar='"', + quoting=csv.QUOTE_MINIMAL, + ) + for geoid10 in self.CBG_NATIONAL: + cbg_csv_file_writer.writerow( + [ + geoid10, + ] + ) - usa_df = usa_df.to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs") - logger.info("Writing national geojson file") - usa_df.to_file(geojson_dir_path / "us.json", driver="GeoJSON") + def _load_national_geojson(self): + """Create national geojson - logger.info("Census block groups downloading complete") + Returns: + None + """ + logger.info("Generating national geojson file") + + usa_df = gpd.GeoDataFrame() + + for file_name in self.GEOJSON_BASE_PATH.rglob("*.json"): + logger.info(f"Ingesting {file_name}") + state_gdf = gpd.read_file(file_name) + usa_df = usa_df.append(state_gdf) + + usa_df = usa_df.to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs") + logger.info("Writing national geojson file") + usa_df.to_file(self.NATIONAL_CBG_JSON_PATH, driver="GeoJSON") + + logger.info("Census block groups downloading complete") + + def load(self) -> None: + """Create state CSVs, National CSV, and National GeoJSON + + Returns: + None + """ + logger.info("Saving Census CSV") + for fips_code in self.CBG_PER_STATE: + self._load_into_state_csvs(fips_code) + self._load_national_csv() + self._load_national_geojson()