Adding ETL script

This commit is contained in:
VincentLaUSDS 2021-09-22 14:23:20 -04:00
parent c3fb80fbdc
commit 0d7a9161a8

View file

@ -8,31 +8,34 @@ logger = get_module_logger(__name__)
class EJScreenAreasOfConcernETL(ExtractTransformLoad): class EJScreenAreasOfConcernETL(ExtractTransformLoad):
def __init__(self): def __init__(self):
# self.EJSCREEN_FTP_URL = "https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip" self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "ejscreen_areas_of_concern"
# self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2019_StatePctiles.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019" # TO DO: Load from actual source; the issue is that this dataset is not public for now
self.LOCAL_CSV_PATH = self.DATA_PATH / "local"
self.GEOID_CBG_FIELD_NAME = "GEOID10_CBG"
self.df: pd.DataFrame self.df: pd.DataFrame
def extract(self) -> None: def extract(self) -> None:
logger.info("Downloading EJScreen Data") logger.info("Loading EJScreen Areas of Concern Data Locally")
super().extract(
self.EJSCREEN_FTP_URL,
self.TMP_PATH,
verify=False, # EPA EJScreen end point has certificate issues often
)
def transform(self) -> None:
logger.info("Transforming EJScreen Data")
self.df = pd.read_csv( self.df = pd.read_csv(
self.EJSCREEN_CSV, filepath_or_buffer=self.LOCAL_CSV_PATH
dtype={"ID": "string"}, / "ejscreen_areas_of_concerns_indicators.csv",
# EJSCREEN writes the word "None" for NA data. dtype={
na_values=["None"], self.GEOID_CBG_FIELD_NAME: "string",
},
low_memory=False, low_memory=False,
) )
def transform(self) -> None:
logger.info("Transforming EJScreen Areas of Concern Data")
# TO DO: As a one off we did all the processing in a separate Notebook
# Can add here later for a future PR
pass
def load(self) -> None: def load(self) -> None:
logger.info("Saving EJScreen CSV") logger.info("Saving EJScreen Areas of Concern Data")
# write nationwide csv # write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True) self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.CSV_PATH / "usa.csv", index=False) self.df.to_csv(self.OUTPUT_PATH / "usa.csv", index=False)