mirror of https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-24 02:24:20 -08:00
refactor into using thread pool executor
parent a1e15b63c2
commit 306fa204fb
1 changed file with 11 additions and 32 deletions
@@ -1,7 +1,5 @@
 import importlib
 import concurrent.futures
-import time
-from functools import wraps
 
 from data_pipeline.etl.score.etl_score import ScoreETL
 from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
@@ -69,24 +67,6 @@ def build_instance_class_from_dataset(dataset: str = None) -> None:
     etl_instance.cleanup()
 
 
-import time
-from functools import wraps
-
-
-def timeit(method):
-    @wraps(method)
-    def wrapper(*args, **kwargs):
-        start_time = time.time()
-        result = method(*args, **kwargs)
-        end_time = time.time()
-        print(f"{method.__name__} => {(end_time - start_time)*1000} ms")
-
-        return result
-
-    return wrapper
-
-
-@timeit
 def etl_runner(dataset_to_run: str = None) -> None:
     """Runs all etl processes or a specific one
 
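The lines deleted in the hunk above were a timing decorator defined mid-module, duplicating the import time / from functools import wraps statements that the first hunk drops from the top of the file. For reference, a self-contained sketch of the same helper, with time.perf_counter substituted for time.time since it is the monotonic clock intended for interval measurement (the example function at the bottom is just a hypothetical stand-in):

import time
from functools import wraps


def timeit(method):
    """Print how long the wrapped callable took, in milliseconds."""

    @wraps(method)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = method(*args, **kwargs)
        end_time = time.perf_counter()
        # perf_counter is monotonic, unlike the wall-clock time.time used
        # in the removed code, so the measured interval cannot go negative.
        print(f"{method.__name__} => {(end_time - start_time) * 1000:.1f} ms")
        return result

    return wrapper


@timeit
def example() -> None:  # hypothetical stand-in for an ETL step
    time.sleep(0.2)


if __name__ == "__main__":
    example()  # prints roughly: example => 200.x ms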
@@ -97,20 +77,19 @@ def etl_runner(dataset_to_run: str = None) -> None:
         None
     """
     dataset_list = get_datasets_to_run(dataset_to_run)
+    NUMBER_OF_THREADS = 8
+    executor = concurrent.futures.ThreadPoolExecutor(
+        max_workers=NUMBER_OF_THREADS
+    )
+    futures = []
+    for dataset in dataset_list:
+        futures.append(
+            executor.submit(build_instance_class_from_dataset, dataset)
+        )
 
-    # Create a pool of processes. By default, one is created for each CPU in your machine.
-    with concurrent.futures.ProcessPoolExecutor() as executor:
-        # Process the list of ETL datasets, but split the work across the
-        # process pool to use all CPUs
-        for dataset, _ in zip(
-            dataset_list,
-            executor.map(
-                build_instance_class_from_dataset, dataset_list
-            ),
-        ):
-            print(f"Completed ETL for {dataset}")
+    for future in concurrent.futures.as_completed(futures):
+        pass
 
-    # update the front end JSON/CSV of list of data sources
     pass
 
 
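Two things about the new code are worth flagging: the executor is never shut down explicitly, and the as_completed loop body is pass, so the runner waits for every future but never calls future.result(); an exception raised inside build_instance_class_from_dataset is therefore silently dropped, where the old executor.map version would have re-raised it on iteration. A minimal sketch of the same thread-pool arrangement with both issues addressed, reusing the build_instance_class_from_dataset and dataset_list names from the diff above and restoring the deleted per-dataset completion message:

import concurrent.futures

NUMBER_OF_THREADS = 8

# The with-block guarantees executor.shutdown() even if a worker fails.
with concurrent.futures.ThreadPoolExecutor(
    max_workers=NUMBER_OF_THREADS
) as executor:
    # build_instance_class_from_dataset and dataset_list are the names
    # defined earlier in the module being patched, per the diff above.
    futures = {
        executor.submit(build_instance_class_from_dataset, dataset): dataset
        for dataset in dataset_list
    }
    for future in concurrent.futures.as_completed(futures):
        future.result()  # re-raises any exception the worker hit
        print(f"Completed ETL for {futures[future]}")

One design note: a thread pool only overlaps the parts of each ETL run that release the GIL (network and file I/O, plus whatever pandas and NumPy release internally); purely CPU-bound Python code still serializes, which is the case the removed ProcessPoolExecutor handled.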