mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-02-23 18:14:19 -08:00
refactor with more comments
This commit is contained in:
parent
306fa204fb
commit
5ceb7068a6
2 changed files with 39 additions and 9 deletions
|
@ -33,7 +33,7 @@ def get_datasets_to_run(dataset_to_run: str):
|
|||
return dataset_list
|
||||
|
||||
|
||||
def build_instance_class_from_dataset(dataset: str = None) -> None:
|
||||
def run_etl_single_dataset(dataset: str = None) -> str:
|
||||
"""Runs a specific dataset
|
||||
|
||||
Args:
|
||||
|
@ -66,6 +66,10 @@ def build_instance_class_from_dataset(dataset: str = None) -> None:
|
|||
# cleanup
|
||||
etl_instance.cleanup()
|
||||
|
||||
message = f"Finished ETL for {dataset}"
|
||||
|
||||
return message
|
||||
|
||||
|
||||
def etl_runner(dataset_to_run: str = None) -> None:
|
||||
"""Runs all etl processes or a specific one
|
||||
|
@ -77,18 +81,43 @@ def etl_runner(dataset_to_run: str = None) -> None:
|
|||
None
|
||||
"""
|
||||
dataset_list = get_datasets_to_run(dataset_to_run)
|
||||
NUMBER_OF_THREADS = 8
|
||||
NUMBER_OF_THREADS = 10
|
||||
executor = concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=NUMBER_OF_THREADS
|
||||
)
|
||||
futures = []
|
||||
for dataset in dataset_list:
|
||||
futures.append(
|
||||
executor.submit(build_instance_class_from_dataset, dataset)
|
||||
)
|
||||
futures_list = []
|
||||
results = []
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
pass
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=NUMBER_OF_THREADS
|
||||
) as executor:
|
||||
for dataset in dataset_list:
|
||||
futures = executor.submit(
|
||||
# manually create Future object
|
||||
# previously using map, we were
|
||||
# not managing exceptions so thoughtfully
|
||||
run_etl_single_dataset,
|
||||
dataset,
|
||||
)
|
||||
futures_list.append(futures)
|
||||
|
||||
for future in futures_list:
|
||||
try:
|
||||
# empirically tested timeout
|
||||
result = future.result(timeout=300)
|
||||
results.append(result)
|
||||
# this catches any exception for that given dataset
|
||||
# one could customize this to specify which dataset
|
||||
# but we perform so much logging that this may be
|
||||
# corroborating evidence
|
||||
except Exception:
|
||||
results.append(None)
|
||||
|
||||
# sanity check to ensure all of our datasets
|
||||
# returned successfully (even though not in any
|
||||
# particular order)
|
||||
for result in results:
|
||||
print(f"Result from future: {result}")
|
||||
|
||||
pass
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ disable = [
|
|||
"C0115", # Disables missing class docstring
|
||||
"R0915", # Disables too many statements (score generation transform)
|
||||
"W0231", # Disables super init not called
|
||||
"W0703", # Disables catching too general exception Exception (broad-except)
|
||||
"R0801", # Disables duplicate code. There are a couple places we have similar code and
|
||||
# unfortunately you can't disable this rule for individual lines or files, it's a
|
||||
# known bug. https://github.com/PyCQA/pylint/issues/214#
|
||||
|
|
Loading…
Add table
Reference in a new issue