j40-cejst-2/data/data-pipeline/data_pipeline/application.py
Matt Bowen d5fbb802e8
Add FUDS ETL (#1817)
* Add spatial join method (#1871)

Since we'll need to figure out the tracts for a large number of points
in future tickets, add a utility to handle grabbing the tract geometries
and adding tract data to a point dataset.

* Add FUDS, also jupyter lab (#1871)

* Add YAML configs for FUDS (#1871)

* Allow input geoid to be optional (#1871)

* Add FUDS ETL, tests, test-data notebook (#1871)

This adds the ETL class for Formerly Used Defense Sites (FUDS). This is
different from most other ETLs since these FUDS are not provided by
tract, but instead by geographic point, so we need to assign FUDS to
tracts and then do calculations from there.

* Floats -> Ints, as I intended (#1871)

* Floats -> Ints, as I intended (#1871)

* Formatting fixes (#1871)

* Add test false positive GEOIDs (#1871)

* Add gdal binaries (#1871)

* Refactor pandas code to be more idiomatic (#1871)

Per Emma, the more pandas-y way of doing my counts is using np.where to
add the values i need, then groupby and size. It is definitely more
compact, and also I think more correct!

* Update configs per Emma suggestions (#1871)

* Type fixed! (#1871)

* Remove spurious import from vscode (#1871)

* Snapshot update after changing col name (#1871)

* Move up GDAL (#1871)

* Adjust geojson strategy (#1871)

* Try running census separately first (#1871)

* Fix import order (#1871)

* Cleanup cache strategy (#1871)

* Download census data from S3 instead of re-calculating (#1871)

* Clarify pandas code per Emma (#1871)
2022-08-16 13:28:39 -04:00

315 lines
7.5 KiB
Python

from subprocess import call
import sys
import click
from data_pipeline.config import settings
from data_pipeline.etl.runner import (
etl_runner,
score_generate,
score_geo,
score_post,
)
from data_pipeline.etl.sources.census.etl_utils import (
check_census_data_source,
reset_data_directories as census_reset,
zip_census_data,
)
from data_pipeline.etl.sources.tribal.etl_utils import (
reset_data_directories as tribal_reset,
)
from data_pipeline.tile.generate import generate_tiles
from data_pipeline.utils import (
data_folder_cleanup,
get_module_logger,
score_folder_cleanup,
downloadable_cleanup,
temp_folder_cleanup,
check_first_run,
)
# Module-level logger named after this module.
logger = get_module_logger(__name__)
# Shared help text reused by every CLI option that selects a data source.
dataset_cli_help = "Grab the data from either 'local' for local access or 'aws' to retrieve from Justice40 S3 repository"
@click.group()
def cli():
    """Top-level click group that all pipeline commands attach to."""
@cli.command(help="Clean up all census data folders")
def census_cleanup():
    """Reset the census data directories under the app data folder."""
    census_data_root = settings.APP_ROOT / "data"

    # Wipe and re-create the census-related directories.
    logger.info("Initializing all census data")
    census_reset(census_data_root)

    logger.info("Cleaned up all census data files")
    sys.exit()
@cli.command(help="Clean up all data folders")
def data_cleanup():
    """Reset every data directory the pipeline writes to."""
    root = settings.APP_ROOT / "data"

    # Clear each data area in turn: census, generic data, tribal,
    # score output, and temporary files.
    census_reset(root)
    data_folder_cleanup()
    tribal_reset(root)
    score_folder_cleanup()
    temp_folder_cleanup()

    logger.info("Cleaned up all data folders")
    sys.exit()
@cli.command(
    help="Census data download",
)
@click.option(
    "-zc",
    "--zip-compress",
    is_flag=True,
    help="Upload to AWS S3 a zipped archive of the census data.",
)
def census_data_download(zip_compress):
    """Download all census shape files from the Census FTP, extract the
    geojson, and generate national and per-state Census Block Group CSVs.

    Args:
        zip_compress (bool): when set, also zip the census data and
            upload the archive to S3.
    """
    logger.info("Initializing all census data")
    census_reset(settings.APP_ROOT / "data")

    logger.info("Downloading census data")
    etl_runner("census")

    # Optionally produce the zipped archive for S3.
    if zip_compress:
        zip_census_data()

    logger.info("Completed downloading census data")
    sys.exit()
@cli.command(help="Retrieve census data from source")
@click.option(
    "-s",
    "--data-source",
    default="local",
    required=False,
    type=str,
    help=dataset_cli_help,
)
def pull_census_data(data_source: str):
    """Ensure census data is present locally, pulling from the given source.

    Args:
        data_source (str): 'local' or 'aws' (see dataset_cli_help).
    """
    logger.info("Pulling census data from %s", data_source)
    census_dir = settings.APP_ROOT / "data" / "census"
    check_census_data_source(census_dir, data_source)
    logger.info("Finished pulling census data")
    sys.exit()
@cli.command(
    help="Run all ETL processes or a specific one",
)
@click.option(
    "-d",
    "--dataset",
    required=False,
    type=str,
    help=dataset_cli_help,
)
def etl_run(dataset: str):
    """Run one named ETL module, or every ETL module when none is given.

    Args:
        dataset (str): Name of the ETL module to be run (optional)

    Returns:
        None
    """
    etl_runner(dataset)
    sys.exit()
@cli.command(
    help="Generate Score",
)
def score_run():
    """Clean out the score folder, then generate the score."""
    score_folder_cleanup()
    score_generate()
    sys.exit()
@cli.command(
    help="Run ETL + Score Generation",
)
def score_full_run():
    """Run all ETLs and then generate the score, in a single command."""
    # Start from clean data, score, and temp folders.
    data_folder_cleanup()
    score_folder_cleanup()
    temp_folder_cleanup()

    etl_runner()
    score_generate()
    sys.exit()
@cli.command(help="Generate Geojson files with scores baked in")
@click.option(
    "-s",
    "--data-source",
    default="local",
    required=False,
    type=str,
    help=dataset_cli_help,
)
def geo_score(data_source: str):
    """Combine the score with GeoJSON data and emit low/high detail files.

    Args:
        data_source (str): Source for the census data (optional)

    Options:
        - local: fetch census and score data from the local data directory
        - aws: fetch census and score from AWS S3 J40 data repository

    Returns:
        None
    """
    score_geo(data_source=data_source)
    sys.exit()
@cli.command(
    help="Generate map tiles. Pass -t to generate tribal layer as well.",
)
@click.option(
    "-t",
    "--generate-tribal-layer",
    default=False,
    required=False,
    is_flag=True,
    type=bool,
)
def generate_map_tiles(generate_tribal_layer):
    """Build the map tiles, optionally including the tribal layer.

    Args:
        generate_tribal_layer (bool): also generate the tribal tile layer.
    """
    tiles_root = settings.APP_ROOT / "data"
    generate_tiles(tiles_root, generate_tribal_layer)
    sys.exit()
@cli.command(
    help="Run etl_score_post to create score csv, tile csv, and downloadable zip",
)
@click.option(
    "-s",
    "--data-source",
    default="local",
    required=False,
    type=str,
    help=dataset_cli_help,
)
def generate_score_post(data_source: str):
    """Produce the score CSV, tile CSV, and downloadable files.

    Args:
        data_source (str): Source for the census data (optional)

    Options:
        - local: fetch census and score data from the local data directory
        - aws: fetch census and score from AWS S3 J40 data repository

    Returns:
        None
    """
    # Remove stale downloadable artifacts before regenerating them.
    downloadable_cleanup()
    score_post(data_source)
    sys.exit()
@cli.command(
    help="Data Full Run (Census download, ETLs, score, combine and tile generation)",
)
@click.option(
    "-c",
    "--check",
    is_flag=True,
    help="Check if data run has been run before, and don't run it if so.",
)
@click.option(
    "-s",
    "--data-source",
    default="local",
    required=False,
    type=str,
    help=dataset_cli_help,
)
def data_full_run(check: bool, data_source: str):
    """CLI command to run ETL, score, JSON combine and generate tiles in one command

    Args:
        check (bool): Run the full data run only if the first run semaphore file is not set (optional)
        data_source (str): Source for the census data (optional)

    Options:
        - local: fetch census and score data from the local data directory
        - aws: fetch census and score from AWS S3 J40 data repository

    Returns:
        None
    """
    data_path = settings.APP_ROOT / "data"

    if check:
        if not check_first_run():
            # check if the data full run has been run before
            logger.info("*** The data full run was already executed")
            sys.exit()

    else:
        # census directories
        logger.info("*** Initializing all data folders")
        census_reset(data_path)
        data_folder_cleanup()
        score_folder_cleanup()
        temp_folder_cleanup()

    # Only download census data when working from local sources; with
    # 'aws' the census data is fetched from S3 instead of re-calculated.
    if data_source == "local":
        logger.info("*** Downloading census data")
        etl_runner("census")

    logger.info("*** Running all ETLs")
    etl_runner()

    logger.info("*** Generating Score")
    score_generate()

    logger.info("*** Running Post Score scripts")
    downloadable_cleanup()
    score_post(data_source)

    logger.info("*** Combining Score with Census Geojson")
    score_geo(data_source)

    logger.info("*** Generating Map Tiles")
    generate_tiles(data_path, True)

    # Drop the first-run semaphore file so a later `--check` run can skip.
    # Use pathlib's touch() instead of shelling out to `touch` via a
    # string-interpolated command (which was fragile and injection-prone).
    (data_path / "first_run.txt").touch()

    logger.info("*** Map data ready")
    sys.exit()
# Entry point when executed as a script: dispatch to the click group.
if __name__ == "__main__":
    cli()