Run ETL processes in parallel (#1253)

* WIP on parallelizing

* switching to get_tmp_path for nri

* switching to get_tmp_path everywhere necessary

* fixing linter errors

* moving heavy ETLs to front of line

* add hold

* moving cdc places up

* removing unnecessary print

* moving h&t (housing & transportation) up

* adding parallel to geo post

* better census labels

* switching to concurrent futures

* fixing output

commit a0d6e55f0a
Author: Lucas Merrill Brown (committed by GitHub)
Date:   2022-02-11 14:04:53 -05:00
30 changed files with 286 additions and 160 deletions

View file

@@ -98,6 +98,17 @@ class ExtractTransformLoad:
)
return output_file_path
def get_tmp_path(self) -> pathlib.Path:
"""Returns the temporary path associated with this ETL class."""
# Note: the temporary path will be defined on `init`, because it uses the class
# of the instance which is often a child class.
tmp_path = self.DATA_PATH / "tmp" / str(self.__class__.__name__)
# Create directory if it doesn't exist
tmp_path.mkdir(parents=True, exist_ok=True)
return tmp_path
def extract(
self,
source_url: str = None,
@@ -112,7 +123,7 @@ class ExtractTransformLoad:
if source_url and extract_path:
unzip_file_from_url(
file_url=source_url,
download_path=self.TMP_PATH,
download_path=self.get_tmp_path(),
unzipped_file_path=extract_path,
verify=verify,
)
@@ -265,4 +276,4 @@ class ExtractTransformLoad:
def cleanup(self) -> None:
"""Clears out any files stored in the TMP folder"""
remove_all_from_dir(self.TMP_PATH)
remove_all_from_dir(self.get_tmp_path())
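
A note on why the new helper keys the directory off the class (a sketch, not part of the commit): because `self.__class__` resolves to the most-derived class even when `get_tmp_path()` is defined on the base, every ETL subclass gets its own scratch directory under data/tmp/. Once the runner executes ETLs concurrently, that isolation is what keeps one dataset's downloads and `cleanup()` from wiping another's, which a single shared TMP_PATH could not guarantee. The stub classes and root path below are stand-ins for this sketch only:

    import pathlib

    class ExtractTransformLoad:
        DATA_PATH = pathlib.Path("data")  # hypothetical root for this sketch

        def get_tmp_path(self) -> pathlib.Path:
            # self.__class__ is the instance's concrete (child) class
            tmp_path = self.DATA_PATH / "tmp" / str(self.__class__.__name__)
            tmp_path.mkdir(parents=True, exist_ok=True)
            return tmp_path

    class CDCPlacesETL(ExtractTransformLoad):
        pass

    class EJSCREENETL(ExtractTransformLoad):
        pass

    print(CDCPlacesETL().get_tmp_path())  # data/tmp/CDCPlacesETL
    print(EJSCREENETL().get_tmp_path())   # data/tmp/EJSCREENETL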

View file

@@ -1,8 +1,18 @@
DATASET_LIST = [
{
"name": "mapping_for_ej",
"module_dir": "mapping_for_ej",
"class_name": "MappingForEJETL",
"name": "cdc_places",
"module_dir": "cdc_places",
"class_name": "CDCPlacesETL",
},
{
"name": "national_risk_index",
"module_dir": "national_risk_index",
"class_name": "NationalRiskIndexETL",
},
{
"name": "tree_equity_score",
"module_dir": "tree_equity_score",
"class_name": "TreeEquityScoreETL",
},
{
"name": "census_acs",
@@ -14,6 +24,21 @@ DATASET_LIST = [
"module_dir": "census_acs_2010",
"class_name": "CensusACS2010ETL",
},
{
"name": "census_decennial",
"module_dir": "census_decennial",
"class_name": "CensusDecennialETL",
},
{
"name": "housing_and_transportation",
"module_dir": "housing_and_transportation",
"class_name": "HousingTransportationETL",
},
{
"name": "mapping_for_ej",
"module_dir": "mapping_for_ej",
"class_name": "MappingForEJETL",
},
{
"name": "ejscreen",
"module_dir": "ejscreen",
@@ -24,16 +49,6 @@ DATASET_LIST = [
"module_dir": "hud_housing",
"class_name": "HudHousingETL",
},
{
"name": "cdc_places",
"module_dir": "cdc_places",
"class_name": "CDCPlacesETL",
},
{
"name": "national_risk_index",
"module_dir": "national_risk_index",
"class_name": "NationalRiskIndexETL",
},
{
"name": "census_acs_median_income",
"module_dir": "census_acs_median_income",
@@ -74,16 +89,6 @@ DATASET_LIST = [
"module_dir": "ejscreen_areas_of_concern",
"class_name": "EJSCREENAreasOfConcernETL",
},
{
"name": "census_decennial",
"module_dir": "census_decennial",
"class_name": "CensusDecennialETL",
},
{
"name": "housing_and_transportation",
"module_dir": "housing_and_transportation",
"class_name": "HousingTransportationETL",
},
{
"name": "calenviroscreen",
"module_dir": "calenviroscreen",
@@ -104,11 +109,6 @@ DATASET_LIST = [
"module_dir": "energy_definition_alternative_draft",
"class_name": "EnergyDefinitionAlternativeDraft",
},
{
"name": "tree_equity_score",
"module_dir": "tree_equity_score",
"class_name": "TreeEquityScoreETL",
},
{
"name": "michigan_ejscreen",
"module_dir": "michigan_ejscreen",

View file

@@ -1,13 +1,18 @@
import importlib
import concurrent.futures
import typing
from data_pipeline.etl.score.etl_score import ScoreETL
from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
from data_pipeline.etl.score.etl_score_post import PostScoreETL
from data_pipeline.utils import get_module_logger
from . import constants
logger = get_module_logger(__name__)
def get_datasets_to_run(dataset_to_run: str):
def _get_datasets_to_run(dataset_to_run: str) -> typing.List[dict]:
"""Returns a list of appropriate datasets to run given input args
Args:
@@ -29,9 +34,36 @@ def get_datasets_to_run(dataset_to_run: str):
else:
# reset the list to just the dataset
dataset_list = [dataset_element]
return dataset_list
def _run_one_dataset(dataset: dict) -> None:
"""Runs one etl process."""
etl_module = importlib.import_module(
f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
)
etl_class = getattr(etl_module, dataset["class_name"])
etl_instance = etl_class()
# run extract
etl_instance.extract()
# run transform
etl_instance.transform()
# run load
etl_instance.load()
# run validate
etl_instance.validate()
# cleanup
etl_instance.cleanup()
logger.info(f"Finished `etl-run` for dataset `{dataset['name']}`.")
def etl_runner(dataset_to_run: str = None) -> None:
"""Runs all etl processes or a specific one
@@ -41,33 +73,18 @@ def etl_runner(dataset_to_run: str = None) -> None:
Returns:
None
"""
dataset_list = get_datasets_to_run(dataset_to_run)
dataset_list = _get_datasets_to_run(dataset_to_run)
# Run the ETLs for the dataset_list
for dataset in dataset_list:
etl_module = importlib.import_module(
f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
)
etl_class = getattr(etl_module, dataset["class_name"])
etl_instance = etl_class()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(_run_one_dataset, dataset=dataset)
for dataset in dataset_list
}
# run extract
etl_instance.extract()
# run transform
etl_instance.transform()
# run load
etl_instance.load()
# run validate
etl_instance.validate()
# cleanup
etl_instance.cleanup()
# update the front end JSON/CSV of list of data sources
pass
for fut in concurrent.futures.as_completed(futures):
# Calling result will raise an exception if one occurred.
# Otherwise, the exceptions are silently ignored.
fut.result()
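
Why the explicit result() loop matters (a general concurrent.futures point, not specific to this repo): a future whose exception is never retrieved fails silently, so without the as_completed/result() pass a broken ETL would let the pipeline carry on as if nothing had happened; calling result() re-raises the worker's exception in the main thread and fails the run loudly. Threads rather than processes are also a reasonable fit here, since these ETLs are dominated by downloads and disk I/O, during which CPython releases the GIL. A minimal sketch with a hypothetical failing job:

    import concurrent.futures

    def job(n: int) -> int:
        if n == 2:
            raise RuntimeError("etl failed")  # simulated dataset failure
        return n * n

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(job, n) for n in range(4)}
        for fut in concurrent.futures.as_completed(futures):
            # result() re-raises the RuntimeError here; skip it and the
            # error is silently discarded
            print(fut.result())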
def score_generate() -> None:

View file

@@ -1,4 +1,6 @@
import concurrent.futures
import math
import pandas as pd
import geopandas as gpd
@@ -204,14 +206,28 @@ class GeoScoreETL(ExtractTransformLoad):
return compressed
def load(self) -> None:
logger.info("Writing usa-high (~9 minutes)")
self.geojson_score_usa_high.to_file(
self.SCORE_HIGH_GEOJSON, driver="GeoJSON"
)
logger.info("Completed writing usa-high")
# Create separate threads to run each write to disk.
def write_high_to_file():
logger.info("Writing usa-high (~9 minutes)")
self.geojson_score_usa_high.to_file(
filename=self.SCORE_HIGH_GEOJSON, driver="GeoJSON"
)
logger.info("Completed writing usa-high")
logger.info("Writing usa-low (~9 minutes)")
self.geojson_score_usa_low.to_file(
self.SCORE_LOW_GEOJSON, driver="GeoJSON"
)
logger.info("Completed writing usa-low")
def write_low_to_file():
logger.info("Writing usa-low (~9 minutes)")
self.geojson_score_usa_low.to_file(
filename=self.SCORE_LOW_GEOJSON, driver="GeoJSON"
)
logger.info("Completed writing usa-low")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(task)
for task in [write_high_to_file, write_low_to_file]
}
for fut in concurrent.futures.as_completed(futures):
# Calling result will raise an exception if one occurred.
# Otherwise, the exceptions are silently ignored.
fut.result()
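
The same submit/as_completed idiom reappears here to overlap the two ~9-minute GeoJSON writes, which are independent and I/O-bound and can therefore share wall-clock time on threads. An equivalent, slightly more compact formulation (a sketch with hypothetical writer stubs, not the committed code) would be executor.map, whose returned iterator re-raises worker exceptions as it is consumed:

    import concurrent.futures

    def write_high_to_file():
        ...  # e.g. geojson_score_usa_high.to_file(..., driver="GeoJSON")

    def write_low_to_file():
        ...  # e.g. geojson_score_usa_low.to_file(..., driver="GeoJSON")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        writers = [write_high_to_file, write_low_to_file]
        for _ in executor.map(lambda write: write(), writers):
            pass  # consuming the iterator surfaces any exception from a writer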

View file

@@ -48,7 +48,7 @@ def check_score_data_source(
# check if score data is found locally
if not os.path.isfile(TILE_SCORE_CSV):
logger.info(
"No local score tiles data found. Please use '-d aws` to fetch from AWS"
"No local score tiles data found. Please use '-s aws` to fetch from AWS"
)
sys.exit()

View file

@@ -14,7 +14,7 @@ class CalEnviroScreenETL(ExtractTransformLoad):
+ "/CalEnviroScreen_4.0_2021.zip"
)
self.CALENVIROSCREEN_CSV = (
self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
self.get_tmp_path() / "CalEnviroScreen_4.0_2021.csv"
)
self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4"
@@ -37,7 +37,7 @@ class CalEnviroScreenETL(ExtractTransformLoad):
logger.info("Downloading CalEnviroScreen Data")
super().extract(
self.CALENVIROSCREEN_FTP_URL,
self.TMP_PATH,
self.get_tmp_path(),
)
def transform(self) -> None:

View file

@@ -29,7 +29,9 @@ class CDCLifeExpectancy(ExtractTransformLoad):
def extract(self) -> None:
logger.info("Starting data download.")
download_file_name = self.TMP_PATH / "cdc_life_expectancy" / "usa.csv"
download_file_name = (
self.get_tmp_path() / "cdc_life_expectancy" / "usa.csv"
)
download_file_from_url(
file_url=self.FILE_URL,
download_file_name=download_file_name,

View file

@@ -22,7 +22,7 @@ class CDCPlacesETL(ExtractTransformLoad):
logger.info("Starting to download 520MB CDC Places file.")
file_path = download_file_from_url(
file_url=self.CDC_PLACES_URL,
download_file_name=self.TMP_PATH
download_file_name=self.get_tmp_path()
/ "cdc_places"
/ "census_tract.csv",
)

View file

@@ -101,6 +101,7 @@ class CensusACS2010ETL(ExtractTransformLoad):
self.df: pd.DataFrame
def extract(self) -> None:
logger.info("Starting Census 2010 ACS Transform")
# Define the variables to retrieve
variables = (
self.UNEMPLOYED_FIELDS
@@ -118,7 +119,7 @@ class CensusACS2010ETL(ExtractTransformLoad):
)
def transform(self) -> None:
logger.info("Starting Census ACS Transform")
logger.info("Starting Census 2010 ACS Transform")
df = self.df
@@ -184,7 +185,7 @@ class CensusACS2010ETL(ExtractTransformLoad):
self.df = output_df
def load(self) -> None:
logger.info("Saving Census ACS Data")
logger.info("Saving Census 2010 ACS Data")
# mkdir census
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)

View file

@@ -238,12 +238,12 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
unzip_file_from_url(
file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/geocorr2014_all_states_tracts_only.csv.zip",
download_path=self.TMP_PATH,
unzipped_file_path=self.TMP_PATH / "geocorr",
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "geocorr",
)
self.raw_geocorr_df = pd.read_csv(
filepath_or_buffer=self.TMP_PATH
filepath_or_buffer=self.get_tmp_path()
/ "geocorr"
/ "geocorr2014_all_states_tracts_only.csv",
# Skip second row, which has descriptions.

View file

@@ -57,12 +57,12 @@ class ChildOpportunityIndex(ExtractTransformLoad):
unzip_file_from_url(
file_url=self.COI_FILE_URL,
download_path=self.TMP_PATH,
unzipped_file_path=self.TMP_PATH / "child_opportunity_index",
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "child_opportunity_index",
)
self.raw_df = pd.read_csv(
filepath_or_buffer=self.TMP_PATH
filepath_or_buffer=self.get_tmp_path()
/ "child_opportunity_index"
/ "raw.csv",
# The following need to remain as strings for all of their digits, not get

View file

@@ -37,12 +37,12 @@ class DOEEnergyBurden(ExtractTransformLoad):
unzip_file_from_url(
file_url=self.DOE_FILE_URL,
download_path=self.TMP_PATH,
unzipped_file_path=self.TMP_PATH / "doe_energy_burden",
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "doe_energy_burden",
)
self.raw_df = pd.read_csv(
filepath_or_buffer=self.TMP_PATH
filepath_or_buffer=self.get_tmp_path()
/ "doe_energy_burden"
/ "DOE_LEAD_AMI_TRACT_2018_ALL.csv",
# The following need to remain as strings for all of their digits, not get converted to numbers.

View file

@@ -16,7 +16,7 @@ class EJSCREENETL(ExtractTransformLoad):
def __init__(self):
self.EJSCREEN_FTP_URL = "https://edap-arcgiscloud-data-commons.s3.amazonaws.com/EJSCREEN2020/EJSCREEN_Tract_2020_USPR.csv.zip"
self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_Tract_2020_USPR.csv"
self.EJSCREEN_CSV = self.get_tmp_path() / "EJSCREEN_Tract_2020_USPR.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019"
self.df: pd.DataFrame
@@ -45,7 +45,7 @@ class EJSCREENETL(ExtractTransformLoad):
logger.info("Downloading EJScreen Data")
super().extract(
self.EJSCREEN_FTP_URL,
self.TMP_PATH,
self.get_tmp_path(),
verify=False, # EPA EJScreen end point has certificate issues often
)

View file

@@ -52,13 +52,13 @@ class EnergyDefinitionAlternativeDraft(ExtractTransformLoad):
unzip_file_from_url(
file_url=self.DEFINITION_ALTERNATIVE_FILE_URL,
download_path=self.TMP_PATH,
unzipped_file_path=self.TMP_PATH
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path()
/ "energy_definition_alternative_draft",
)
self.df = pd.read_csv(
filepath_or_buffer=self.TMP_PATH
filepath_or_buffer=self.get_tmp_path()
/ "energy_definition_alternative_draft"
/ "J40 alternative DAC definition.csv",
# The following need to remain as strings for all of their digits, not get converted to numbers.

View file

@@ -71,12 +71,12 @@ class EPARiskScreeningEnvironmentalIndicatorsETL(ExtractTransformLoad):
unzip_file_from_url(
file_url=self.AGGREGATED_RSEI_SCORE_FILE_URL,
download_path=self.TMP_PATH,
unzipped_file_path=self.TMP_PATH / "epa_rsei",
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "epa_rsei",
)
self.df = pd.read_csv(
filepath_or_buffer=self.TMP_PATH
filepath_or_buffer=self.get_tmp_path()
/ "epa_rsei"
/ "CensusMicroTracts2019_2019_aggregated.csv",
# The following need to remain as strings for all of their digits, not get

View file

@@ -34,12 +34,12 @@ class GeoCorrETL(ExtractTransformLoad):
unzip_file_from_url(
file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/geocorr_urban_rural.csv.zip",
download_path=self.TMP_PATH,
unzipped_file_path=self.TMP_PATH / "geocorr",
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "geocorr",
)
self.df = pd.read_csv(
filepath_or_buffer=self.TMP_PATH
filepath_or_buffer=self.get_tmp_path()
/ "geocorr"
/ "geocorr_urban_rural.csv",
dtype={

View file

@@ -21,14 +21,16 @@ class HousingTransportationETL(ExtractTransformLoad):
def extract(self) -> None:
# Download each state / territory individually
dfs = []
zip_file_dir = self.TMP_PATH / "housing_and_transportation_index"
zip_file_dir = self.get_tmp_path() / "housing_and_transportation_index"
for fips in get_state_fips_codes(self.DATA_PATH):
logger.info(
f"Downloading housing data for state/territory with FIPS code {fips}"
)
unzip_file_from_url(
f"{self.HOUSING_FTP_URL}{fips}", self.TMP_PATH, zip_file_dir
f"{self.HOUSING_FTP_URL}{fips}",
self.get_tmp_path(),
zip_file_dir,
)
# New file name:

View file

@@ -10,7 +10,7 @@ class HudHousingETL(ExtractTransformLoad):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "hud_housing"
self.GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
self.HOUSING_FTP_URL = "https://www.huduser.gov/portal/datasets/cp/2014thru2018-140-csv.zip"
self.HOUSING_ZIP_FILE_DIR = self.TMP_PATH / "hud_housing"
self.HOUSING_ZIP_FILE_DIR = self.get_tmp_path() / "hud_housing"
# We measure households earning less than 80% of HUD Area Median Family Income by county
# and paying greater than 30% of their income to housing costs.

View file

@@ -12,7 +12,7 @@ class HudRecapETL(ExtractTransformLoad):
# pylint: disable=line-too-long
self.HUD_RECAP_CSV_URL = "https://opendata.arcgis.com/api/v3/datasets/56de4edea8264fe5a344da9811ef5d6e_0/downloads/data?format=csv&spatialRefId=4326" # noqa: E501
self.HUD_RECAP_CSV = (
self.TMP_PATH
self.get_tmp_path()
/ "Racially_or_Ethnically_Concentrated_Areas_of_Poverty__R_ECAPs_.csv"
)
self.CSV_PATH = self.DATA_PATH / "dataset" / "hud_recap"

View file

@@ -19,8 +19,8 @@ class MappingForEJETL(ExtractTransformLoad):
self.MAPPING_FOR_EJ_CO_URL = (
settings.AWS_JUSTICE40_DATASOURCES_URL + "/CO_mej.zip"
)
self.VA_SHP_FILE_PATH = self.TMP_PATH / "mej_virginia_7_1.shp"
self.CO_SHP_FILE_PATH = self.TMP_PATH / "mej_colorado_final.shp"
self.VA_SHP_FILE_PATH = self.get_tmp_path() / "mej_virginia_7_1.shp"
self.CO_SHP_FILE_PATH = self.get_tmp_path() / "mej_colorado_final.shp"
# Defining variables
self.COLUMNS_TO_KEEP = [
@@ -43,11 +43,11 @@ class MappingForEJETL(ExtractTransformLoad):
logger.info("Downloading Mapping for EJ Data")
super().extract(
self.MAPPING_FOR_EJ_VA_URL,
self.TMP_PATH,
self.get_tmp_path(),
)
super().extract(
self.MAPPING_FOR_EJ_CO_URL,
self.TMP_PATH,
self.get_tmp_path(),
)
def transform(self) -> None:

View file

@@ -25,7 +25,9 @@ class MappingInequalityETL(ExtractTransformLoad):
"https://raw.githubusercontent.com/americanpanorama/Census_HOLC_Research/"
"main/2010_Census_Tracts/holc_tract_lookup.csv"
)
self.MAPPING_INEQUALITY_CSV = self.TMP_PATH / "holc_tract_lookup.csv"
self.MAPPING_INEQUALITY_CSV = (
self.get_tmp_path() / "holc_tract_lookup.csv"
)
self.CSV_PATH = self.DATA_PATH / "dataset" / "mapping_inequality"
self.HOLC_MANUAL_MAPPING_CSV_PATH = (

View file

@@ -21,7 +21,7 @@ class MarylandEJScreenETL(ExtractTransformLoad):
settings.AWS_JUSTICE40_DATASOURCES_URL + "/MD_EJScreen.zip"
)
self.SHAPE_FILES_PATH = self.TMP_PATH / "mdejscreen"
self.SHAPE_FILES_PATH = self.get_tmp_path() / "mdejscreen"
self.OUTPUT_CSV_PATH = self.DATA_PATH / "dataset" / "maryland_ejscreen"
self.COLUMNS_TO_KEEP = [
@@ -36,7 +36,7 @@ class MarylandEJScreenETL(ExtractTransformLoad):
logger.info("Downloading 207MB Maryland EJSCREEN Data")
super().extract(
self.MARYLAND_EJSCREEN_URL,
self.TMP_PATH,
self.get_tmp_path(),
)
def transform(self) -> None:

View file

@@ -20,7 +20,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
def __init__(self):
self.INPUT_CSV = self.TMP_PATH / "NRI_Table_CensusTracts.csv"
self.INPUT_CSV = self.get_tmp_path() / "NRI_Table_CensusTracts.csv"
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
"EAL_SCORE"
@@ -68,7 +68,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
logger.info("Downloading 405MB National Risk Index Data")
super().extract(
source_url=self.SOURCE_URL,
extract_path=self.TMP_PATH,
extract_path=self.get_tmp_path(),
)
def transform(self) -> None:

View file

@@ -75,12 +75,12 @@ class PersistentPovertyETL(ExtractTransformLoad):
def extract(self) -> None:
logger.info("Starting to download 86MB persistent poverty file.")
unzipped_file_path = self.TMP_PATH / "persistent_poverty"
unzipped_file_path = self.get_tmp_path() / "persistent_poverty"
unzip_file_from_url(
file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/LTDB_Std_All_Sample.zip",
download_path=self.TMP_PATH,
download_path=self.get_tmp_path(),
unzipped_file_path=unzipped_file_path,
)
@@ -93,7 +93,6 @@ class PersistentPovertyETL(ExtractTransformLoad):
temporary_input_dfs = []
for file_name in file_names:
print(file_name)
temporary_input_df = pd.read_csv(
filepath_or_buffer=unzipped_file_path
/ f"ltdb_std_all_sample/{file_name}",

View file

@@ -21,7 +21,7 @@ class TreeEquityScoreETL(ExtractTransformLoad):
def __init__(self):
self.TES_URL = "https://national-tes-data-share.s3.amazonaws.com/national_tes_share/"
self.TES_CSV = self.TMP_PATH / "tes_2021_data.csv"
self.TES_CSV = self.get_tmp_path() / "tes_2021_data.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "tree_equity_score"
self.df: gpd.GeoDataFrame
self.states = [
@@ -81,7 +81,7 @@ class TreeEquityScoreETL(ExtractTransformLoad):
for state in self.states:
super().extract(
f"{self.TES_URL}{state}.zip.zip",
f"{self.TMP_PATH}/{state}",
f"{self.get_tmp_path()}/{state}",
)
def transform(self) -> None:
@@ -89,7 +89,7 @@ class TreeEquityScoreETL(ExtractTransformLoad):
tes_state_dfs = []
for state in self.states:
tes_state_dfs.append(
gpd.read_file(f"{self.TMP_PATH}/{state}/{state}.shp")
gpd.read_file(f"{self.get_tmp_path()}/{state}/{state}.shp")
)
self.df = gpd.GeoDataFrame(
pd.concat(tes_state_dfs), crs=tes_state_dfs[0].crs