Add tests for all non-census sources (#1899)

* Refactor CDC life-expectancy (1554)

* Update to new tract list (#1554)

* Adjust for tests (#1848)

* Add tests for cdc_places (#1848)

* Add EJScreen tests (#1848)

* Add tests for HUD housing (#1848)

* Add tests for GeoCorr (#1848)

* Add persistent poverty tests (#1848)

* Update for sources without zips, for new validation (#1848)

* Update tests for new multi-CSV but (#1848)

Lucas updated the CDC life expectancy data to handle a bug where two
states are missing from the US Overall download. Since virtually none of
our other ETL classes download multiple CSVs directly like this, it
required a pretty invasive new mocking strategy.

* Add basic tests for nature deprived (#1848)

* Add wildfire tests (#1848)

* Add flood risk tests (#1848)

* Add DOT travel tests (#1848)

* Add historic redlining tests (#1848)

* Add tests for ME and WI (#1848)

* Update now that validation exists (#1848)

* Adjust for validation (#1848)

* Add health insurance back to cdc places (#1848)

Ooops

* Update tests with new field (#1848)

* Test for blank tract removal (#1848)

* Add tracts for clipping behavior

* Test clipping and zfill behavior (#1848)

* Fix bad test assumption (#1848)

* Simplify class, add test for tract padding (#1848)

* Fix percentage inversion, update tests (#1848)

Looking through the transformations, I noticed that we were subtracting
a percentage that is usually between 0-100 from 1 instead of 100, and so
were endind up with some surprising results. Confirmed with lucasmbrown-usds

* Add note about first street data (#1848)
This commit is contained in:
Matt Bowen 2022-09-19 15:17:00 -04:00 committed by GitHub
commit 876655d2b2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
88 changed files with 2032 additions and 178 deletions

View file

@ -115,56 +115,59 @@ class ExtractTransformLoad:
# periods. https://github.com/usds/justice40-tool/issues/964
EXPECTED_MAX_CENSUS_TRACTS: int = 74160
# Should this dataset load its configuration from
# the YAML files?
LOAD_YAML_CONFIG: bool = False
# We use output_df as the final dataframe to use to write to the CSV
# It is used on the "load" base class method
output_df: pd.DataFrame = None
def __init_subclass__(cls) -> None:
cls.DATASET_CONFIG = cls.yaml_config_load()
if cls.LOAD_YAML_CONFIG:
cls.DATASET_CONFIG = cls.yaml_config_load()
@classmethod
def yaml_config_load(cls) -> Optional[dict]:
def yaml_config_load(cls) -> dict:
"""Generate config dictionary and set instance variables from YAML dataset."""
if cls.NAME is not None:
# check if the class instance has score YAML definitions
datasets_config = load_yaml_dict_from_file(
cls.DATASET_CONFIG_PATH / "datasets.yml",
DatasetsConfig,
# check if the class instance has score YAML definitions
datasets_config = load_yaml_dict_from_file(
cls.DATASET_CONFIG_PATH / "datasets.yml",
DatasetsConfig,
)
# get the config for this dataset
try:
dataset_config = next(
item
for item in datasets_config.get("datasets")
if item["module_name"] == cls.NAME
)
except StopIteration:
# Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
logger.error(
f"Exception encountered while extracting dataset config for dataset {cls.NAME}"
)
sys.exit()
# get the config for this dataset
try:
dataset_config = next(
item
for item in datasets_config.get("datasets")
if item["module_name"] == cls.NAME
)
except StopIteration:
# Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
logger.error(
f"Exception encountered while extracting dataset config for dataset {cls.NAME}"
)
sys.exit()
# set some of the basic fields
if "input_geoid_tract_field_name" in dataset_config:
cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
"input_geoid_tract_field_name"
]
# get the columns to write on the CSV
# and set the constants
cls.COLUMNS_TO_KEEP = [
cls.GEOID_TRACT_FIELD_NAME, # always index with geoid tract id
# set some of the basic fields
if "input_geoid_tract_field_name" in dataset_config:
cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
"input_geoid_tract_field_name"
]
for field in dataset_config["load_fields"]:
cls.COLUMNS_TO_KEEP.append(field["long_name"])
setattr(cls, field["df_field_name"], field["long_name"])
# set the constants for the class
setattr(cls, field["df_field_name"], field["long_name"])
return dataset_config
return None
# get the columns to write on the CSV
# and set the constants
cls.COLUMNS_TO_KEEP = [
cls.GEOID_TRACT_FIELD_NAME, # always index with geoid tract id
]
for field in dataset_config["load_fields"]:
cls.COLUMNS_TO_KEEP.append(field["long_name"])
setattr(cls, field["df_field_name"], field["long_name"])
# set the constants for the class
setattr(cls, field["df_field_name"], field["long_name"])
return dataset_config
# This is a classmethod so it can be used by `get_data_frame` without
# needing to create an instance of the class. This is a use case in `etl_score`.

View file

@ -289,4 +289,18 @@ datasets:
field_type: percentage
include_in_tiles: true
include_in_downloadable_files: true
create_percentile: true
create_percentile: true
- long_name: "CDC Life Expeectancy"
short_name: "cdc_life_expectancy"
module_name: "cdc_life_expectancy"
input_geoid_tract_field_name: "Tract ID"
load_fields:
- short_name: "LLEF"
df_field_name: "LIFE_EXPECTANCY_FIELD_NAME"
long_name: "Life expectancy (years)"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: false
create_reverse_percentile: true

View file

@ -16,7 +16,12 @@ class CDCLifeExpectancy(ExtractTransformLoad):
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
NAME = "cdc_life_expectancy"
USA_FILE_URL: str = "https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Datasets/NVSS/USALEEP/CSV/US_A.CSV"
LOAD_YAML_CONFIG: bool = False
LIFE_EXPECTANCY_FIELD_NAME = "Life expectancy (years)"
INPUT_GEOID_TRACT_FIELD_NAME = "Tract ID"
STATES_MISSING_FROM_USA_FILE = ["23", "55"]
@ -69,8 +74,7 @@ class CDCLifeExpectancy(ExtractTransformLoad):
all_usa_raw_df = self._download_and_prep_data(
file_url=self.USA_FILE_URL,
download_file_name=self.get_tmp_path()
/ "cdc_life_expectancy"
/ "usa.csv",
/ "US_A.CSV",
)
# Check which states are missing
@ -91,7 +95,6 @@ class CDCLifeExpectancy(ExtractTransformLoad):
maine_raw_df = self._download_and_prep_data(
file_url=self.MAINE_FILE_URL,
download_file_name=self.get_tmp_path()
/ "cdc_life_expectancy"
/ "maine.csv",
)
@ -99,7 +102,6 @@ class CDCLifeExpectancy(ExtractTransformLoad):
wisconsin_raw_df = self._download_and_prep_data(
file_url=self.WISCONSIN_FILE_URL,
download_file_name=self.get_tmp_path()
/ "cdc_life_expectancy"
/ "wisconsin.csv",
)

View file

@ -1,6 +1,7 @@
import typing
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger, download_file_from_url
from data_pipeline.score import field_names
@ -8,13 +9,27 @@ logger = get_module_logger(__name__)
class CDCPlacesETL(ExtractTransformLoad):
NAME = "cdc_places"
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
CDC_GEOID_FIELD_NAME = "LocationID"
CDC_VALUE_FIELD_NAME = "Data_Value"
CDC_MEASURE_FIELD_NAME = "Measure"
def __init__(self):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "cdc_places"
self.CDC_PLACES_URL = "https://chronicdata.cdc.gov/api/views/cwsq-ngmh/rows.csv?accessType=DOWNLOAD"
self.CDC_GEOID_FIELD_NAME = "LocationID"
self.CDC_VALUE_FIELD_NAME = "Data_Value"
self.CDC_MEASURE_FIELD_NAME = "Measure"
self.COLUMNS_TO_KEEP: typing.List[str] = [
self.GEOID_TRACT_FIELD_NAME,
field_names.DIABETES_FIELD,
field_names.ASTHMA_FIELD,
field_names.HEART_DISEASE_FIELD,
field_names.CANCER_FIELD,
field_names.HEALTH_INSURANCE_FIELD,
field_names.PHYS_HEALTH_NOT_GOOD_FIELD,
]
self.df: pd.DataFrame
@ -22,9 +37,7 @@ class CDCPlacesETL(ExtractTransformLoad):
logger.info("Starting to download 520MB CDC Places file.")
file_path = download_file_from_url(
file_url=self.CDC_PLACES_URL,
download_file_name=self.get_tmp_path()
/ "cdc_places"
/ "census_tract.csv",
download_file_name=self.get_tmp_path() / "census_tract.csv",
)
self.df = pd.read_csv(
@ -42,7 +55,6 @@ class CDCPlacesETL(ExtractTransformLoad):
inplace=True,
errors="raise",
)
# Note: Puerto Rico not included.
self.df = self.df.pivot(
index=self.GEOID_TRACT_FIELD_NAME,
@ -65,12 +77,4 @@ class CDCPlacesETL(ExtractTransformLoad):
)
# Make the index (the census tract ID) a column, not the index.
self.df.reset_index(inplace=True)
def load(self) -> None:
logger.info("Saving CDC Places Data")
# mkdir census
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)
self.output_df = self.df.reset_index()

View file

@ -25,6 +25,7 @@ class ChildOpportunityIndex(ExtractTransformLoad):
# Metadata for the baseclass
NAME = "child_opportunity_index"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
LOAD_YAML_CONFIG: bool = True
# Define these for easy code completion
EXTREME_HEAT_FIELD: str

View file

@ -15,6 +15,7 @@ class DOEEnergyBurden(ExtractTransformLoad):
+ "/DOE_LEAD_AMI_TRACT_2018_ALL.csv.zip"
)
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
LOAD_YAML_CONFIG: bool = True
REVISED_ENERGY_BURDEN_FIELD_NAME: str
@ -56,8 +57,3 @@ class DOEEnergyBurden(ExtractTransformLoad):
)
self.output_df = output_df
def load(self) -> None:
logger.info("Saving DOE Energy Burden CSV")
super().load()

View file

@ -17,6 +17,7 @@ class TravelCompositeETL(ExtractTransformLoad):
SOURCE_URL = "https://www.transportation.gov/sites/dot.gov/files/Shapefile_and_Metadata.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
LOAD_YAML_CONFIG: bool = True
# Output score variables (values set on datasets.yml) for linting purposes
TRAVEL_BURDEN_FIELD_NAME: str

View file

@ -19,6 +19,7 @@ class AbandonedMineETL(ExtractTransformLoad):
NAME = "eamlis"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
AML_BOOLEAN: str
LOAD_YAML_CONFIG: bool = True
PUERTO_RICO_EXPECTED_IN_DATA = False
EXPECTED_MISSING_STATES = [

View file

@ -1,6 +1,6 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.score import field_names
from data_pipeline.utils import get_module_logger
@ -10,6 +10,10 @@ logger = get_module_logger(__name__)
class EJSCREENETL(ExtractTransformLoad):
"""Load updated EJSCREEN data."""
NAME = "ejscreen"
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
INPUT_GEOID_TRACT_FIELD_NAME: str = "ID"
def __init__(self):
self.EJSCREEN_FTP_URL = "https://gaftp.epa.gov/EJSCREEN/2021/EJSCREEN_2021_USPR_Tracts.csv.zip"
self.EJSCREEN_CSV = (
@ -52,16 +56,16 @@ class EJSCREENETL(ExtractTransformLoad):
logger.info("Transforming EJScreen Data")
self.df = pd.read_csv(
self.EJSCREEN_CSV,
dtype={"ID": str},
dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: str},
# EJSCREEN writes the word "None" for NA data.
na_values=["None"],
low_memory=False,
)
# rename ID to Tract ID
self.df.rename(
self.output_df = self.df.rename(
columns={
"ID": self.GEOID_TRACT_FIELD_NAME,
self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
"ACSTOTPOP": field_names.TOTAL_POP_FIELD,
"CANCER": field_names.AIR_TOXICS_CANCER_RISK_FIELD,
"RESP": field_names.RESPIRATORY_HAZARD_FIELD,
@ -80,13 +84,4 @@ class EJSCREENETL(ExtractTransformLoad):
"PRE1960PCT": field_names.LEAD_PAINT_FIELD,
"UST": field_names.UST_FIELD, # added for 2021 update
},
inplace=True,
)
def load(self) -> None:
logger.info("Saving EJScreen CSV")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df[self.COLUMNS_TO_KEEP].to_csv(
self.CSV_PATH / "usa.csv", index=False
)

View file

@ -14,8 +14,11 @@ class FloodRiskETL(ExtractTransformLoad):
"""ETL class for the First Street Foundation flood risk dataset"""
NAME = "fsf_flood_risk"
# These data were emailed to the J40 team while first street got
# their official data sharing channels setup.
SOURCE_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/fsf_flood.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
LOAD_YAML_CONFIG: bool = True
# Output score variables (values set on datasets.yml) for linting purposes
COUNT_PROPERTIES: str

View file

@ -14,9 +14,12 @@ class WildfireRiskETL(ExtractTransformLoad):
"""ETL class for the First Street Foundation wildfire risk dataset"""
NAME = "fsf_wildfire_risk"
# These data were emailed to the J40 team while first street got
# their official data sharing channels setup.
SOURCE_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/fsf_fire.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
LOAD_YAML_CONFIG: bool = True
ALASKA_AND_HAWAII_EXPECTED_IN_DATA = False
# Output score variables (values set on datasets.yml) for linting purposes

View file

@ -1,7 +1,7 @@
import pandas as pd
from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import (
get_module_logger,
unzip_file_from_url,
@ -11,6 +11,10 @@ logger = get_module_logger(__name__)
class GeoCorrETL(ExtractTransformLoad):
NAME = "geocorr"
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
def __init__(self):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "geocorr"
@ -24,6 +28,10 @@ class GeoCorrETL(ExtractTransformLoad):
self.GEOCORR_PLACES_URL = "https://justice40-data.s3.amazonaws.com/data-sources/geocorr_urban_rural.csv.zip"
self.GEOCORR_GEOID_FIELD_NAME = "GEOID10_TRACT"
self.URBAN_HEURISTIC_FIELD_NAME = "Urban Heuristic Flag"
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.URBAN_HEURISTIC_FIELD_NAME,
]
self.df: pd.DataFrame
@ -35,13 +43,11 @@ class GeoCorrETL(ExtractTransformLoad):
file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/geocorr_urban_rural.csv.zip",
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "geocorr",
unzipped_file_path=self.get_tmp_path(),
)
self.df = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "geocorr"
/ "geocorr_urban_rural.csv",
filepath_or_buffer=self.get_tmp_path() / "geocorr_urban_rural.csv",
dtype={
self.GEOCORR_GEOID_FIELD_NAME: "string",
},
@ -50,22 +56,10 @@ class GeoCorrETL(ExtractTransformLoad):
def transform(self) -> None:
logger.info("Starting GeoCorr Urban Rural Map transform")
# Put in logic from Jupyter Notebook transform when we switch in the hyperlink to Geocorr
self.df.rename(
self.output_df = self.df.rename(
columns={
"urban_heuristic_flag": self.URBAN_HEURISTIC_FIELD_NAME,
},
inplace=True,
)
pass
# Put in logic from Jupyter Notebook transform when we switch in the hyperlink to Geocorr
def load(self) -> None:
logger.info("Saving GeoCorr Urban Rural Map Data")
# mkdir census
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)

View file

@ -1,6 +1,6 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger
from data_pipeline.config import settings
@ -8,11 +8,28 @@ logger = get_module_logger(__name__)
class HistoricRedliningETL(ExtractTransformLoad):
NAME = "historic_redlining"
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
EXPECTED_MISSING_STATES = [
"10",
"11",
"16",
"23",
"30",
"32",
"35",
"38",
"46",
"50",
"56",
]
PUERTO_RICO_EXPECTED_IN_DATA = False
ALASKA_AND_HAWAII_EXPECTED_IN_DATA: bool = False
SOURCE_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/HRS_2010.zip"
def __init__(self):
self.CSV_PATH = self.DATA_PATH / "dataset" / "historic_redlining"
self.HISTORIC_REDLINING_URL = (
settings.AWS_JUSTICE40_DATASOURCES_URL + "/HRS_2010.zip"
)
self.HISTORIC_REDLINING_FILE_PATH = (
self.get_tmp_path() / "HRS_2010.xlsx"
)
@ -25,13 +42,6 @@ class HistoricRedliningETL(ExtractTransformLoad):
]
self.df: pd.DataFrame
def extract(self) -> None:
logger.info("Downloading Historic Redlining Data")
super().extract(
self.HISTORIC_REDLINING_URL,
self.get_tmp_path(),
)
def transform(self) -> None:
logger.info("Transforming Historic Redlining Data")
# this is obviously temporary
@ -57,16 +67,4 @@ class HistoricRedliningETL(ExtractTransformLoad):
f"{self.REDLINING_SCALAR} meets or exceeds {round(threshold, 2)}"
)
self.df = historic_redlining_data
def load(self) -> None:
logger.info("Saving Historic Redlining CSV")
# write selected states csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df[self.COLUMNS_TO_KEEP].to_csv(
self.CSV_PATH / "usa.csv", index=False
)
def validate(self) -> None:
logger.info("Validating Historic Redlining Data")
pass
self.output_df = historic_redlining_data

View file

@ -1,16 +1,18 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
class HudHousingETL(ExtractTransformLoad):
NAME = "hud_housing"
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
def __init__(self):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "hud_housing"
self.GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
self.HOUSING_FTP_URL = "https://www.huduser.gov/portal/datasets/cp/2014thru2018-140-csv.zip"
self.HOUSING_ZIP_FILE_DIR = self.get_tmp_path() / "hud_housing"
self.HOUSING_ZIP_FILE_DIR = self.get_tmp_path()
# We measure households earning less than 80% of HUD Area Median Family Income by county
# and paying greater than 30% of their income to housing costs.
@ -22,6 +24,14 @@ class HudHousingETL(ExtractTransformLoad):
self.NO_KITCHEN_OR_INDOOR_PLUMBING_FIELD_NAME = (
"Share of homes with no kitchen or indoor plumbing (percent)"
)
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME,
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME,
self.HOUSING_BURDEN_FIELD_NAME,
self.NO_KITCHEN_OR_INDOOR_PLUMBING_FIELD_NAME,
"DENOM INCL NOT COMPUTED",
]
# Note: some variable definitions.
# HUD-adjusted median family income (HAMFI).
@ -234,19 +244,4 @@ class HudHousingETL(ExtractTransformLoad):
float
)
def load(self) -> None:
logger.info("Saving HUD Housing Data")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
# Drop unnecessary fields
self.df[
[
self.GEOID_TRACT_FIELD_NAME,
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME,
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME,
self.HOUSING_BURDEN_FIELD_NAME,
self.NO_KITCHEN_OR_INDOOR_PLUMBING_FIELD_NAME,
"DENOM INCL NOT COMPUTED",
]
].to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)
self.output_df = self.df

View file

@ -18,6 +18,7 @@ class NationalRiskIndexETL(ExtractTransformLoad):
SOURCE_URL = "https://hazards.fema.gov/nri/Content/StaticDocuments/DataDownload//NRI_Table_CensusTracts/NRI_Table_CensusTracts.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
LOAD_YAML_CONFIG: bool = True
# Output score variables (values set on datasets.yml) for linting purposes
RISK_INDEX_EXPECTED_ANNUAL_LOSS_SCORE_FIELD_NAME: str

View file

@ -20,6 +20,7 @@ class NatureDeprivedETL(ExtractTransformLoad):
)
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
LOAD_YAML_CONFIG: bool = True
ALASKA_AND_HAWAII_EXPECTED_IN_DATA = False
# Output score variables (values set on datasets.yml) for linting purposes
@ -65,7 +66,7 @@ class NatureDeprivedETL(ExtractTransformLoad):
df_ncld[self.TRACT_ACRES_FIELD_NAME] >= self.TRACT_ACRES_LOWER_BOUND
)
df_ncld[self.TRACT_PERCENT_NON_NATURAL_FIELD_NAME] = (
1 - df_ncld[self.PERCENT_NATURAL_FIELD_NAME]
100 - df_ncld[self.PERCENT_NATURAL_FIELD_NAME]
)
# Assign the final df to the class' output_df for the load method with rename

View file

@ -2,7 +2,7 @@ import functools
import pandas as pd
from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import (
get_module_logger,
unzip_file_from_url,
@ -19,6 +19,10 @@ class PersistentPovertyETL(ExtractTransformLoad):
Codebook: `https://s4.ad.brown.edu/Projects/Diversity/Researcher/LTBDDload/Dfiles/codebooks.pdf`.
"""
NAME = "persistent_poverty"
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
PUERTO_RICO_EXPECTED_IN_DATA = False
def __init__(self):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "persistent_poverty"
@ -75,7 +79,7 @@ class PersistentPovertyETL(ExtractTransformLoad):
def extract(self) -> None:
logger.info("Starting to download 86MB persistent poverty file.")
unzipped_file_path = self.get_tmp_path() / "persistent_poverty"
unzipped_file_path = self.get_tmp_path()
unzip_file_from_url(
file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
@ -155,14 +159,4 @@ class PersistentPovertyETL(ExtractTransformLoad):
)
)
self.df = transformed_df
def load(self) -> None:
logger.info("Saving persistent poverty data.")
# mkdir census
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
self.output_df = transformed_df

View file

@ -19,6 +19,7 @@ class USArmyFUDS(ExtractTransformLoad):
INELIGIBLE_FUDS_COUNT_FIELD_NAME: str
ELIGIBLE_FUDS_BINARY_FIELD_NAME: str
GEO_LEVEL: ValidGeoLevel = ValidGeoLevel.CENSUS_TRACT
LOAD_YAML_CONFIG: bool = True
ISLAND_AREAS_EXPECTED_IN_DATA = True