diff --git a/data/data-pipeline/data_pipeline/etl/base.py b/data/data-pipeline/data_pipeline/etl/base.py
index 58478661..be1dd86a 100644
--- a/data/data-pipeline/data_pipeline/etl/base.py
+++ b/data/data-pipeline/data_pipeline/etl/base.py
@@ -98,6 +98,17 @@ class ExtractTransformLoad:
         )
         return output_file_path
 
+    def get_tmp_path(self) -> pathlib.Path:
+        """Returns the temporary path associated with this ETL class."""
+        # Note: the temporary path is resolved at runtime rather than at class
+        # definition, because it uses the class of the instance, which is often a child class.
+        tmp_path = self.DATA_PATH / "tmp" / str(self.__class__.__name__)
+
+        # Create directory if it doesn't exist
+        tmp_path.mkdir(parents=True, exist_ok=True)
+
+        return tmp_path
+
     def extract(
         self,
         source_url: str = None,
@@ -112,7 +123,7 @@ class ExtractTransformLoad:
         if source_url and extract_path:
             unzip_file_from_url(
                 file_url=source_url,
-                download_path=self.TMP_PATH,
+                download_path=self.get_tmp_path(),
                 unzipped_file_path=extract_path,
                 verify=verify,
             )
@@ -265,4 +276,4 @@ class ExtractTransformLoad:
 
     def cleanup(self) -> None:
         """Clears out any files stored in the TMP folder"""
-        remove_all_from_dir(self.TMP_PATH)
+        remove_all_from_dir(self.get_tmp_path())
diff --git a/data/data-pipeline/data_pipeline/etl/constants.py b/data/data-pipeline/data_pipeline/etl/constants.py
index 9ddd1210..1dc6697c 100644
--- a/data/data-pipeline/data_pipeline/etl/constants.py
+++ b/data/data-pipeline/data_pipeline/etl/constants.py
@@ -1,8 +1,18 @@
 DATASET_LIST = [
     {
-        "name": "mapping_for_ej",
-        "module_dir": "mapping_for_ej",
-        "class_name": "MappingForEJETL",
+        "name": "cdc_places",
+        "module_dir": "cdc_places",
+        "class_name": "CDCPlacesETL",
+    },
+    {
+        "name": "national_risk_index",
+        "module_dir": "national_risk_index",
+        "class_name": "NationalRiskIndexETL",
+    },
+    {
+        "name": "tree_equity_score",
+        "module_dir": "tree_equity_score",
+        "class_name": "TreeEquityScoreETL",
     },
     {
         "name": "census_acs",
@@ -14,6 +24,21 @@ DATASET_LIST = [
         "module_dir": "census_acs_2010",
         "class_name": "CensusACS2010ETL",
     },
+    {
+        "name": "census_decennial",
+        "module_dir": "census_decennial",
+        "class_name": "CensusDecennialETL",
+    },
+    {
+        "name": "housing_and_transportation",
+        "module_dir": "housing_and_transportation",
+        "class_name": "HousingTransportationETL",
+    },
+    {
+        "name": "mapping_for_ej",
+        "module_dir": "mapping_for_ej",
+        "class_name": "MappingForEJETL",
+    },
     {
         "name": "ejscreen",
         "module_dir": "ejscreen",
@@ -24,16 +49,6 @@ DATASET_LIST = [
         "module_dir": "hud_housing",
         "class_name": "HudHousingETL",
     },
-    {
-        "name": "cdc_places",
-        "module_dir": "cdc_places",
-        "class_name": "CDCPlacesETL",
-    },
-    {
-        "name": "national_risk_index",
-        "module_dir": "national_risk_index",
-        "class_name": "NationalRiskIndexETL",
-    },
     {
         "name": "census_acs_median_income",
         "module_dir": "census_acs_median_income",
@@ -74,16 +89,6 @@ DATASET_LIST = [
         "module_dir": "ejscreen_areas_of_concern",
         "class_name": "EJSCREENAreasOfConcernETL",
     },
-    {
-        "name": "census_decennial",
-        "module_dir": "census_decennial",
-        "class_name": "CensusDecennialETL",
-    },
-    {
-        "name": "housing_and_transportation",
-        "module_dir": "housing_and_transportation",
-        "class_name": "HousingTransportationETL",
-    },
     {
         "name": "calenviroscreen",
         "module_dir": "calenviroscreen",
@@ -104,11 +109,6 @@ DATASET_LIST = [
         "module_dir": "energy_definition_alternative_draft",
         "class_name": "EnergyDefinitionAlternativeDraft",
     },
-    {
-        "name": "tree_equity_score",
-        "module_dir": "tree_equity_score",
-        "class_name": "TreeEquityScoreETL",
-    },
     {
         "name": "michigan_ejscreen",
         "module_dir": "michigan_ejscreen",
diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py
index 223dfb18..40710142 100644
--- a/data/data-pipeline/data_pipeline/etl/runner.py
+++ b/data/data-pipeline/data_pipeline/etl/runner.py
@@ -1,13 +1,18 @@
 import importlib
+import concurrent.futures
+import typing
 
 from data_pipeline.etl.score.etl_score import ScoreETL
 from data_pipeline.etl.score.etl_score_geo import GeoScoreETL
 from data_pipeline.etl.score.etl_score_post import PostScoreETL
+from data_pipeline.utils import get_module_logger
 
 from . import constants
 
+logger = get_module_logger(__name__)
 
-def get_datasets_to_run(dataset_to_run: str):
+
+def _get_datasets_to_run(dataset_to_run: str) -> typing.List[dict]:
     """Returns a list of appropriate datasets to run given input args
 
     Args:
@@ -29,9 +34,36 @@ def get_datasets_to_run(dataset_to_run: str):
         else:
             # reset the list to just the dataset
             dataset_list = [dataset_element]
+
     return dataset_list
 
 
+def _run_one_dataset(dataset: dict) -> None:
+    """Runs one ETL process."""
+    etl_module = importlib.import_module(
+        f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
+    )
+    etl_class = getattr(etl_module, dataset["class_name"])
+    etl_instance = etl_class()
+
+    # run extract
+    etl_instance.extract()
+
+    # run transform
+    etl_instance.transform()
+
+    # run load
+    etl_instance.load()
+
+    # run validate
+    etl_instance.validate()
+
+    # cleanup
+    etl_instance.cleanup()
+
+    logger.info(f"Finished `etl-run` for dataset `{dataset['name']}`.")
+
+
 def etl_runner(dataset_to_run: str = None) -> None:
     """Runs all etl processes or a specific one
 
@@ -41,33 +73,18 @@ def etl_runner(dataset_to_run: str = None) -> None:
     Returns:
         None
     """
-    dataset_list = get_datasets_to_run(dataset_to_run)
+    dataset_list = _get_datasets_to_run(dataset_to_run)
 
-    # Run the ETLs for the dataset_list
-    for dataset in dataset_list:
-        etl_module = importlib.import_module(
-            f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
-        )
-        etl_class = getattr(etl_module, dataset["class_name"])
-        etl_instance = etl_class()
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = {
+            executor.submit(_run_one_dataset, dataset=dataset)
+            for dataset in dataset_list
+        }
 
-        # run extract
-        etl_instance.extract()
-
-        # run transform
-        etl_instance.transform()
-
-        # run load
-        etl_instance.load()
-
-        # run validate
-        etl_instance.validate()
-
-        # cleanup
-        etl_instance.cleanup()
-
-    # update the front end JSON/CSV of list of data sources
-    pass
+        for fut in concurrent.futures.as_completed(futures):
+            # Calling result() will raise any exception that occurred in the worker.
+            # Without this call, exceptions raised in threads would be silently ignored.
+            fut.result()
 
 
 def score_generate() -> None:
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
index 4ac543c3..b2740b39 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
@@ -1,4 +1,6 @@
+import concurrent.futures
 import math
+
 import pandas as pd
 import geopandas as gpd
 
@@ -204,14 +206,28 @@ class GeoScoreETL(ExtractTransformLoad):
         return compressed
 
     def load(self) -> None:
-        logger.info("Writing usa-high (~9 minutes)")
-        self.geojson_score_usa_high.to_file(
-            self.SCORE_HIGH_GEOJSON, driver="GeoJSON"
-        )
-        logger.info("Completed writing usa-high")
+        # Create separate threads to run each write to disk.
+        def write_high_to_file():
+            logger.info("Writing usa-high (~9 minutes)")
+            self.geojson_score_usa_high.to_file(
+                filename=self.SCORE_HIGH_GEOJSON, driver="GeoJSON"
+            )
+            logger.info("Completed writing usa-high")
 
-        logger.info("Writing usa-low (~9 minutes)")
-        self.geojson_score_usa_low.to_file(
-            self.SCORE_LOW_GEOJSON, driver="GeoJSON"
-        )
-        logger.info("Completed writing usa-low")
+        def write_low_to_file():
+            logger.info("Writing usa-low (~9 minutes)")
+            self.geojson_score_usa_low.to_file(
+                filename=self.SCORE_LOW_GEOJSON, driver="GeoJSON"
+            )
+            logger.info("Completed writing usa-low")
+
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = {
+                executor.submit(task)
+                for task in [write_high_to_file, write_low_to_file]
+            }
+
+            for fut in concurrent.futures.as_completed(futures):
+                # Calling result() will raise any exception that occurred in the worker.
+                # Without this call, exceptions raised in threads would be silently ignored.
+                fut.result()
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_utils.py b/data/data-pipeline/data_pipeline/etl/score/etl_utils.py
index 2219ab18..d896f3e0 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_utils.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_utils.py
@@ -48,7 +48,7 @@ def check_score_data_source(
     # check if score data is found locally
     if not os.path.isfile(TILE_SCORE_CSV):
         logger.info(
-            "No local score tiles data found. Please use '-d aws` to fetch from AWS"
+            "No local score tiles data found. Please use `-s aws` to fetch from AWS"
         )
         sys.exit()
diff --git a/data/data-pipeline/data_pipeline/etl/sources/calenviroscreen/etl.py b/data/data-pipeline/data_pipeline/etl/sources/calenviroscreen/etl.py
index 57649bbe..55e94372 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/calenviroscreen/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/calenviroscreen/etl.py
@@ -14,7 +14,7 @@ class CalEnviroScreenETL(ExtractTransformLoad):
             + "/CalEnviroScreen_4.0_2021.zip"
         )
         self.CALENVIROSCREEN_CSV = (
-            self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
+            self.get_tmp_path() / "CalEnviroScreen_4.0_2021.csv"
         )
         self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4"
 
@@ -37,7 +37,7 @@ class CalEnviroScreenETL(ExtractTransformLoad):
         logger.info("Downloading CalEnviroScreen Data")
         super().extract(
             self.CALENVIROSCREEN_FTP_URL,
-            self.TMP_PATH,
+            self.get_tmp_path(),
         )
 
     def transform(self) -> None:
diff --git a/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py b/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py
index 5e1f08eb..2aac7412 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/cdc_life_expectancy/etl.py
@@ -29,7 +29,9 @@ class CDCLifeExpectancy(ExtractTransformLoad):
 
     def extract(self) -> None:
         logger.info("Starting data download.")
-        download_file_name = self.TMP_PATH / "cdc_life_expectancy" / "usa.csv"
+        download_file_name = (
+            self.get_tmp_path() / "cdc_life_expectancy" / "usa.csv"
+        )
         download_file_from_url(
             file_url=self.FILE_URL,
             download_file_name=download_file_name,
diff --git a/data/data-pipeline/data_pipeline/etl/sources/cdc_places/etl.py b/data/data-pipeline/data_pipeline/etl/sources/cdc_places/etl.py
index c5e2b236..bc53758d 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/cdc_places/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/cdc_places/etl.py
@@ -22,7 +22,7 @@ class CDCPlacesETL(ExtractTransformLoad):
         logger.info("Starting to download 520MB CDC Places file.")
         file_path = download_file_from_url(
             file_url=self.CDC_PLACES_URL,
-            download_file_name=self.TMP_PATH
+            download_file_name=self.get_tmp_path()
             / "cdc_places"
             / "census_tract.csv",
         )
diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_acs_2010/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census_acs_2010/etl.py
index 1085245d..97b0f16f 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/census_acs_2010/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/census_acs_2010/etl.py
@@ -101,6 +101,7 @@ class CensusACS2010ETL(ExtractTransformLoad):
         self.df: pd.DataFrame
 
     def extract(self) -> None:
+        logger.info("Starting Census 2010 ACS Extract")
         # Define the variables to retrieve
         variables = (
             self.UNEMPLOYED_FIELDS
@@ -118,7 +119,7 @@ class CensusACS2010ETL(ExtractTransformLoad):
         )
 
     def transform(self) -> None:
-        logger.info("Starting Census ACS Transform")
+        logger.info("Starting Census 2010 ACS Transform")
 
         df = self.df
 
@@ -184,7 +185,7 @@ class CensusACS2010ETL(ExtractTransformLoad):
         self.df = output_df
 
     def load(self) -> None:
-        logger.info("Saving Census ACS Data")
+        logger.info("Saving Census 2010 ACS Data")
 
         # mkdir census
         self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py
index 9699f23c..1e354c5c 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/census_acs_median_income/etl.py
@@ -238,12 +238,12 @@ class CensusACSMedianIncomeETL(ExtractTransformLoad):
         unzip_file_from_url(
             file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
             + "/geocorr2014_all_states_tracts_only.csv.zip",
-            download_path=self.TMP_PATH,
-            unzipped_file_path=self.TMP_PATH / "geocorr",
+            download_path=self.get_tmp_path(),
+            unzipped_file_path=self.get_tmp_path() / "geocorr",
         )
 
         self.raw_geocorr_df = pd.read_csv(
-            filepath_or_buffer=self.TMP_PATH
+            filepath_or_buffer=self.get_tmp_path()
             / "geocorr"
             / "geocorr2014_all_states_tracts_only.csv",
             # Skip second row, which has descriptions.
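The heart of this patch is the new `get_tmp_path()` in base.py: instead of every dataset sharing a single `TMP_PATH`, each ETL subclass now resolves its own scratch directory from its class name, created on first use. A minimal standalone sketch of the pattern (the stub base class here is illustrative, not the repo's actual `ExtractTransformLoad`):

```python
# Minimal sketch of the per-class tmp-path pattern from base.py above.
# `DATA_PATH` is a stand-in class attribute; only get_tmp_path mirrors the diff.
import pathlib


class ExtractTransformLoad:
    DATA_PATH = pathlib.Path("data")

    def get_tmp_path(self) -> pathlib.Path:
        # `self.__class__.__name__` resolves to the child class at runtime,
        # so every ETL subclass gets its own scratch directory.
        tmp_path = self.DATA_PATH / "tmp" / str(self.__class__.__name__)
        tmp_path.mkdir(parents=True, exist_ok=True)
        return tmp_path


class CDCPlacesETL(ExtractTransformLoad):
    pass


class EJSCREENETL(ExtractTransformLoad):
    pass


print(CDCPlacesETL().get_tmp_path())  # data/tmp/CDCPlacesETL
print(EJSCREENETL().get_tmp_path())   # data/tmp/EJSCREENETL
```

Because each dataset owns its directory, `cleanup()` can call `remove_all_from_dir(self.get_tmp_path())` without touching files that a sibling dataset is still working with.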
diff --git a/data/data-pipeline/data_pipeline/etl/sources/child_opportunity_index/etl.py b/data/data-pipeline/data_pipeline/etl/sources/child_opportunity_index/etl.py
index 6fbb17e4..eb9de9db 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/child_opportunity_index/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/child_opportunity_index/etl.py
@@ -57,12 +57,12 @@ class ChildOpportunityIndex(ExtractTransformLoad):
 
         unzip_file_from_url(
             file_url=self.COI_FILE_URL,
-            download_path=self.TMP_PATH,
-            unzipped_file_path=self.TMP_PATH / "child_opportunity_index",
+            download_path=self.get_tmp_path(),
+            unzipped_file_path=self.get_tmp_path() / "child_opportunity_index",
         )
 
         self.raw_df = pd.read_csv(
-            filepath_or_buffer=self.TMP_PATH
+            filepath_or_buffer=self.get_tmp_path()
             / "child_opportunity_index"
             / "raw.csv",
             # The following need to remain as strings for all of their digits, not get
diff --git a/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py b/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py
index 03e508a3..80407d39 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/doe_energy_burden/etl.py
@@ -37,12 +37,12 @@ class DOEEnergyBurden(ExtractTransformLoad):
 
         unzip_file_from_url(
             file_url=self.DOE_FILE_URL,
-            download_path=self.TMP_PATH,
-            unzipped_file_path=self.TMP_PATH / "doe_energy_burden",
+            download_path=self.get_tmp_path(),
+            unzipped_file_path=self.get_tmp_path() / "doe_energy_burden",
         )
 
         self.raw_df = pd.read_csv(
-            filepath_or_buffer=self.TMP_PATH
+            filepath_or_buffer=self.get_tmp_path()
             / "doe_energy_burden"
             / "DOE_LEAD_AMI_TRACT_2018_ALL.csv",
             # The following need to remain as strings for all of their digits, not get converted to numbers.
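Several of the readers above pin ID columns with comments like "need to remain as strings for all of their digits." The reason: census GEOIDs can begin with zeros, which a numeric dtype silently drops. A small illustration with hypothetical rows (the column name matches the `GEOID10_TRACT` field used elsewhere in the pipeline):

```python
# Hypothetical data, not from the repo: tract GEOIDs can start with "0",
# which a numeric dtype silently discards.
import io

import pandas as pd

data = "GEOID10_TRACT,value\n01073001100,1\n06037206050,2\n"

numeric = pd.read_csv(io.StringIO(data))
strings = pd.read_csv(io.StringIO(data), dtype={"GEOID10_TRACT": "string"})

print(numeric["GEOID10_TRACT"].iloc[0])  # 1073001100 -- leading zero lost
print(strings["GEOID10_TRACT"].iloc[0])  # 01073001100 -- intact
```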
diff --git a/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py b/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py
index da88ea48..683dbcd2 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/ejscreen/etl.py
@@ -16,7 +16,7 @@ class EJSCREENETL(ExtractTransformLoad):
 
     def __init__(self):
         self.EJSCREEN_FTP_URL = "https://edap-arcgiscloud-data-commons.s3.amazonaws.com/EJSCREEN2020/EJSCREEN_Tract_2020_USPR.csv.zip"
-        self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_Tract_2020_USPR.csv"
+        self.EJSCREEN_CSV = self.get_tmp_path() / "EJSCREEN_Tract_2020_USPR.csv"
         self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019"
         self.df: pd.DataFrame
 
@@ -45,7 +45,7 @@ class EJSCREENETL(ExtractTransformLoad):
         logger.info("Downloading EJScreen Data")
         super().extract(
             self.EJSCREEN_FTP_URL,
-            self.TMP_PATH,
+            self.get_tmp_path(),
             verify=False,  # EPA EJScreen end point has certificate issues often
         )
 
diff --git a/data/data-pipeline/data_pipeline/etl/sources/energy_definition_alternative_draft/etl.py b/data/data-pipeline/data_pipeline/etl/sources/energy_definition_alternative_draft/etl.py
index 4031bfaa..7007fa52 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/energy_definition_alternative_draft/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/energy_definition_alternative_draft/etl.py
@@ -52,13 +52,13 @@ class EnergyDefinitionAlternativeDraft(ExtractTransformLoad):
 
         unzip_file_from_url(
             file_url=self.DEFINITION_ALTERNATIVE_FILE_URL,
-            download_path=self.TMP_PATH,
-            unzipped_file_path=self.TMP_PATH
+            download_path=self.get_tmp_path(),
+            unzipped_file_path=self.get_tmp_path()
             / "energy_definition_alternative_draft",
         )
 
         self.df = pd.read_csv(
-            filepath_or_buffer=self.TMP_PATH
+            filepath_or_buffer=self.get_tmp_path()
             / "energy_definition_alternative_draft"
             / "J40 alternative DAC definition.csv",
             # The following need to remain as strings for all of their digits, not get converted to numbers.
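The extract steps in this patch all lean on `unzip_file_from_url(file_url, download_path, unzipped_file_path)`, whose implementation is not part of the diff. As a reading aid, here is a rough stand-in for what such a helper plausibly does; the signature mirrors the call sites above, but the body is an assumption, not the project's actual code:

```python
# Assumed behavior: download a zip into `download_path`, then extract it
# into `unzipped_file_path`. This is a sketch, not the repo's utility.
import pathlib
import urllib.request
import zipfile


def unzip_file_from_url(
    file_url: str,
    download_path: pathlib.Path,
    unzipped_file_path: pathlib.Path,
    verify: bool = True,  # accepted for parity with the call sites; unused in this sketch
) -> None:
    download_path.mkdir(parents=True, exist_ok=True)
    zip_path = download_path / "downloaded.zip"
    urllib.request.urlretrieve(file_url, str(zip_path))

    unzipped_file_path.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(unzipped_file_path)
```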
diff --git a/data/data-pipeline/data_pipeline/etl/sources/epa_rsei/etl.py b/data/data-pipeline/data_pipeline/etl/sources/epa_rsei/etl.py
index 51ab54bc..abc2165f 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/epa_rsei/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/epa_rsei/etl.py
@@ -71,12 +71,12 @@ class EPARiskScreeningEnvironmentalIndicatorsETL(ExtractTransformLoad):
 
         unzip_file_from_url(
             file_url=self.AGGREGATED_RSEI_SCORE_FILE_URL,
-            download_path=self.TMP_PATH,
-            unzipped_file_path=self.TMP_PATH / "epa_rsei",
+            download_path=self.get_tmp_path(),
+            unzipped_file_path=self.get_tmp_path() / "epa_rsei",
         )
 
         self.df = pd.read_csv(
-            filepath_or_buffer=self.TMP_PATH
+            filepath_or_buffer=self.get_tmp_path()
             / "epa_rsei"
             / "CensusMicroTracts2019_2019_aggregated.csv",
             # The following need to remain as strings for all of their digits, not get
diff --git a/data/data-pipeline/data_pipeline/etl/sources/geocorr/etl.py b/data/data-pipeline/data_pipeline/etl/sources/geocorr/etl.py
index ebdc390e..ed088cae 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/geocorr/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/geocorr/etl.py
@@ -34,12 +34,12 @@ class GeoCorrETL(ExtractTransformLoad):
         unzip_file_from_url(
             file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
             + "/geocorr_urban_rural.csv.zip",
-            download_path=self.TMP_PATH,
-            unzipped_file_path=self.TMP_PATH / "geocorr",
+            download_path=self.get_tmp_path(),
+            unzipped_file_path=self.get_tmp_path() / "geocorr",
         )
 
         self.df = pd.read_csv(
-            filepath_or_buffer=self.TMP_PATH
+            filepath_or_buffer=self.get_tmp_path()
             / "geocorr"
             / "geocorr_urban_rural.csv",
             dtype={
diff --git a/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py b/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py
index 6bd2368e..5a560e26 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/housing_and_transportation/etl.py
@@ -21,14 +21,16 @@ class HousingTransportationETL(ExtractTransformLoad):
     def extract(self) -> None:
         # Download each state / territory individually
         dfs = []
-        zip_file_dir = self.TMP_PATH / "housing_and_transportation_index"
+        zip_file_dir = self.get_tmp_path() / "housing_and_transportation_index"
         for fips in get_state_fips_codes(self.DATA_PATH):
             logger.info(
                 f"Downloading housing data for state/territory with FIPS code {fips}"
             )
             unzip_file_from_url(
-                f"{self.HOUSING_FTP_URL}{fips}", self.TMP_PATH, zip_file_dir
+                f"{self.HOUSING_FTP_URL}{fips}",
+                self.get_tmp_path(),
+                zip_file_dir,
             )
 
             # New file name:
diff --git a/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py b/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
index 37142bb4..6cc5b2c2 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
@@ -10,7 +10,7 @@ class HudHousingETL(ExtractTransformLoad):
         self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "hud_housing"
         self.GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
         self.HOUSING_FTP_URL = "https://www.huduser.gov/portal/datasets/cp/2014thru2018-140-csv.zip"
-        self.HOUSING_ZIP_FILE_DIR = self.TMP_PATH / "hud_housing"
+        self.HOUSING_ZIP_FILE_DIR = self.get_tmp_path() / "hud_housing"
 
         # We measure households earning less than 80% of HUD Area Median Family Income by county
         # and paying greater than 30% of their income to housing costs.
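These per-dataset directories are what make the new threaded runner safe: with a single shared `TMP_PATH`, one dataset's `cleanup()` could delete files another dataset was still extracting. The executor pattern itself, sketched standalone below (not the repo's code), also shows why the `as_completed`/`result()` loop in runner.py matters: without calling `result()`, a worker's exception would be swallowed by the pool.

```python
# Standalone sketch of the ThreadPoolExecutor pattern used in runner.py and
# etl_score_geo.py: submit the work, then call result() on each completed
# future so worker exceptions propagate instead of vanishing.
import concurrent.futures
import time


def run_one(name: str) -> None:
    time.sleep(0.1)  # stand-in for extract/transform/load
    if name == "bad_dataset":
        raise ValueError(f"{name} failed")
    print(f"finished {name}")


datasets = ["census_acs", "ejscreen", "bad_dataset"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(run_one, name) for name in datasets}

    for fut in concurrent.futures.as_completed(futures):
        # Without result(), the ValueError raised above would never surface.
        fut.result()  # re-raises here, failing the run loudly
```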
diff --git a/data/data-pipeline/data_pipeline/etl/sources/hud_recap/etl.py b/data/data-pipeline/data_pipeline/etl/sources/hud_recap/etl.py
index 04fdbf70..c5f6ce63 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/hud_recap/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/hud_recap/etl.py
@@ -12,7 +12,7 @@ class HudRecapETL(ExtractTransformLoad):
         # pylint: disable=line-too-long
         self.HUD_RECAP_CSV_URL = "https://opendata.arcgis.com/api/v3/datasets/56de4edea8264fe5a344da9811ef5d6e_0/downloads/data?format=csv&spatialRefId=4326"  # noqa: E501
         self.HUD_RECAP_CSV = (
-            self.TMP_PATH
+            self.get_tmp_path()
             / "Racially_or_Ethnically_Concentrated_Areas_of_Poverty__R_ECAPs_.csv"
         )
         self.CSV_PATH = self.DATA_PATH / "dataset" / "hud_recap"
diff --git a/data/data-pipeline/data_pipeline/etl/sources/mapping_for_ej/etl.py b/data/data-pipeline/data_pipeline/etl/sources/mapping_for_ej/etl.py
index ff49ea0d..5b0f2a39 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/mapping_for_ej/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/mapping_for_ej/etl.py
@@ -19,8 +19,8 @@ class MappingForEJETL(ExtractTransformLoad):
         self.MAPPING_FOR_EJ_CO_URL = (
             settings.AWS_JUSTICE40_DATASOURCES_URL + "/CO_mej.zip"
         )
-        self.VA_SHP_FILE_PATH = self.TMP_PATH / "mej_virginia_7_1.shp"
-        self.CO_SHP_FILE_PATH = self.TMP_PATH / "mej_colorado_final.shp"
+        self.VA_SHP_FILE_PATH = self.get_tmp_path() / "mej_virginia_7_1.shp"
+        self.CO_SHP_FILE_PATH = self.get_tmp_path() / "mej_colorado_final.shp"
 
         # Defining variables
         self.COLUMNS_TO_KEEP = [
@@ -43,11 +43,11 @@ class MappingForEJETL(ExtractTransformLoad):
         logger.info("Downloading Mapping for EJ Data")
         super().extract(
             self.MAPPING_FOR_EJ_VA_URL,
-            self.TMP_PATH,
+            self.get_tmp_path(),
         )
         super().extract(
             self.MAPPING_FOR_EJ_CO_URL,
-            self.TMP_PATH,
+            self.get_tmp_path(),
         )
 
     def transform(self) -> None:
diff --git a/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py b/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py
index 732ab594..ea2f8152 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/mapping_inequality/etl.py
@@ -25,7 +25,9 @@ class MappingInequalityETL(ExtractTransformLoad):
             "https://raw.githubusercontent.com/americanpanorama/Census_HOLC_Research/"
             "main/2010_Census_Tracts/holc_tract_lookup.csv"
         )
-        self.MAPPING_INEQUALITY_CSV = self.TMP_PATH / "holc_tract_lookup.csv"
+        self.MAPPING_INEQUALITY_CSV = (
+            self.get_tmp_path() / "holc_tract_lookup.csv"
+        )
         self.CSV_PATH = self.DATA_PATH / "dataset" / "mapping_inequality"
 
         self.HOLC_MANUAL_MAPPING_CSV_PATH = (
diff --git a/data/data-pipeline/data_pipeline/etl/sources/maryland_ejscreen/etl.py b/data/data-pipeline/data_pipeline/etl/sources/maryland_ejscreen/etl.py
index 3755b0d1..5f7cff17 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/maryland_ejscreen/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/maryland_ejscreen/etl.py
@@ -21,7 +21,7 @@ class MarylandEJScreenETL(ExtractTransformLoad):
             settings.AWS_JUSTICE40_DATASOURCES_URL + "/MD_EJScreen.zip"
         )
 
-        self.SHAPE_FILES_PATH = self.TMP_PATH / "mdejscreen"
+        self.SHAPE_FILES_PATH = self.get_tmp_path() / "mdejscreen"
         self.OUTPUT_CSV_PATH = self.DATA_PATH / "dataset" / "maryland_ejscreen"
 
         self.COLUMNS_TO_KEEP = [
@@ -36,7 +36,7 @@ class MarylandEJScreenETL(ExtractTransformLoad):
         logger.info("Downloading 207MB Maryland EJSCREEN Data")
         super().extract(
             self.MARYLAND_EJSCREEN_URL,
-            self.TMP_PATH,
+            self.get_tmp_path(),
         )
 
     def transform(self) -> None:
diff --git a/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py b/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
index cbc78ee9..4876974d 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/national_risk_index/etl.py
@@ -20,7 +20,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
     GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
 
     def __init__(self):
-        self.INPUT_CSV = self.TMP_PATH / "NRI_Table_CensusTracts.csv"
+        self.INPUT_CSV = self.get_tmp_path() / "NRI_Table_CensusTracts.csv"
 
         self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
             "EAL_SCORE"
@@ -68,7 +68,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
         logger.info("Downloading 405MB National Risk Index Data")
         super().extract(
             source_url=self.SOURCE_URL,
-            extract_path=self.TMP_PATH,
+            extract_path=self.get_tmp_path(),
         )
 
     def transform(self) -> None:
diff --git a/data/data-pipeline/data_pipeline/etl/sources/persistent_poverty/etl.py b/data/data-pipeline/data_pipeline/etl/sources/persistent_poverty/etl.py
index 2007d504..44d35b4e 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/persistent_poverty/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/persistent_poverty/etl.py
@@ -75,12 +75,12 @@ class PersistentPovertyETL(ExtractTransformLoad):
 
     def extract(self) -> None:
         logger.info("Starting to download 86MB persistent poverty file.")
-        unzipped_file_path = self.TMP_PATH / "persistent_poverty"
+        unzipped_file_path = self.get_tmp_path() / "persistent_poverty"
 
         unzip_file_from_url(
             file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
             + "/LTDB_Std_All_Sample.zip",
-            download_path=self.TMP_PATH,
+            download_path=self.get_tmp_path(),
             unzipped_file_path=unzipped_file_path,
         )
 
@@ -93,7 +93,6 @@ class PersistentPovertyETL(ExtractTransformLoad):
         temporary_input_dfs = []
 
         for file_name in file_names:
-            print(file_name)
             temporary_input_df = pd.read_csv(
                 filepath_or_buffer=unzipped_file_path
                 / f"ltdb_std_all_sample/{file_name}",
diff --git a/data/data-pipeline/data_pipeline/etl/sources/tree_equity_score/etl.py b/data/data-pipeline/data_pipeline/etl/sources/tree_equity_score/etl.py
index ff364f0f..a6b89218 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/tree_equity_score/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/tree_equity_score/etl.py
@@ -21,7 +21,7 @@ class TreeEquityScoreETL(ExtractTransformLoad):
 
     def __init__(self):
         self.TES_URL = "https://national-tes-data-share.s3.amazonaws.com/national_tes_share/"
-        self.TES_CSV = self.TMP_PATH / "tes_2021_data.csv"
+        self.TES_CSV = self.get_tmp_path() / "tes_2021_data.csv"
         self.CSV_PATH = self.DATA_PATH / "dataset" / "tree_equity_score"
         self.df: gpd.GeoDataFrame
         self.states = [
@@ -81,7 +81,7 @@ class TreeEquityScoreETL(ExtractTransformLoad):
         for state in self.states:
             super().extract(
                 f"{self.TES_URL}{state}.zip.zip",
-                f"{self.TMP_PATH}/{state}",
+                f"{self.get_tmp_path()}/{state}",
             )
 
     def transform(self) -> None:
@@ -89,7 +89,7 @@
         tes_state_dfs = []
         for state in self.states:
             tes_state_dfs.append(
-                gpd.read_file(f"{self.TMP_PATH}/{state}/{state}.shp")
+                gpd.read_file(f"{self.get_tmp_path()}/{state}/{state}.shp")
             )
         self.df = gpd.GeoDataFrame(
             pd.concat(tes_state_dfs), crs=tes_state_dfs[0].crs
diff --git a/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb b/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb
index 48f0cc69..ddf05c3e 100644
--- a/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb
+++ b/data/data-pipeline/data_pipeline/ipython/scoring_comparison.ipynb
@@ -102,7 +123,9 @@
     "# Create the state ID by taking the first two digits of the FIPS CODE of the tract.\n",
     "# For more information, see https://www.census.gov/programs-surveys/geography/guidance/geo-identifiers.html.\n",
     "cejst_df.loc[:, GEOID_STATE_FIELD_NAME] = (\n",
-    "    cejst_df.loc[:, ExtractTransformLoad.GEOID_TRACT_FIELD_NAME].astype(str).str[0:2]\n",
+ " cejst_df.loc[:, ExtractTransformLoad.GEOID_TRACT_FIELD_NAME]\n", + " .astype(str)\n", + " .str[0:2]\n", ")\n", "\n", "cejst_df.head()" @@ -174,7 +197,7 @@ "source": [ "# Analyze one field at a time (useful for setting thresholds)\n", "\n", - "quantile = 0.95\n", + "quantile = 0.90\n", "\n", "for field in [\n", " field_names.COLLEGE_ATTENDANCE_FIELD,\n", @@ -207,16 +230,18 @@ "CALENVIROSCREEN_PERCENTILE_FIELD = \"calenviroscreen_percentile\"\n", "CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD = \"calenviroscreen_priority_community\"\n", "\n", - "calenviroscreen_data_path = DATA_DIR / \"dataset\" / \"calenviroscreen4\" / \"data06.csv\"\n", + "calenviroscreen_data_path = (\n", + " DATA_DIR / \"dataset\" / \"calenviroscreen4\" / \"data06.csv\"\n", + ")\n", "calenviroscreen_df = pd.read_csv(\n", " calenviroscreen_data_path,\n", " dtype={ExtractTransformLoad.GEOID_TRACT_FIELD_NAME: \"string\"},\n", ")\n", "\n", "# Convert priority community field to a bool.\n", - "calenviroscreen_df[CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD] = calenviroscreen_df[\n", + "calenviroscreen_df[\n", " CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD\n", - "].astype(bool)\n", + "] = calenviroscreen_df[CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD].astype(bool)\n", "\n", "calenviroscreen_df.head()" ] @@ -245,7 +270,7 @@ { "cell_type": "code", "execution_count": null, - "id": "b1ac2854-80c8-42a8-85e8-84c5684bbe43", + "id": "891b5bfc", "metadata": {}, "outputs": [], "source": [ @@ -271,7 +296,9 @@ "outputs": [], "source": [ "# Load persistent poverty data\n", - "persistent_poverty_path = DATA_DIR / \"dataset\" / \"persistent_poverty\" / \"usa.csv\"\n", + "persistent_poverty_path = (\n", + " DATA_DIR / \"dataset\" / \"persistent_poverty\" / \"usa.csv\"\n", + ")\n", "persistent_poverty_df = pd.read_csv(\n", " persistent_poverty_path,\n", " dtype={ExtractTransformLoad.GEOID_TRACT_FIELD_NAME: \"string\"},\n", @@ -284,7 +311,9 @@ "PERSISTENT_POVERTY_CBG_LEVEL_FIELD = \"Persistent Poverty Census Tract\"\n", "\n", "persistent_poverty_df.rename(\n", - " columns={PERSISTENT_POVERTY_CBG_LEVEL_FIELD: PERSISTENT_POVERTY_TRACT_LEVEL_FIELD},\n", + " columns={\n", + " PERSISTENT_POVERTY_CBG_LEVEL_FIELD: PERSISTENT_POVERTY_TRACT_LEVEL_FIELD\n", + " },\n", " inplace=True,\n", " errors=\"raise\",\n", ")\n", @@ -305,7 +334,9 @@ " field_names.HOLC_GRADE_D_TRACT_50_PERCENT_FIELD,\n", " field_names.HOLC_GRADE_D_TRACT_75_PERCENT_FIELD,\n", "]\n", - "mapping_inequality_path = DATA_DIR / \"dataset\" / \"mapping_inequality\" / \"usa.csv\"\n", + "mapping_inequality_path = (\n", + " DATA_DIR / \"dataset\" / \"mapping_inequality\" / \"usa.csv\"\n", + ")\n", "mapping_inequality_df = pd.read_csv(\n", " mapping_inequality_path,\n", " dtype={ExtractTransformLoad.GEOID_TRACT_FIELD_NAME: \"string\"},\n", @@ -436,7 +467,9 @@ " census_tract_dfs,\n", ")\n", "\n", - "tract_values = merged_df[ExtractTransformLoad.GEOID_TRACT_FIELD_NAME].str.len().unique()\n", + "tract_values = (\n", + " merged_df[ExtractTransformLoad.GEOID_TRACT_FIELD_NAME].str.len().unique()\n", + ")\n", "if any(tract_values != [11]):\n", " print(tract_values)\n", " raise ValueError(\"Some of the census tract data has the wrong length.\")\n", @@ -728,13 +761,13 @@ " summary_dict[\"Geography name\"] = summary_dict[\"Urban vs Rural\"]\n", "\n", " for priority_communities_field in priority_communities_fields:\n", - " summary_dict[f\"{priority_communities_field}{POPULATION_SUFFIX}\"] = frame[\n", + " summary_dict[\n", " f\"{priority_communities_field}{POPULATION_SUFFIX}\"\n", - " ].sum()\n", + " ] = 
frame[f\"{priority_communities_field}{POPULATION_SUFFIX}\"].sum()\n", "\n", - " summary_dict[f\"{priority_communities_field} (total tracts)\"] = frame[\n", - " f\"{priority_communities_field}\"\n", - " ].sum()\n", + " summary_dict[\n", + " f\"{priority_communities_field} (total tracts)\"\n", + " ] = frame[f\"{priority_communities_field}\"].sum()\n", "\n", " # Calculate some combinations of other variables.\n", " summary_dict[f\"{priority_communities_field} (percent tracts)\"] = (\n", @@ -742,7 +775,9 @@ " / total_tracts_in_geography\n", " )\n", "\n", - " summary_dict[f\"{priority_communities_field} (percent population)\"] = (\n", + " summary_dict[\n", + " f\"{priority_communities_field} (percent population)\"\n", + " ] = (\n", " summary_dict[f\"{priority_communities_field}{POPULATION_SUFFIX}\"]\n", " / total_population_in_geography\n", " )\n", @@ -788,7 +823,9 @@ "\n", " # Run the comparison function on the groups.\n", " region_distribution_df = region_grouped_df.progress_apply(\n", - " lambda frame: calculate_state_comparison(frame, geography_field=\"region\")\n", + " lambda frame: calculate_state_comparison(\n", + " frame, geography_field=\"region\"\n", + " )\n", " )\n", "\n", " # Next, run the comparison by division\n", @@ -796,7 +833,9 @@ "\n", " # Run the comparison function on the groups.\n", " division_distribution_df = division_grouped_df.progress_apply(\n", - " lambda frame: calculate_state_comparison(frame, geography_field=\"division\")\n", + " lambda frame: calculate_state_comparison(\n", + " frame, geography_field=\"division\"\n", + " )\n", " )\n", "\n", " # Next, run the comparison by urban/rural\n", @@ -851,7 +890,9 @@ " column_character = get_excel_column_name(column_index)\n", "\n", " # Set all columns to larger width\n", - " worksheet.set_column(f\"{column_character}:{column_character}\", column_width)\n", + " worksheet.set_column(\n", + " f\"{column_character}:{column_character}\", column_width\n", + " )\n", "\n", " # Special formatting for all percent columns\n", " # Note: we can't just search for `percent`, because that's included in the word `percentile`.\n", @@ -866,9 +907,7 @@ "\n", " # Special formatting for columns that capture the percent of population considered priority.\n", " if \"(percent population)\" in column:\n", - " column_ranges = (\n", - " f\"{column_character}2:{column_character}{len(state_distribution_df)+1}\"\n", - " )\n", + " column_ranges = f\"{column_character}2:{column_character}{len(state_distribution_df)+1}\"\n", "\n", " # Add green to red conditional formatting.\n", " worksheet.conditional_format(\n", @@ -894,14 +933,18 @@ " writer.save()\n", "\n", "\n", - "fields_to_analyze = [index.priority_communities_field for index in census_tract_indices]\n", + "fields_to_analyze = [\n", + " index.priority_communities_field for index in census_tract_indices\n", + "]\n", "\n", "# Convert all indices to boolean\n", "for field_to_analyze in fields_to_analyze:\n", " if \"Areas of Concern\" in field_to_analyze:\n", " print(f\"Converting {field_to_analyze} to boolean.\")\n", "\n", - " merged_df[field_to_analyze] = merged_df[field_to_analyze].fillna(value=0)\n", + " merged_df[field_to_analyze] = merged_df[field_to_analyze].fillna(\n", + " value=0\n", + " )\n", " merged_df[field_to_analyze] = merged_df[field_to_analyze].astype(bool)\n", "\n", "\n", @@ -968,10 +1011,14 @@ " column_character = get_excel_column_name(column_index)\n", "\n", " # Set all columns to larger width\n", - " worksheet.set_column(f\"{column_character}:{column_character}\", column_width)\n", 
+ " worksheet.set_column(\n", + " f\"{column_character}:{column_character}\", column_width\n", + " )\n", "\n", " # Add green to red conditional formatting.\n", - " column_ranges = f\"{column_character}2:{column_character}{len(basic_stats_df)+1}\"\n", + " column_ranges = (\n", + " f\"{column_character}2:{column_character}{len(basic_stats_df)+1}\"\n", + " )\n", " worksheet.conditional_format(\n", " column_ranges,\n", " # Min: green, max: red.\n", @@ -984,7 +1031,11 @@ "\n", " # Special formatting for all percent columns\n", " # Note: we can't just search for `percent`, because that's included in the word `percentile`.\n", - " if \"percent \" in column or \"(percent)\" in column or \"Percent \" in column:\n", + " if (\n", + " \"percent \" in column\n", + " or \"(percent)\" in column\n", + " or \"Percent \" in column\n", + " ):\n", " # Make these columns percentages.\n", " percentage_format = workbook.add_format({\"num_format\": \"0%\"})\n", " worksheet.set_column(\n", @@ -1013,9 +1064,15 @@ " temp_df[index.priority_communities_field] == True\n", " )\n", "\n", - " grouped_df = temp_df.groupby(index.priority_communities_field).mean().reset_index()\n", - " result_df = grouped_df[[index.priority_communities_field] + comparison_fields]\n", - " result_df.to_csv(directory / f\"{index.method_name} Basic Stats.csv\", index=False)\n", + " grouped_df = (\n", + " temp_df.groupby(index.priority_communities_field).mean().reset_index()\n", + " )\n", + " result_df = grouped_df[\n", + " [index.priority_communities_field] + comparison_fields\n", + " ]\n", + " result_df.to_csv(\n", + " directory / f\"{index.method_name} Basic Stats.csv\", index=False\n", + " )\n", " write_basic_stats_excel(\n", " basic_stats_df=result_df,\n", " file_path=directory / f\"{index.method_name} Basic Stats.xlsx\",\n", @@ -1064,7 +1121,9 @@ "\n", " # Also add in the count of census tracts.\n", " count_field_name = \"Count of census tracts\"\n", - " comparison_df[count_field_name] = grouped_df.size().to_frame(count_field_name)\n", + " comparison_df[count_field_name] = grouped_df.size().to_frame(\n", + " count_field_name\n", + " )\n", "\n", " comparison_df = comparison_df.reset_index()\n", "\n", @@ -1079,7 +1138,9 @@ "\n", " # Put criteria description column first.\n", " columns_to_put_first = (\n", - " [criteria_description_field_name] + fields_to_group_by + [count_field_name]\n", + " [criteria_description_field_name]\n", + " + fields_to_group_by\n", + " + [count_field_name]\n", " )\n", " new_column_order = columns_to_put_first + [\n", " col for col in comparison_df.columns if col not in columns_to_put_first\n", @@ -1110,7 +1171,9 @@ "\n", " # Convert the dataframe to an XlsxWriter Excel object. 
    "    # index column at the left of the output dataframe.\n",
-   "    census_tracts_score_comparison_df.to_excel(writer, sheet_name=\"Sheet1\", index=False)\n",
+   "    census_tracts_score_comparison_df.to_excel(\n",
+   "        writer, sheet_name=\"Sheet1\", index=False\n",
+   "    )\n",
    "\n",
    "    # Get the xlsxwriter workbook and worksheet objects.\n",
    "    workbook = writer.book\n",
@@ -1132,7 +1195,9 @@
    "        column_character = get_excel_column_name(column_index)\n",
    "\n",
    "        # Set all columns to larger width\n",
-   "        worksheet.set_column(f\"{column_character}:{column_character}\", column_width)\n",
+   "        worksheet.set_column(\n",
+   "            f\"{column_character}:{column_character}\", column_width\n",
+   "        )\n",
    "\n",
    "        # Add green to red conditional formatting.\n",
    "        column_ranges = f\"{column_character}2:{column_character}{len(census_tracts_score_comparison_df)+1}\"\n",
@@ -1148,7 +1213,11 @@
    "\n",
    "        # Special formatting for all percent columns\n",
    "        # Note: we can't just search for `percent`, because that's included in the word `percentile`.\n",
-   "        if \"percent \" in column or \"(percent)\" in column or \"Percent \" in column:\n",
+   "        if (\n",
+   "            \"percent \" in column\n",
+   "            or \"(percent)\" in column\n",
+   "            or \"Percent \" in column\n",
+   "        ):\n",
    "            # Make these columns percentages.\n",
    "            percentage_format = workbook.add_format({\"num_format\": \"0%\"})\n",
    "            worksheet.set_column(\n",
@@ -1164,7 +1233,9 @@
    "    # Overwrite both the value and the format of each header cell\n",
    "    # This is because xlsxwriter / pandas has a known bug where it can't wrap text for a dataframe.\n",
    "    # See https://stackoverflow.com/questions/42562977/xlsxwriter-text-wrap-not-working.\n",
-   "    for col_num, value in enumerate(census_tracts_score_comparison_df.columns.values):\n",
+   "    for col_num, value in enumerate(\n",
+   "        census_tracts_score_comparison_df.columns.values\n",
+   "    ):\n",
    "        worksheet.write(0, col_num, value, header_format)\n",
    "\n",
    "    writer.save()\n",
@@ -1422,7 +1493,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-  "version": "3.9.10"
+  "version": "3.9.6"
  }
 },
 "nbformat": 4,
diff --git a/data/data-pipeline/data_pipeline/tests/sources/example/etl.py b/data/data-pipeline/data_pipeline/tests/sources/example/etl.py
index 8254c926..1f7c5dc5 100644
--- a/data/data-pipeline/data_pipeline/tests/sources/example/etl.py
+++ b/data/data-pipeline/data_pipeline/tests/sources/example/etl.py
@@ -41,12 +41,12 @@ class ExampleETL(ExtractTransformLoad):
 
         logger.info(f"Extracting {zip_file_path}")
         with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
-            zip_ref.extractall(self.TMP_PATH)
+            zip_ref.extractall(self.get_tmp_path())
 
     def transform(self):
-        logger.info(f"Loading file from {self.TMP_PATH / 'input.csv'}.")
+        logger.info(f"Loading file from {self.get_tmp_path() / 'input.csv'}.")
         df: pd.DataFrame = pd.read_csv(
-            self.TMP_PATH / "input.csv",
+            self.get_tmp_path() / "input.csv",
             dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
             low_memory=False,
         )
diff --git a/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py b/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py
index 3a124f2b..ba47b167 100644
--- a/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py
+++ b/data/data-pipeline/data_pipeline/tests/sources/example/test_etl.py
@@ -543,7 +543,7 @@ class TestETL:
             f"Writing data to {self._DATA_DIRECTORY_FOR_TEST / self._INPUT_CSV_FILE_NAME}"
         )
         copy_data_files(
-            src=etl.TMP_PATH / "input.csv",
+            src=etl.get_tmp_path() / "input.csv",
             dst=self._DATA_DIRECTORY_FOR_TEST / self._INPUT_CSV_FILE_NAME,
         )
diff --git a/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py b/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py
index 7b6d3bc9..8f031427 100644
--- a/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py
+++ b/data/data-pipeline/data_pipeline/tests/sources/national_risk_index/test_etl.py
@@ -98,7 +98,9 @@ class TestNationalRiskIndexETL(TestETL):
         # setup
         etl = NationalRiskIndexETL()
         data_path, tmp_path = mock_paths
-        input_csv = tmp_path / "NRI_Table_CensusTracts.csv"
+        input_csv = (
+            tmp_path / "NationalRiskIndexETL" / "NRI_Table_CensusTracts.csv"
+        )
 
         # validation
         assert etl.INPUT_CSV == input_csv
@@ -141,7 +143,9 @@ class TestNationalRiskIndexETL(TestETL):
         )
 
         # Assert that the extracted file exists
-        extracted_file_path = tmp_path / "NRI_Table_CensusTracts.csv"
+        extracted_file_path = (
+            tmp_path / "NationalRiskIndexETL" / "NRI_Table_CensusTracts.csv"
+        )
         assert extracted_file_path.is_file()
 
         input_csv_path = (
diff --git a/data/data-pipeline/data_pipeline/tests/test_etl.py b/data/data-pipeline/data_pipeline/tests/test_etl.py
index 0698ee50..8af3e91c 100644
--- a/data/data-pipeline/data_pipeline/tests/test_etl.py
+++ b/data/data-pipeline/data_pipeline/tests/test_etl.py
@@ -1,9 +1,10 @@
+# pylint: disable=protected-access
 import pytest
 
 from data_pipeline.etl import constants, runner
 
 
 def test_get_datasets_to_run():
-    assert runner.get_datasets_to_run(None) == constants.DATASET_LIST
-    assert runner.get_datasets_to_run("census") == [constants.CENSUS_INFO]
+    assert runner._get_datasets_to_run(None) == constants.DATASET_LIST
+    assert runner._get_datasets_to_run("census") == [constants.CENSUS_INFO]
     with pytest.raises(ValueError):
-        runner.get_datasets_to_run("doesnt_exist")
+        runner._get_datasets_to_run("doesnt_exist")
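The updated National Risk Index tests encode the new on-disk layout: extracted files now live under a subdirectory named after the ETL class. A sketch of a focused unit test for `get_tmp_path` itself, using pytest's built-in `tmp_path` fixture; monkeypatching `DATA_PATH` here is an assumption about the base class, distinct from how the suite's own `mock_paths` fixture works:

```python
from data_pipeline.etl.base import ExtractTransformLoad


def test_get_tmp_path_is_namespaced_by_class(tmp_path, monkeypatch):
    # Assumption: pointing DATA_PATH at a throwaway directory is enough to
    # isolate this test; the real suite wires paths through mock_paths.
    monkeypatch.setattr(ExtractTransformLoad, "DATA_PATH", tmp_path)

    class FakeETL(ExtractTransformLoad):
        pass

    result = FakeETL().get_tmp_path()

    assert result == tmp_path / "tmp" / "FakeETL"
    assert result.is_dir()  # get_tmp_path creates the directory on demand
```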