diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py
index 8e175181..c46f098c 100644
--- a/data/data-pipeline/data_pipeline/etl/runner.py
+++ b/data/data-pipeline/data_pipeline/etl/runner.py
@@ -1,7 +1,5 @@
 import importlib
 import concurrent.futures
-import time
-from functools import wraps

 from data_pipeline.etl.score.etl_score import ScoreETL
 from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
@@ -69,24 +67,6 @@ def build_instance_class_from_dataset(dataset: str = None) -> None:
         etl_instance.cleanup()


-import time
-from functools import wraps
-
-
-def timeit(method):
-    @wraps(method)
-    def wrapper(*args, **kwargs):
-        start_time = time.time()
-        result = method(*args, **kwargs)
-        end_time = time.time()
-        print(f"{method.__name__} => {(end_time - start_time)*1000} ms")
-
-        return result
-
-    return wrapper
-
-
-@timeit
 def etl_runner(dataset_to_run: str = None) -> None:
     """Runs all etl processes or a specific one

@@ -97,20 +77,19 @@ def etl_runner(dataset_to_run: str = None) -> None:
         None
     """
     dataset_list = get_datasets_to_run(dataset_to_run)
+    NUMBER_OF_THREADS = 8
+    executor = concurrent.futures.ThreadPoolExecutor(
+        max_workers=NUMBER_OF_THREADS
+    )
+    futures = []
+    for dataset in dataset_list:
+        futures.append(
+            executor.submit(build_instance_class_from_dataset, dataset)
+        )

-    # Create a pool of processes. By default, one is created for each CPU in your machine.
-    with concurrent.futures.ProcessPoolExecutor() as executor:
-        # Process the list of ETL datasets, but split the work across the
-        # process pool to use all CPUs
-        for dataset, _ in zip(
-            dataset_list,
-            executor.map(
-                build_instance_class_from_dataset, dataset_list
-            ),
-        ):
-            print(f"Completed ETL for {dataset}")
+    for future in concurrent.futures.as_completed(futures):
+        pass

-    # update the front end JSON/CSV of list of data sources
     pass


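One observation on the new loop. as_completed yields each future whether its task succeeded or failed, but the loop body is a bare pass, so future.result() is never called: any exception raised inside build_instance_class_from_dataset is dropped silently, and the executor is never shut down. Below is a minimal sketch of a stricter variant; the futures-to-dataset dict and the completion message are illustrative additions, not part of the patch, and it assumes the module's existing get_datasets_to_run and build_instance_class_from_dataset helpers.

    import concurrent.futures

    NUMBER_OF_THREADS = 8

    def etl_runner(dataset_to_run: str = None) -> None:
        dataset_list = get_datasets_to_run(dataset_to_run)

        # The context manager calls executor.shutdown() on exit,
        # which the patched code never does explicitly.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=NUMBER_OF_THREADS
        ) as executor:
            # Map each future back to its dataset so failures are attributable.
            futures = {
                executor.submit(build_instance_class_from_dataset, dataset): dataset
                for dataset in dataset_list
            }
            for future in concurrent.futures.as_completed(futures):
                # result() re-raises any exception from the worker thread,
                # so a failed dataset stops the run instead of passing silently.
                future.result()
                print(f"Completed ETL for {futures[future]}")

On the executor choice itself: swapping ProcessPoolExecutor for ThreadPoolExecutor avoids pickling and process start-up costs, and threads work well if these ETL jobs spend most of their time waiting on downloads and file I/O; if any step were CPU-bound, the GIL would serialize the threads and a process pool would parallelize better.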