mirror of https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-23 18:14:19 -08:00
create threadpool to execute ETL across all CPUs
This commit is contained in:
parent 2cc1fa6079
commit 8ad1f2408f
1 changed file with 51 additions and 18 deletions
@@ -1,4 +1,5 @@
 import importlib
+import concurrent.futures
 
 from data_pipeline.etl.score.etl_score import ScoreETL
 from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
@@ -32,6 +33,40 @@ def get_datasets_to_run(dataset_to_run: str):
     return dataset_list
 
 
+def build_instance_class_from_dataset(dataset: str = None) -> None:
+    """Runs a specific dataset
+
+    Args:
+        dataset (str): A specific dataset eligible for a specific ETL process.
+
+    Returns:
+        None
+    """
+    etl_module = importlib.import_module(
+        f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
+    )
+    etl_class = getattr(etl_module, dataset["class_name"])
+    etl_instance = etl_class()
+
+    etl_module = importlib.import_module(
+        f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
+    )
+    etl_class = getattr(etl_module, dataset["class_name"])
+    etl_instance = etl_class()
+
+    # run extract
+    etl_instance.extract()
+
+    # run transform
+    etl_instance.transform()
+
+    # run load
+    etl_instance.load()
+
+    # cleanup
+    etl_instance.cleanup()
+
+
 def etl_runner(dataset_to_run: str = None) -> None:
     """Runs all etl processes or a specific one
 
@@ -43,25 +78,23 @@ def etl_runner(dataset_to_run: str = None) -> None:
     """
     dataset_list = get_datasets_to_run(dataset_to_run)
 
-    # Run the ETLs for the dataset_list
-    for dataset in dataset_list:
-        etl_module = importlib.import_module(
-            f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
-        )
-        etl_class = getattr(etl_module, dataset["class_name"])
-        etl_instance = etl_class()
-
-        # run extract
-        etl_instance.extract()
+    # Create a pool of processes. By default, one is created for each CPU in your machine.
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        # Get a list of files to process
+        build_processing_classes = [
+            build_instance_class_from_dataset(dataset)
+            for dataset in dataset_list
+        ]
 
-        # run transform
-        etl_instance.transform()
-
-        # run load
-        etl_instance.load()
-
-        # cleanup
-        etl_instance.cleanup()
+        # Process the list of ETL datasets, but split the work across the
+        # process pool to use all CPUs
+        for dataset, _ in zip(
+            build_processing_classes,
+            executor.map(
+                build_instance_class_from_dataset, build_processing_classes
+            ),
+        ):
+            print(f"Completed ETL for {dataset}")
 
     # update the front end JSON/CSV of list of data sources
     pass
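For readers unfamiliar with the pattern, the new etl_runner leans on concurrent.futures.ProcessPoolExecutor, whose executor.map fans a function out across one worker process per CPU and yields results in input order. The following is a minimal, self-contained sketch of that pattern only; run_one and the dataset names are illustrative stand-ins, not code from this commit (the commit's per-dataset work is done by build_instance_class_from_dataset).

import concurrent.futures


def run_one(name: str) -> str:
    # Stand-in for a single ETL job (extract/transform/load for one dataset).
    return f"finished {name}"


if __name__ == "__main__":
    # Illustrative dataset names only.
    datasets = ["census", "ejscreen", "hud_housing"]

    # By default the pool starts one worker process per CPU.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # map() schedules run_one(dataset) on the pool and yields results
        # in the same order as the inputs, so zip() pairs each input with
        # its result as the jobs finish.
        for dataset, result in zip(datasets, executor.map(run_one, datasets)):
            print(f"Completed ETL for {dataset}: {result}")

The __main__ guard matters when using ProcessPoolExecutor, because worker processes may re-import the module when they start.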