Improve stability by limiting multithreading when running ETL

This commit is contained in:
Carlos Felix 2025-01-08 10:25:02 -05:00 committed by Carlos Felix
parent d4898b8f55
commit 6f3432d48a
2 changed files with 18 additions and 5 deletions

View file

@ -139,9 +139,14 @@ def pull_census_data(data_source: str):
@cli.command(
help="Run all ETL processes or a specific one",
)
@click.option(
"--no-concurrency",
is_flag=True,
help="Run ETLs sequentially instead of concurrently.",
)
@dataset_option
@use_cache_option
def etl_run(dataset: str, use_cache: bool):
def etl_run(dataset: str, use_cache: bool, no_concurrency: bool):
"""Run a specific or all ETL processes
Args:
@ -153,7 +158,7 @@ def etl_run(dataset: str, use_cache: bool):
log_title("Run ETL")
log_info("Running dataset(s)")
etl_runner(dataset, use_cache)
etl_runner(dataset, use_cache, no_concurrency)
log_goodbye()

View file

@ -1,6 +1,7 @@
import concurrent.futures
import importlib
import typing
import os
from functools import reduce
@ -84,7 +85,11 @@ def _run_one_dataset(dataset: dict, use_cache: bool = False) -> None:
logger.info(f"Finished ETL for dataset {dataset['name']}")
def etl_runner(dataset_to_run: str = None, use_cache: bool = False) -> None:
def etl_runner(
dataset_to_run: str = None,
use_cache: bool = False,
no_concurrency: bool = False,
) -> None:
"""Runs all etl processes or a specific one
Args:
@ -112,9 +117,12 @@ def etl_runner(dataset_to_run: str = None, use_cache: bool = False) -> None:
dataset for dataset in dataset_list if dataset["is_memory_intensive"]
]
max_workers = 1 if no_concurrency else os.cpu_count()
if concurrent_datasets:
logger.info("Running concurrent ETL jobs")
with concurrent.futures.ThreadPoolExecutor() as executor:
logger.info(f"Running concurrent ETL jobs on {max_workers} thread(s)")
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures = {
executor.submit(
_run_one_dataset, dataset=dataset, use_cache=use_cache