mirror of
				https://github.com/DOI-DO/j40-cejst-2.git
				synced 2025-10-22 04:33:51 -07:00 
			
		
		
		
	
		
			
				
	
	
		
			250 lines
		
	
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			250 lines
		
	
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import concurrent.futures
 | ||
| import importlib
 | ||
| import typing
 | ||
| import os
 | ||
| 
 | ||
| from functools import reduce
 | ||
| 
 | ||
| from data_pipeline.etl.score.etl_score import ScoreETL
 | ||
| from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
 | ||
| from data_pipeline.etl.score.etl_score_post import PostScoreETL
 | ||
| from data_pipeline.utils import get_module_logger
 | ||
| from data_pipeline.etl.base import ExtractTransformLoad
 | ||
| from data_pipeline.etl.datasource import DataSource
 | ||
| 
 | ||
| from . import constants
 | ||
| 
 | ||
| logger = get_module_logger(__name__)
 | ||
| 
 | ||
| 
 | ||
| def _get_datasets_to_run(dataset_to_run: str) -> typing.List[dict]:
 | ||
|     """Returns a list of appropriate datasets to run given input args
 | ||
| 
 | ||
|     Args:
 | ||
|         dataset_to_run (str): Run a specific ETL process. If missing, runs all processes (optional)
 | ||
| 
 | ||
|     Returns:
 | ||
|         None
 | ||
|     """
 | ||
|     dataset_list = constants.DATASET_LIST
 | ||
|     etls_to_search = (
 | ||
|         dataset_list + [constants.CENSUS_INFO] + [constants.TRIBAL_INFO]
 | ||
|     )
 | ||
| 
 | ||
|     if dataset_to_run:
 | ||
|         dataset_element = next(
 | ||
|             (item for item in etls_to_search if item["name"] == dataset_to_run),
 | ||
|             None,
 | ||
|         )
 | ||
|         if not dataset_element:
 | ||
|             raise ValueError("Invalid dataset name")
 | ||
|         else:
 | ||
|             # reset the list to just the dataset
 | ||
|             dataset_list = [dataset_element]
 | ||
| 
 | ||
|     return dataset_list
 | ||
| 
 | ||
| 
 | ||
| def _get_dataset(dataset: dict) -> ExtractTransformLoad:
 | ||
|     """Instantiates a dataset object from a dictionary description of that object's class"""
 | ||
|     etl_module = importlib.import_module(
 | ||
|         f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
 | ||
|     )
 | ||
|     etl_class = getattr(etl_module, dataset["class_name"])
 | ||
|     etl_instance = etl_class()
 | ||
| 
 | ||
|     return etl_instance
 | ||
| 
 | ||
| 
 | ||
| def _run_one_dataset(dataset: dict, use_cache: bool = False) -> None:
 | ||
|     """Runs one etl process."""
 | ||
| 
 | ||
|     logger.info(f"Running ETL for {dataset['name']}")
 | ||
|     etl_instance = _get_dataset(dataset)
 | ||
| 
 | ||
|     # run extract
 | ||
|     logger.debug(f"Extracting {dataset['name']}")
 | ||
|     etl_instance.extract(use_cache)
 | ||
| 
 | ||
|     # run transform
 | ||
|     logger.debug(f"Transforming {dataset['name']}")
 | ||
|     etl_instance.transform()
 | ||
| 
 | ||
|     # run load
 | ||
|     logger.debug(f"Loading {dataset['name']}")
 | ||
|     etl_instance.load()
 | ||
| 
 | ||
|     # run validate
 | ||
|     logger.debug(f"Validating {dataset['name']}")
 | ||
|     etl_instance.validate()
 | ||
| 
 | ||
|     # cleanup
 | ||
|     logger.debug(f"Cleaning up {dataset['name']}")
 | ||
|     etl_instance.cleanup()
 | ||
| 
 | ||
|     logger.info(f"Finished ETL for dataset {dataset['name']}")
 | ||
| 
 | ||
| 
 | ||
| def etl_runner(
 | ||
|     dataset_to_run: str = None,
 | ||
|     use_cache: bool = False,
 | ||
|     no_concurrency: bool = False,
 | ||
| ) -> None:
 | ||
|     """Runs all etl processes or a specific one
 | ||
| 
 | ||
|     Args:
 | ||
|         dataset_to_run (str): Run a specific ETL process. If missing, runs all processes (optional)
 | ||
|         use_cache (bool): Use the cached data sources – if they exist – rather than downloading them all from scratch
 | ||
| 
 | ||
|     Returns:
 | ||
|         None
 | ||
|     """
 | ||
|     dataset_list = _get_datasets_to_run(dataset_to_run)
 | ||
| 
 | ||
|     # Because we are memory constrained on our infrastructure,
 | ||
|     # we split datasets into those that are not memory intensive
 | ||
|     # (is_memory_intensive == False) and thereby can be safely
 | ||
|     # run in parallel, and those that require more RAM and thus
 | ||
|     # should be run sequentially. The is_memory_intensive_flag is
 | ||
|     # set manually in constants.py based on experience running
 | ||
|     # the pipeline
 | ||
|     concurrent_datasets = [
 | ||
|         dataset
 | ||
|         for dataset in dataset_list
 | ||
|         if not dataset["is_memory_intensive"]
 | ||
|     ]
 | ||
|     high_memory_datasets = [
 | ||
|         dataset for dataset in dataset_list if dataset["is_memory_intensive"]
 | ||
|     ]
 | ||
| 
 | ||
|     max_workers = 1 if no_concurrency else os.cpu_count()
 | ||
|     if concurrent_datasets:
 | ||
|         logger.info(f"Running concurrent ETL jobs on {max_workers} thread(s)")
 | ||
|         with concurrent.futures.ThreadPoolExecutor(
 | ||
|             max_workers=max_workers
 | ||
|         ) as executor:
 | ||
|             futures = {
 | ||
|                 executor.submit(
 | ||
|                     _run_one_dataset, dataset=dataset, use_cache=use_cache
 | ||
|                 )
 | ||
|                 for dataset in concurrent_datasets
 | ||
|             }
 | ||
| 
 | ||
|             for fut in concurrent.futures.as_completed(futures):
 | ||
|                 # Calling result will raise an exception if one occurred.
 | ||
|                 # Otherwise, the exceptions are silently ignored.
 | ||
|                 fut.result()
 | ||
| 
 | ||
|     # Note: these high-memory datasets also usually require the Census GeoJSON to be
 | ||
|     # generated, and one of them requires the Tribal GeoJSON to be generated.
 | ||
|     if high_memory_datasets:
 | ||
|         logger.info("Running high-memory ETL jobs")
 | ||
|         for dataset in high_memory_datasets:
 | ||
|             _run_one_dataset(dataset=dataset, use_cache=use_cache)
 | ||
| 
 | ||
| 
 | ||
| def get_data_sources(dataset_to_run: str = None) -> [DataSource]:
 | ||
| 
 | ||
|     dataset_list = _get_datasets_to_run(dataset_to_run)
 | ||
| 
 | ||
|     sources = []
 | ||
| 
 | ||
|     for dataset in dataset_list:
 | ||
|         etl_instance = _get_dataset(dataset)
 | ||
|         sources.append(etl_instance.get_data_sources())
 | ||
| 
 | ||
|     sources = reduce(
 | ||
|         list.__add__, sources
 | ||
|     )  # flatten the list of lists into a single list
 | ||
| 
 | ||
|     return sources
 | ||
| 
 | ||
| 
 | ||
| def extract_data_sources(
 | ||
|     dataset_to_run: str = None, use_cache: bool = False
 | ||
| ) -> None:
 | ||
| 
 | ||
|     dataset_list = _get_datasets_to_run(dataset_to_run)
 | ||
| 
 | ||
|     for dataset in dataset_list:
 | ||
|         etl_instance = _get_dataset(dataset)
 | ||
|         logger.info(
 | ||
|             f"Extracting data set for {etl_instance.__class__.__name__}"
 | ||
|         )
 | ||
|         etl_instance.extract(use_cache)
 | ||
| 
 | ||
| 
 | ||
| def clear_data_source_cache(dataset_to_run: str = None) -> None:
 | ||
| 
 | ||
|     dataset_list = _get_datasets_to_run(dataset_to_run)
 | ||
| 
 | ||
|     for dataset in dataset_list:
 | ||
|         etl_instance = _get_dataset(dataset)
 | ||
|         logger.info(
 | ||
|             f"Clearing data set cache for {etl_instance.__class__.__name__}"
 | ||
|         )
 | ||
|         etl_instance.clear_data_source_cache()
 | ||
| 
 | ||
| 
 | ||
| def score_generate() -> None:
 | ||
|     """Generates the score and saves it on the local data directory
 | ||
| 
 | ||
|     Args:
 | ||
|         None
 | ||
| 
 | ||
|     Returns:
 | ||
|         None
 | ||
|     """
 | ||
| 
 | ||
|     # Score Gen
 | ||
|     score_gen = ScoreETL()
 | ||
|     score_gen.extract()
 | ||
|     score_gen.transform()
 | ||
|     score_gen.load()
 | ||
| 
 | ||
| 
 | ||
| def score_post(data_source: str = "local") -> None:
 | ||
|     """Posts the score files to the local directory
 | ||
| 
 | ||
|     Args:
 | ||
|         data_source (str): Source for the census data (optional)
 | ||
|                            Options:
 | ||
|                            - local (default): fetch census data from the local data directory
 | ||
|                            - aws: fetch census from AWS S3 J40 data repository
 | ||
| 
 | ||
|     Returns:
 | ||
|         None
 | ||
|     """
 | ||
|     # Post Score Processing
 | ||
|     score_post = PostScoreETL(data_source=data_source)
 | ||
|     score_post.extract()
 | ||
|     score_post.transform()
 | ||
|     score_post.load()
 | ||
|     score_post.cleanup()
 | ||
| 
 | ||
| 
 | ||
| def score_geo(data_source: str = "local") -> None:
 | ||
|     """Generates the geojson files with score data baked in
 | ||
| 
 | ||
|     Args:
 | ||
|         data_source (str): Source for the census data (optional)
 | ||
|                            Options:
 | ||
|                            - local (default): fetch census data from the local data directory
 | ||
|                            - aws: fetch census from AWS S3 J40 data repository
 | ||
| 
 | ||
|     Returns:
 | ||
|         None
 | ||
|     """
 | ||
| 
 | ||
|     # Score Geo
 | ||
|     score_geo = GeoScoreETL(data_source=data_source)
 | ||
|     score_geo.extract()
 | ||
|     score_geo.transform()
 | ||
|     score_geo.load()
 | ||
| 
 | ||
| 
 | ||
| def _find_dataset_index(dataset_list, key, value):
 | ||
|     for i, element in enumerate(dataset_list):
 | ||
|         if element[key] == value:
 | ||
|             return i
 | ||
|     return -1
 |