Mirror of https://github.com/DOI-DO/j40-cejst-2.git, synced 2025-02-23 10:04:18 -08:00
* Add spatial join method (#1871)

  Since we'll need to figure out the tracts for a large number of points in future tickets, add a utility to handle grabbing the tract geometries and adding tract data to a point dataset.

* Add FUDS, also jupyter lab (#1871)
* Add YAML configs for FUDS (#1871)
* Allow input geoid to be optional (#1871)
* Add FUDS ETL, tests, test-data notebook (#1871)

  This adds the ETL class for Formerly Used Defense Sites (FUDS). This is different from most other ETLs since FUDS are not provided by tract but by geographic point, so we need to assign FUDS to tracts and then do calculations from there.

* Floats -> Ints, as I intended (#1871)
* Floats -> Ints, as I intended (#1871)
* Formatting fixes (#1871)
* Add test false positive GEOIDs (#1871)
* Add gdal binaries (#1871)
* Refactor pandas code to be more idiomatic (#1871)

  Per Emma, the more pandas-y way of doing my counts is using np.where to add the values I need, then groupby and size. It is definitely more compact, and also I think more correct!

* Update configs per Emma suggestions (#1871)
* Type fixed! (#1871)
* Remove spurious import from vscode (#1871)
* Snapshot update after changing col name (#1871)
* Move up GDAL (#1871)
* Adjust geojson strategy (#1871)
* Try running census separately first (#1871)
* Fix import order (#1871)
* Cleanup cache strategy (#1871)
* Download census data from S3 instead of re-calculating (#1871)
* Clarify pandas code per Emma (#1871)
344 lines
12 KiB
Python
import enum
import pathlib
import sys
import typing
from typing import Optional

import pandas as pd

from data_pipeline.config import settings
from data_pipeline.etl.score.schemas.datasets import DatasetsConfig
from data_pipeline.utils import (
    load_yaml_dict_from_file,
    unzip_file_from_url,
    remove_all_from_dir,
    get_module_logger,
)

logger = get_module_logger(__name__)


class ValidGeoLevel(enum.Enum):
    """Enum used for indicating output data's geographic resolution."""

    CENSUS_TRACT = enum.auto()
    CENSUS_BLOCK_GROUP = enum.auto()

class ExtractTransformLoad:
    """
    A class used to instantiate an ETL object to retrieve and process data from
    datasets.

    Attributes:
        DATA_PATH (pathlib.Path): Local path where all data will be stored
        TMP_PATH (pathlib.Path): Local path where temporary data will be stored

        TODO: Fill missing attrs here

        GEOID_FIELD_NAME (str): The common column name for a Census Block Group identifier
        GEOID_TRACT_FIELD_NAME (str): The common column name for a Census Tract identifier
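
    Example (a minimal, hypothetical subclass sketch; the name, URL, and logic
    below are illustrative only):

        class MyDatasetETL(ExtractTransformLoad):
            NAME = "my_dataset"  # must match a `module_name` entry in datasets.yml
            GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
            SOURCE_URL = "https://example.com/my_dataset.zip"

            def transform(self) -> None:
                # Build a tract-level dataframe containing every column listed
                # in COLUMNS_TO_KEEP and assign it to self.output_df.
                ...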
    """

    APP_ROOT: pathlib.Path = settings.APP_ROOT

    # Directories
    DATA_PATH: pathlib.Path = APP_ROOT / "data"
    TMP_PATH: pathlib.Path = DATA_PATH / "tmp"
    CONTENT_CONFIG: pathlib.Path = APP_ROOT / "content" / "config"
    DATASET_CONFIG_PATH: pathlib.Path = APP_ROOT / "etl" / "score" / "config"
    DATASET_CONFIG: Optional[dict] = None

    # Parameters
    GEOID_FIELD_NAME: str = "GEOID10"
    GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"

    # Parameters that will be changed by children of the class
    # NAME is used to create the output path and populate logger info.
    NAME: str = None

    # LAST_UPDATED_YEAR is used to create the output path.
    LAST_UPDATED_YEAR: int = None

    # SOURCE_URL is used to extract source data in extract().
    SOURCE_URL: str = None

    # INPUT_EXTRACTED_FILE_NAME is the name of the file after extract().
    INPUT_EXTRACTED_FILE_NAME: str = None

    # GEO_LEVEL is used to identify whether output data is at the unit of the tract or
    # census block group.
    # TODO: add tests that enforce seeing the expected geographic identifier field
    # in the output file based on this geography level.
    GEO_LEVEL: ValidGeoLevel = None

    # COLUMNS_TO_KEEP is used to identify which columns to keep in the output df.
    COLUMNS_TO_KEEP: typing.List[str] = None

    # INPUT_GEOID_TRACT_FIELD_NAME is the field name that identifies the Census Tract ID
    # on the input file.
    INPUT_GEOID_TRACT_FIELD_NAME: str = None

    # NULL_REPRESENTATION is how nulls are represented on the input field.
    NULL_REPRESENTATION: str = None

    # There are thirteen digits in a census block group ID.
    EXPECTED_CENSUS_BLOCK_GROUPS_CHARACTER_LENGTH: int = 13
    # TODO: investigate. Census says there are only 217,740 CBGs in the US. This might
    # be from CBGs at different time periods.
    EXPECTED_MAX_CENSUS_BLOCK_GROUPS: int = 250000

    # There should be eleven digits in a census tract ID.
    EXPECTED_CENSUS_TRACTS_CHARACTER_LENGTH: int = 11
    # TODO: investigate. Census says there are only 74,134 tracts in the United States,
    # Puerto Rico, and island areas. This might be from tracts at different time
    # periods. https://github.com/usds/justice40-tool/issues/964
    EXPECTED_MAX_CENSUS_TRACTS: int = 74160

    # output_df is the final dataframe that gets written to CSV.
    # It is used in the `load` base class method.
    output_df: pd.DataFrame = None

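    # __init_subclass__ runs whenever a child class of ExtractTransformLoad is
    # *defined*, so simply declaring a subclass is enough to populate its
    # DATASET_CONFIG (and related class constants) from the YAML dataset config.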
    def __init_subclass__(cls) -> None:
        cls.DATASET_CONFIG = cls.yaml_config_load()

    @classmethod
    def yaml_config_load(cls) -> Optional[dict]:
        """Generate the config dictionary and set class variables from the YAML dataset config."""
        if cls.NAME is not None:
            # check if the class has score YAML definitions
            datasets_config = load_yaml_dict_from_file(
                cls.DATASET_CONFIG_PATH / "datasets.yml",
                DatasetsConfig,
            )
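
            # For reference, each entry under `datasets:` in datasets.yml is
            # expected (judging from the lookups below; DatasetsConfig is the
            # authoritative schema) to look roughly like:
            #
            #   - module_name: <matches cls.NAME>
            #     input_geoid_tract_field_name: <optional>
            #     load_fields:
            #       - df_field_name: <class constant to set>
            #         long_name: <output column name>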

            # get the config for this dataset
            try:
                dataset_config = next(
                    item
                    for item in datasets_config.get("datasets")
                    if item["module_name"] == cls.NAME
                )
            except StopIteration:
                # Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
                logger.error(
                    f"Exception encountered while extracting dataset config for dataset {cls.NAME}"
                )
                sys.exit()

            # set some of the basic fields
            if "input_geoid_tract_field_name" in dataset_config:
                cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
                    "input_geoid_tract_field_name"
                ]

            # get the columns to write on the CSV
            # and set the constants
            cls.COLUMNS_TO_KEEP = [
                cls.GEOID_TRACT_FIELD_NAME,  # always index with geoid tract id
            ]
            for field in dataset_config["load_fields"]:
                cls.COLUMNS_TO_KEEP.append(field["long_name"])

                # set the constants for the class
                setattr(cls, field["df_field_name"], field["long_name"])
            return dataset_config
        return None

    # This is a classmethod so it can be used by `get_data_frame` without
    # needing to create an instance of the class. This is a use case in `etl_score`.
    @classmethod
    def _get_output_file_path(cls) -> pathlib.Path:
        """Generate the output file path."""
        if cls.NAME is None:
            raise NotImplementedError(
                f"Child ETL class needs to specify `cls.NAME` (currently "
                f"{cls.NAME})."
            )

        output_file_path = cls.DATA_PATH / "dataset" / f"{cls.NAME}" / "usa.csv"
        return output_file_path

    def get_tmp_path(self) -> pathlib.Path:
        """Returns the temporary path associated with this ETL class."""
        # Note: the temporary path will be defined on `init`, because it uses the class
        # of the instance which is often a child class.
        tmp_path = self.DATA_PATH / "tmp" / str(self.__class__.__name__)

        # Create directory if it doesn't exist
        tmp_path.mkdir(parents=True, exist_ok=True)

        return tmp_path

    def extract(
        self,
        source_url: str = None,
        extract_path: pathlib.Path = None,
        verify: Optional[bool] = True,
    ) -> None:
        """Extract the data from a remote source. By default this downloads
        the file from `source_url`, unzips it, and stores it at
        `extract_path`."""

        if source_url is None:
            source_url = self.SOURCE_URL

        if extract_path is None:
            extract_path = self.get_tmp_path()

        unzip_file_from_url(
            file_url=source_url,
            download_path=self.get_tmp_path(),
            unzipped_file_path=extract_path,
            verify=verify,
        )

    def transform(self) -> None:
        """Transform the extracted data into a format that can be consumed by the
        score generator."""

        raise NotImplementedError

    def validate(self) -> None:
        """Validates the output.

        Runs after the `transform` step and before `load`.
        """
        # TODO: remove this once all ETL classes are converted to using the new
        # base class parameters and patterns.
        if self.GEO_LEVEL is None:
            logger.info(
                "Skipping validation step for this class because it does not "
                "seem to be converted to new ETL class patterns."
            )
            return

        if self.COLUMNS_TO_KEEP is None:
            raise NotImplementedError(
                "`self.COLUMNS_TO_KEEP` must be specified."
            )

        if self.output_df is None:
            raise NotImplementedError(
                "The `transform` step must set `self.output_df`."
            )

        for column_to_keep in self.COLUMNS_TO_KEEP:
            if column_to_keep not in self.output_df.columns:
                raise ValueError(
                    f"Missing column: `{column_to_keep}` is missing from "
                    f"output"
                )
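
        # For the configured GEO_LEVEL, check that the expected GEOID column is
        # kept, that the row count does not exceed the expected maximum, that
        # every GEOID has the expected character length, and that GEOIDs are
        # not duplicated.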
        for (
            geo_level,
            geo_field,
            expected_geo_field_characters,
            expected_rows,
        ) in [
            (
                ValidGeoLevel.CENSUS_TRACT,
                self.GEOID_TRACT_FIELD_NAME,
                self.EXPECTED_CENSUS_TRACTS_CHARACTER_LENGTH,
                self.EXPECTED_MAX_CENSUS_TRACTS,
            ),
            (
                ValidGeoLevel.CENSUS_BLOCK_GROUP,
                self.GEOID_FIELD_NAME,
                self.EXPECTED_CENSUS_BLOCK_GROUPS_CHARACTER_LENGTH,
                self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS,
            ),
        ]:
            if self.GEO_LEVEL is geo_level:
                if geo_field not in self.COLUMNS_TO_KEEP:
                    raise ValueError(
                        f"Must have `{geo_field}` in columns if "
                        f"specifying geo level as `{geo_level}`."
                    )
                if self.output_df.shape[0] > expected_rows:
                    raise ValueError(
                        f"Too many rows: `{self.output_df.shape[0]}` rows in "
                        f"output exceeds expectation of `{expected_rows}` "
                        f"rows."
                    )

                if self.output_df[geo_field].str.len().nunique() > 1:
                    raise ValueError(
                        f"Multiple character lengths for geo field "
                        f"present: {self.output_df[geo_field].str.len().unique()}."
                    )

                elif (
                    len(self.output_df[geo_field].array[0])
                    != expected_geo_field_characters
                ):
                    raise ValueError(
                        "Wrong character length: the census geography data "
                        "has the wrong length."
                    )

                duplicate_geo_field_values = (
                    self.output_df[geo_field].shape[0]
                    - self.output_df[geo_field].nunique()
                )
                if duplicate_geo_field_values > 0:
                    raise ValueError(
                        f"Duplicate values: There are {duplicate_geo_field_values} "
                        f"duplicate values in "
                        f"`{geo_field}`."
                    )

    def load(self, float_format=None) -> None:
        """Saves the transformed data.

        Data is written in the specified local data folder or remote AWS S3 bucket.

        Uses the directory and the file name from `self._get_output_file_path`.
        """
        logger.info(f"Saving `{self.NAME}` CSV")

        # Create directory if necessary.
        output_file_path = self._get_output_file_path()
        output_file_path.parent.mkdir(parents=True, exist_ok=True)

        # Write nationwide csv
        self.output_df[self.COLUMNS_TO_KEEP].to_csv(
            output_file_path, index=False, float_format=float_format
        )

        logger.info(f"File written to `{output_file_path}`.")

    # This is a classmethod so it can be used without needing to create an instance of
    # the class. This is a use case in `etl_score`.
    @classmethod
    def get_data_frame(cls) -> pd.DataFrame:
        """Return the output data frame for this class.

        Must be run after a full ETL process has been run for this class.

        If the ETL has not been run for this class, this will error.
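
        Example (illustrative; `MyDatasetETL` stands in for any child ETL class):

            df = MyDatasetETL.get_data_frame()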
        """
        # Read in output file
        output_file_path = cls._get_output_file_path()
        if not output_file_path.exists():
            raise ValueError(
                f"Make sure to run ETL process first for `{cls}`. "
                f"No file found at `{output_file_path}`."
            )

        output_df = pd.read_csv(
            output_file_path,
            dtype={
                # Not all outputs will have both a Census Block Group ID and a
                # Tract ID, but these will be ignored if they're not present.
                cls.GEOID_FIELD_NAME: "string",
                cls.GEOID_TRACT_FIELD_NAME: "string",
            },
        )

        return output_df

    def cleanup(self) -> None:
        """Clears out any files stored in the TMP folder"""
        remove_all_from_dir(self.get_tmp_path())
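

# Illustrative only: a hypothetical sketch of the order in which a child ETL
# class is typically driven (the actual pipeline may orchestrate these calls
# elsewhere).
#
#     etl = MyDatasetETL()   # DATASET_CONFIG was already loaded at class definition
#     etl.extract()          # download and unzip SOURCE_URL into the tmp path
#     etl.transform()        # the child class builds self.output_df
#     etl.validate()         # geo-level, row-count, and column checks
#     etl.load()             # write COLUMNS_TO_KEEP to data/dataset/<NAME>/usa.csv
#     etl.cleanup()          # remove temporary files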