checkpoint

This commit is contained in:
Jorge Escobar 2022-04-07 18:55:44 -04:00
parent d3a54e4820
commit 0e1e15eeaa
3 changed files with 41 additions and 4 deletions

View file

@ -1,12 +1,15 @@
import enum import enum
import pathlib import pathlib
import sys
import typing import typing
from typing import Optional from typing import Optional
import pandas as pd import pandas as pd
from data_pipeline.config import settings from data_pipeline.config import settings
from data_pipeline.etl.score.schemas.datasets import DatasetsConfig
from data_pipeline.utils import ( from data_pipeline.utils import (
load_yaml_dict_from_file,
unzip_file_from_url, unzip_file_from_url,
remove_all_from_dir, remove_all_from_dir,
get_module_logger, get_module_logger,
@ -79,6 +82,34 @@ class ExtractTransformLoad:
output_df: pd.DataFrame = None output_df: pd.DataFrame = None
def yaml_config_load(self):
# check if the class instance has score YAML definitions
datasets_config = load_yaml_dict_from_file(
self.APP_ROOT / "etl" / "score" / "config" / "datasets.yml",
DatasetsConfig,
)
# get the config for this dataset
try:
dataset_config = next(
item
for item in datasets_config.get("datasets")
if item["module_name"] == "self.NAME"
)
except StopIteration:
# Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
logger.error(
f"Exception encountered while extracting dataset config for dataset {self.NAME}"
)
sys.exit()
# set the fields
self.LAST_UPDATED_YEAR = dataset_config["last_updated_year"]
self.SOURCE_URL = dataset_config["source_url"]
self.INPUT_CSV = (
self.get_tmp_path() / dataset_config["extracted_file_name"]
)
# This is a classmethod so it can be used by `get_data_frame` without # This is a classmethod so it can be used by `get_data_frame` without
# needing to create an instance of the class. This is a use case in `etl_score`. # needing to create an instance of the class. This is a use case in `etl_score`.
@classmethod @classmethod

View file

@ -15,8 +15,6 @@ class NationalRiskIndexETL(ExtractTransformLoad):
"""ETL class for the FEMA National Risk Index dataset""" """ETL class for the FEMA National Risk Index dataset"""
NAME = "national_risk_index" NAME = "national_risk_index"
LAST_UPDATED_YEAR = 2020
SOURCE_URL = "https://hazards.fema.gov/nri/Content/StaticDocuments/DataDownload//NRI_Table_CensusTracts/NRI_Table_CensusTracts.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
## TEMPORARILY HERE ## TEMPORARILY HERE
@ -27,7 +25,8 @@ class NationalRiskIndexETL(ExtractTransformLoad):
AGRIVALUE_LOWER_BOUND = 408000 AGRIVALUE_LOWER_BOUND = 408000
def __init__(self): def __init__(self):
self.INPUT_CSV = self.get_tmp_path() / "NRI_Table_CensusTracts.csv" # load YAML config
super().yaml_config_load()
self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = ( self.RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_INPUT_FIELD_NAME = (
"EAL_SCORE" "EAL_SCORE"

View file

@ -6,8 +6,10 @@ import os
import sys import sys
import shutil import shutil
import uuid import uuid
from xml.dom import ValidationErr
import zipfile import zipfile
from pathlib import Path from pathlib import Path
from marshmallow import ValidationError
import urllib3 import urllib3
import requests import requests
import yaml import yaml
@ -350,7 +352,12 @@ def load_yaml_dict_from_file(
# validate YAML # validate YAML
yaml_config_schema = class_schema(schema_class) yaml_config_schema = class_schema(schema_class)
yaml_config_schema().load(yaml_dict)
try:
yaml_config_schema().load(yaml_dict)
except ValidationError as e:
logger.error(f"Invalid YAML config file {yaml_file_path}")
logger.error(e.normalized_messages())
return yaml_dict return yaml_dict