From 5ceb7068a6adf191a986cf875ea0412e4507dd53 Mon Sep 17 00:00:00 2001
From: Saran Ahluwalia
Date: Mon, 13 Dec 2021 20:31:55 -0500
Subject: [PATCH] refactor with more comments

---
 .../data-pipeline/data_pipeline/etl/runner.py | 47 +++++++++++++++----
 data/data-pipeline/pyproject.toml             |  1 +
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py
index c46f098c..80850235 100644
--- a/data/data-pipeline/data_pipeline/etl/runner.py
+++ b/data/data-pipeline/data_pipeline/etl/runner.py
@@ -33,7 +33,7 @@ def get_datasets_to_run(dataset_to_run: str):
     return dataset_list
 
 
-def build_instance_class_from_dataset(dataset: str = None) -> None:
+def run_etl_single_dataset(dataset: str = None) -> str:
     """Runs a specific dataset
 
     Args:
@@ -66,6 +66,10 @@
     # cleanup
     etl_instance.cleanup()
 
+    message = f"Finished ETL for {dataset}"
+
+    return message
+
 
 def etl_runner(dataset_to_run: str = None) -> None:
     """Runs all etl processes or a specific one
@@ -77,18 +81,43 @@ def etl_runner(dataset_to_run: str = None) -> None:
         None
     """
     dataset_list = get_datasets_to_run(dataset_to_run)
-    NUMBER_OF_THREADS = 8
+    NUMBER_OF_THREADS = 10
 
-    executor = concurrent.futures.ThreadPoolExecutor(
-        max_workers=NUMBER_OF_THREADS
-    )
-    futures = []
-    for dataset in dataset_list:
-        futures.append(
-            executor.submit(build_instance_class_from_dataset, dataset)
-        )
+    futures_list = []
+    results = []
 
-    for future in concurrent.futures.as_completed(futures):
-        pass
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=NUMBER_OF_THREADS
+    ) as executor:
+        for dataset in dataset_list:
+            future = executor.submit(
+                # create the Future object manually; previously,
+                # using map, we were not handling exceptions
+                # as carefully as we should have
+                run_etl_single_dataset,
+                dataset,
+            )
+            futures_list.append(future)
+
+        for future in futures_list:
+            try:
+                # empirically tested timeout
+                result = future.result(timeout=300)
+                results.append(result)
+            # this catches any exception for that given dataset;
+            # one could customize this to report which dataset
+            # failed, but we already log so extensively that
+            # the logs serve as corroborating evidence
+            except Exception:
+                results.append(None)
+
+    # sanity check to ensure all of our datasets
+    # returned successfully (even though not in
+    # any particular order)
+    for result in results:
+        print(f"Result from future: {result}")
 
     pass
diff --git a/data/data-pipeline/pyproject.toml b/data/data-pipeline/pyproject.toml
index 18a45ede..462750ed 100644
--- a/data/data-pipeline/pyproject.toml
+++ b/data/data-pipeline/pyproject.toml
@@ -67,6 +67,7 @@ disable = [
   "C0115", # Disables missing class docstring
   "R0915", # Disables too many statements (score generation transform)
   "W0231", # Disables super init not called
+  "W0703", # Disables catching too general exception Exception (broad-except)
   "R0801", # Disables duplicate code. There are a couple places we have similar code and
   # unfortunately you can't disable this rule for individual lines or files, it's a
   # known bug. https://github.com/PyCQA/pylint/issues/214
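
A quick illustration of the pattern this patch adopts (not part of the diff):
one submit() per dataset, then a per-future result(timeout=...) wrapped in a
broad except so a single failing dataset cannot abort the whole run. The
run_one function and the dataset names below are hypothetical stand-ins for
run_etl_single_dataset and the real dataset list, so treat this as a minimal
sketch rather than code from the repository.

import concurrent.futures
import time

NUMBER_OF_THREADS = 10


def run_one(dataset: str) -> str:
    """Hypothetical stand-in for run_etl_single_dataset."""
    if dataset == "bad_dataset":
        raise RuntimeError("simulated ETL failure")
    time.sleep(0.1)  # simulate a unit of ETL work
    return f"Finished ETL for {dataset}"


if __name__ == "__main__":
    datasets = ["census", "ejscreen", "bad_dataset"]  # hypothetical names
    futures_list = []
    results = []

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=NUMBER_OF_THREADS
    ) as executor:
        for dataset in datasets:
            # one Future per dataset, created explicitly via submit()
            futures_list.append(executor.submit(run_one, dataset))

        for future in futures_list:
            try:
                # per-future timeout, mirroring the patch's 300s budget
                results.append(future.result(timeout=300))
            except Exception:
                # a failure in one dataset becomes a None result
                # instead of cancelling the remaining futures
                results.append(None)

    for result in results:
        print(f"Result from future: {result}")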
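One design note on the loop above: iterating futures_list in submission order,
rather than using concurrent.futures.as_completed, keeps each result aligned
with the dataset that produced it, at the cost of blocking on the slowest
early future first; since every result is collected either way, the total
wall-clock time is roughly the same.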