j40-cejst-2/data/data-pipeline/data_pipeline/etl/base.py
Lucas Merrill Brown 1d101c93d2
Issue 844: Add island areas to Definition L (#957)
This ended up being a pretty large task. Here's what this PR does:

1. Pulls in Vincent's data from island areas into the score ETL. This is from the 2010 decennial census, the last census of any kind in the island areas.
2. Grabs a few new fields from 2010 island areas decennial census.
3. Calculates area median income for island areas.
4. Stops using EJSCREEN as the source of our high school education data and directly pulls that from census (this was related to this project so I went ahead and fixed it).
5. Grabs a bunch of data from the 2010 ACS in the states/Puerto Rico/DC, so that we can create percentiles comparing apples-to-apples (ish) from 2010 island areas decennial census data to 2010 ACS data. This required creating a new class because all the ACS fields are different between 2010 and 2019, so it wasn't as simple as looping over a year parameter.
6. Creates a combined population field of island areas and mainland so we can use those stats in our comparison tool, and updates the comparison tool accordingly.
2021-12-03 15:46:10 -05:00

178 lines
6.6 KiB
Python

from pathlib import Path
from typing import Optional
import pandas as pd
import yaml
from data_pipeline.config import settings
from data_pipeline.utils import (
unzip_file_from_url,
remove_all_from_dir,
get_module_logger,
)
logger = get_module_logger(__name__)
class ExtractTransformLoad:
"""
A class used to instantiate an ETL object to retrieve and process data from
datasets.
Attributes:
DATA_PATH (pathlib.Path): Local path where all data will be stored
TMP_PATH (pathlib.Path): Local path where temporary data will be stored
GEOID_FIELD_NAME (str): The common column name for a Census Block Group identifier
GEOID_TRACT_FIELD_NAME (str): The common column name for a Census Tract identifier
"""
APP_ROOT: Path = settings.APP_ROOT
DATA_PATH: Path = APP_ROOT / "data"
TMP_PATH: Path = DATA_PATH / "tmp"
FILES_PATH: Path = settings.APP_ROOT / "files"
GEOID_FIELD_NAME: str = "GEOID10"
GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
# TODO: investigate. Census says there are only 217,740 CBGs in the US. This might
# be from CBGs at different time periods.
EXPECTED_MAX_CENSUS_BLOCK_GROUPS: int = 250000
# TODO: investigate. Census says there are only 74,134 tracts in the US,
# Puerto Rico, and island areas. This might be from tracts at different time
# periods. https://github.com/usds/justice40-tool/issues/964
EXPECTED_MAX_CENSUS_TRACTS: int = 74160
def __init__(self, config_path: Path) -> None:
"""Inits the class with instance specific variables"""
# set by _get_yaml_config()
self.NAME: str = None
self.SOURCE_URL: str = None
self.GEOID_COL: str = None
self.GEO_LEVEL: str = None
self.SCORE_COLS: list = None
self.FIPS_CODES: pd.DataFrame = None
self.OUTPUT_PATH: Path = None
self.CENSUS_CSV: Path = None
self._get_yaml_config(config_path)
def _get_yaml_config(self, config_path: Path) -> None:
"""Reads the YAML configuration file for the dataset and stores
the properies in the instance (upcoming feature)"""
# parse the yaml config file
try:
with open(config_path, "r", encoding="utf-8") as file:
config = yaml.safe_load(file)
except (FileNotFoundError, yaml.YAMLError) as err:
raise err
# set dataset specific attributes
census_dir = self.DATA_PATH / "census" / "csv"
if config["is_census"]:
csv_dir = census_dir
else:
self.CENSUS_CSV = census_dir / "us.csv"
self.FIPS_CODES = self._get_census_fips_codes()
csv_dir = self.DATA_PATH / "dataset"
# parse name and set output path
name = config.get("name")
snake_name = name.replace(" ", "_").lower() # converts to snake case
output_dir = snake_name + (config.get("year") or "")
self.OUTPUT_PATH = csv_dir / output_dir / "usa.csv"
self.OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
# set class attributes
attrs = ["NAME", "SOURCE_URL", "GEOID_COL", "GEO_LEVEL", "SCORE_COLS"]
for attr in attrs:
setattr(self, attr, config[attr.lower()])
def check_ttl(self) -> None:
"""Checks if the ETL process can be run based on a the TLL value on the
YAML config (upcoming feature)"""
pass
def extract(
self,
source_url: str = None,
extract_path: Path = None,
verify: Optional[bool] = True,
) -> None:
"""Extract the data from a remote source. By default it provides code
to get the file from a source url, unzips it and stores it on an
extract_path."""
# this can be accessed via super().extract()
if source_url and extract_path:
unzip_file_from_url(
source_url, self.TMP_PATH, extract_path, verify=verify
)
def transform(self) -> None:
"""Transform the data extracted into a format that can be consumed by the
score generator"""
raise NotImplementedError
def load(self) -> None:
"""Saves the transformed data in the specified local data folder or remote AWS S3
bucket"""
raise NotImplementedError
def cleanup(self) -> None:
"""Clears out any files stored in the TMP folder"""
remove_all_from_dir(self.TMP_PATH)
# TODO: Add test for this
def _get_census_fips_codes(self) -> pd.DataFrame:
"""Loads FIPS codes for each Census block group and tract"""
# check that the census data exists
if not self.CENSUS_CSV.exists():
logger.info("Census data not found, please run download_csv first")
# load the census data
df = pd.read_csv(
self.CENSUS_CSV, dtype={self.GEOID_FIELD_NAME: "string"}
)
# extract Census tract FIPS code from Census block group
df[self.GEOID_TRACT_FIELD_NAME] = df[self.GEOID_FIELD_NAME].str[0:11]
return df[[self.GEOID_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME]]
# TODO: Create tests
def validate_output(self) -> None:
"""Checks that the output of the ETL process adheres to the contract
expected by the score module
Contract conditions:
- Output is saved as usa.csv at the path specified by self.OUTPUT_PATH
- The output csv has a column named GEOID10 which stores each of the
Census block group FIPS codes in data/census/csv/usa.csv
- The output csv has a column named GEOID10_TRACT which stores each of
Census tract FIPS codes associated with each Census block group
- The output csv has each of the columns expected by the score and the
name and dtype of those columns match the format expected by score
"""
# read in output file
# and check that GEOID cols are present
assert self.OUTPUT_PATH.exists(), f"No file found at {self.OUTPUT_PATH}"
df_output = pd.read_csv(
self.OUTPUT_PATH,
dtype={
self.GEOID_FIELD_NAME: "string",
self.GEOID_TRACT_FIELD_NAME: "string",
},
)
# check that the GEOID cols in the output match census data
geoid_cols = [self.GEOID_FIELD_NAME, self.GEOID_TRACT_FIELD_NAME]
for col in geoid_cols:
assert col in self.FIPS_CODES.columns
assert self.FIPS_CODES.equals(df_output[geoid_cols])
# check that the score columns are in the output
for col in self.SCORE_COLS:
assert col in df_output.columns, f"{col} is missing from output"