diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py
index 59179b0c..62d8d828 100644
--- a/data/data-pipeline/data_pipeline/etl/runner.py
+++ b/data/data-pipeline/data_pipeline/etl/runner.py
@@ -1,4 +1,5 @@
 import importlib
+import concurrent.futures
 
 from data_pipeline.etl.score.etl_score import ScoreETL
 from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
@@ -32,6 +33,35 @@ def get_datasets_to_run(dataset_to_run: str):
     return dataset_list
 
 
+def build_instance_class_from_dataset(dataset: dict) -> None:
+    """Runs the ETL process for a single dataset.
+
+    Args:
+        dataset (dict): dataset metadata (module_dir, class_name) for one ETL process.
+
+    Returns:
+        None
+    """
+    # dynamically import the dataset's ETL module and instantiate its ETL class
+    etl_module = importlib.import_module(
+        f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
+    )
+    etl_class = getattr(etl_module, dataset["class_name"])
+    etl_instance = etl_class()
+
+    # run extract
+    etl_instance.extract()
+
+    # run transform
+    etl_instance.transform()
+
+    # run load
+    etl_instance.load()
+
+    # cleanup
+    etl_instance.cleanup()
+
+
 def etl_runner(dataset_to_run: str = None) -> None:
     """Runs all etl processes or a specific one
 
@@ -43,25 +73,16 @@ def etl_runner(dataset_to_run: str = None) -> None:
     """
     dataset_list = get_datasets_to_run(dataset_to_run)
 
-    # Run the ETLs for the dataset_list
-    for dataset in dataset_list:
-        etl_module = importlib.import_module(
-            f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
-        )
-        etl_class = getattr(etl_module, dataset["class_name"])
-        etl_instance = etl_class()
-
-        # run extract
-        etl_instance.extract()
-
-        # run transform
-        etl_instance.transform()
-
-        # run load
-        etl_instance.load()
-
-        # cleanup
-        etl_instance.cleanup()
+    # Create a pool of processes. By default, one is created for each CPU
+    # on the machine.
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        # Process the list of ETL datasets, but split the work across the
+        # process pool to use all CPUs.
+        for dataset, _ in zip(
+            dataset_list,
+            executor.map(build_instance_class_from_dataset, dataset_list),
+        ):
+            print(f"Completed ETL for {dataset}")
 
     # update the front end JSON/CSV of list of data sources
     pass
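
Note: a minimal usage sketch of the parallelized runner follows; the dataset id "census_acs" and the driver script itself are illustrative assumptions, not part of this diff. Because ProcessPoolExecutor pickles the callable and re-imports its module in each worker process (on platforms that spawn rather than fork), build_instance_class_from_dataset is kept as a module-level function, and any script driving etl_runner should be guarded by __main__:

    # hypothetical driver script -- everything except etl_runner's signature is an assumption
    from data_pipeline.etl.runner import etl_runner

    if __name__ == "__main__":
        etl_runner()                # run every registered dataset, one process per CPU
        # etl_runner("census_acs")  # or run a single dataset by id (assumed id)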