import importlib
import concurrent.futures
import typing

from data_pipeline.etl.score.etl_score import ScoreETL
from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
from data_pipeline.etl.score.etl_score_post import PostScoreETL
from data_pipeline.utils import get_module_logger

from . import constants

logger = get_module_logger(__name__)


def _get_datasets_to_run(dataset_to_run: str) -> typing.List[dict]:
    """Returns a list of appropriate datasets to run given input args

    Args:
        dataset_to_run (str): Run a specific ETL process. If missing, runs all processes (optional)

    Returns:
        typing.List[dict]: the configs of the datasets to run
    """
    dataset_list = constants.DATASET_LIST
    etls_to_search = (
        dataset_list + [constants.CENSUS_INFO] + [constants.TRIBAL_INFO]
    )

    if dataset_to_run:
        dataset_element = next(
            (item for item in etls_to_search if item["name"] == dataset_to_run),
            None,
        )
        if not dataset_element:
            raise ValueError("Invalid dataset name")
        # reset the list to just the requested dataset
        dataset_list = [dataset_element]

    return dataset_list


def _run_one_dataset(dataset: dict) -> None:
    """Runs one ETL process."""
    etl_module = importlib.import_module(
        f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
    )
    etl_class = getattr(etl_module, dataset["class_name"])
    etl_instance = etl_class()

    # run extract
    etl_instance.extract()

    # run transform
    etl_instance.transform()

    # run load
    etl_instance.load()

    # run validate
    etl_instance.validate()

    # cleanup
    etl_instance.cleanup()

    logger.info(f"Finished `etl-run` for dataset `{dataset['name']}`.")


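# For reference, a minimal sketch of the dataset config shape that
# _run_one_dataset expects (illustrative values only; the real entries
# live in constants.DATASET_LIST):
#
#     {
#         "name": "example_dataset",          # hypothetical name
#         "module_dir": "example_dataset",    # data_pipeline/etl/sources/<module_dir>/etl.py
#         "class_name": "ExampleDatasetETL",  # class looked up via getattr
#         "is_memory_intensive": False,       # controls parallel vs. sequential run
#     }

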
def etl_runner(dataset_to_run: typing.Optional[str] = None) -> None:
    """Runs all ETL processes or a specific one

    Args:
        dataset_to_run (str): Run a specific ETL process. If missing, runs all processes (optional)

    Returns:
        None
    """
    dataset_list = _get_datasets_to_run(dataset_to_run)

    # Because we are memory constrained on our infrastructure,
    # we split datasets into those that are not memory intensive
    # (is_memory_intensive == False) and can therefore be safely
    # run in parallel, and those that require more RAM and thus
    # should be run sequentially. The is_memory_intensive flag is
    # set manually in constants.py, based on experience running
    # the pipeline.
    concurrent_datasets = [
        dataset
        for dataset in dataset_list
        if not dataset["is_memory_intensive"]
    ]
    high_memory_datasets = [
        dataset for dataset in dataset_list if dataset["is_memory_intensive"]
    ]

    if concurrent_datasets:
        logger.info("Running concurrent jobs")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(_run_one_dataset, dataset=dataset)
                for dataset in concurrent_datasets
            }

            for fut in concurrent.futures.as_completed(futures):
                # Calling result() will raise an exception if one occurred
                # in the worker; otherwise exceptions would be silently
                # swallowed by the executor.
                fut.result()

    # Note: these high-memory datasets also usually require the Census geojson to be
    # generated, and one of them requires the Tribal geojson to be generated.
    if high_memory_datasets:
        logger.info("Running high-memory jobs")
        for dataset in high_memory_datasets:
            _run_one_dataset(dataset=dataset)


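# A minimal usage sketch (the dataset name here is hypothetical; valid
# names are the "name" values in constants.DATASET_LIST, plus the census
# and tribal entries):
#
#     etl_runner()                   # run every dataset
#     etl_runner("example_dataset")  # run a single dataset by name

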
def score_generate() -> None:
    """Generates the score and saves it in the local data directory

    Args:
        None

    Returns:
        None
    """

    # Score Gen
    score_gen = ScoreETL()
    score_gen.extract()
    score_gen.transform()
    score_gen.load()


def score_post(data_source: str = "local") -> None:
    """Posts the score files to the local directory

    Args:
        data_source (str): Source for the census data (optional)
            Options:
            - local (default): fetch census data from the local data directory
            - aws: fetch census data from the AWS S3 J40 data repository

    Returns:
        None
    """
    # Post Score Processing
    score_post = PostScoreETL(data_source=data_source)
    score_post.extract()
    score_post.transform()
    score_post.load()
    score_post.cleanup()


def score_geo(data_source: str = "local") -> None:
    """Generates the geojson files with score data baked in

    Args:
        data_source (str): Source for the census data (optional)
            Options:
            - local (default): fetch census data from the local data directory
            - aws: fetch census data from the AWS S3 J40 data repository

    Returns:
        None
    """

    # Score Geo
    score_geo = GeoScoreETL(data_source=data_source)
    score_geo.extract()
    score_geo.transform()
    score_geo.load()


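# A plausible end-to-end ordering of the score steps, inferred from the
# docstrings above (the actual orchestration lives in the pipeline's CLI,
# not in this module):
#
#     score_generate()                 # build the score from the dataset outputs
#     score_geo(data_source="local")   # bake the score into the geojson files
#     score_post(data_source="local")  # post-process and post the final score files

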
def _find_dataset_index(dataset_list, key, value):
    """Returns the index of the first dataset whose `key` equals `value`, or -1 if absent."""
    for i, element in enumerate(dataset_list):
        if element[key] == value:
            return i
    return -1
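

# A minimal usage sketch for _find_dataset_index (the dataset name is
# hypothetical; real names come from constants.DATASET_LIST):
#
#     idx = _find_dataset_index(constants.DATASET_LIST, "name", "example_dataset")
#     if idx != -1:
#         dataset_config = constants.DATASET_LIST[idx]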