diff --git a/data/data-pipeline/data_pipeline/application.py b/data/data-pipeline/data_pipeline/application.py
index 3912e39d..6eab1dc9 100644
--- a/data/data-pipeline/data_pipeline/application.py
+++ b/data/data-pipeline/data_pipeline/application.py
@@ -139,9 +139,14 @@ def pull_census_data(data_source: str):
 @cli.command(
     help="Run all ETL processes or a specific one",
 )
+@click.option(
+    "--no-concurrency",
+    is_flag=True,
+    help="Run ETLs sequentially instead of concurrently.",
+)
 @dataset_option
 @use_cache_option
-def etl_run(dataset: str, use_cache: bool):
+def etl_run(dataset: str, use_cache: bool, no_concurrency: bool):
     """Run a specific or all ETL processes

     Args:
@@ -153,7 +158,7 @@ def etl_run(dataset: str, use_cache: bool):
     log_title("Run ETL")
     log_info("Running dataset(s)")

-    etl_runner(dataset, use_cache)
+    etl_runner(dataset, use_cache, no_concurrency)

     log_goodbye()

diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py
index 5014771a..9ba5e1a9 100644
--- a/data/data-pipeline/data_pipeline/etl/runner.py
+++ b/data/data-pipeline/data_pipeline/etl/runner.py
@@ -1,6 +1,7 @@
 import concurrent.futures
 import importlib
 import typing
+import os

 from functools import reduce

@@ -84,7 +85,11 @@ def _run_one_dataset(dataset: dict, use_cache: bool = False) -> None:
     logger.info(f"Finished ETL for dataset {dataset['name']}")


-def etl_runner(dataset_to_run: str = None, use_cache: bool = False) -> None:
+def etl_runner(
+    dataset_to_run: str = None,
+    use_cache: bool = False,
+    no_concurrency: bool = False,
+) -> None:
     """Runs all etl processes or a specific one

     Args:
@@ -112,9 +117,12 @@ def etl_runner(dataset_to_run: str = None, use_cache: bool = False) -> None:
         dataset for dataset in dataset_list if dataset["is_memory_intensive"]
     ]

+    max_workers = 1 if no_concurrency else os.cpu_count()
     if concurrent_datasets:
-        logger.info("Running concurrent ETL jobs")
-        with concurrent.futures.ThreadPoolExecutor() as executor:
+        logger.info(f"Running concurrent ETL jobs on {max_workers} thread(s)")
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=max_workers
+        ) as executor:
             futures = {
                 executor.submit(
                     _run_one_dataset, dataset=dataset, use_cache=use_cache
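
Usage note (not part of the patch): the line `max_workers = 1 if no_concurrency else os.cpu_count()` is what turns the flag into sequential execution, since a `ThreadPoolExecutor` with `max_workers=1` still accepts every submission but runs the tasks strictly one at a time. Below is a minimal, self-contained Python sketch of that behavior; the `work` function and the sample task names are hypothetical stand-ins for the real ETL datasets.

# Minimal sketch of the --no-concurrency mechanism. With max_workers=1 the
# executor queues all submitted tasks but executes them one at a time;
# otherwise it uses one worker thread per CPU, as in the patched runner.
import concurrent.futures
import os

def work(name: str) -> str:
    # Hypothetical stand-in for _run_one_dataset.
    return f"ran {name}"

no_concurrency = True  # mirrors the new CLI flag
max_workers = 1 if no_concurrency else os.cpu_count()

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {executor.submit(work, name): name for name in ("a", "b", "c")}
    for future in concurrent.futures.as_completed(futures):
        print(f"{futures[future]} -> {future.result()}")

One edge worth noting: `os.cpu_count()` can return `None` on some platforms. Passing `max_workers=None` makes `ThreadPoolExecutor` fall back to its own default (`min(32, os.cpu_count() + 4)` on Python 3.8+), so the concurrent path still behaves sensibly there.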