mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-08-01 22:34:18 -07:00
Score F, testing methodology (#510)
* fixing dependency issue * fixing more dependencies * including fraction of state AMI * wip * nitpick whitespace * etl working now * wip on scoring * fix rename error * reducing metrics * fixing score f * fixing readme * adding dependency * passing tests; * linting/black * removing unnecessary sample * fixing error * adding verify flag on etl/base Co-authored-by: Jorge Escobar <jorge.e.escobar@omb.eop.gov>
This commit is contained in:
parent
043ed983ea
commit
65ceb7900f
23 changed files with 557 additions and 153 deletions
|
@ -10,14 +10,19 @@ logger = get_module_logger(__name__)
|
|||
class CalEnviroScreenETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
self.CALENVIROSCREEN_FTP_URL = (
|
||||
settings.AWS_JUSTICE40_DATASOURCES_URL + "/CalEnviroScreen_4.0_2021.zip"
|
||||
settings.AWS_JUSTICE40_DATASOURCES_URL
|
||||
+ "/CalEnviroScreen_4.0_2021.zip"
|
||||
)
|
||||
self.CALENVIROSCREEN_CSV = (
|
||||
self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
|
||||
)
|
||||
self.CALENVIROSCREEN_CSV = self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
|
||||
self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4"
|
||||
|
||||
# Definining some variable names
|
||||
self.CALENVIROSCREEN_SCORE_FIELD_NAME = "calenviroscreen_score"
|
||||
self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME = "calenviroscreen_percentile"
|
||||
self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME = (
|
||||
"calenviroscreen_percentile"
|
||||
)
|
||||
self.CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD_NAME = (
|
||||
"calenviroscreen_priority_community"
|
||||
)
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
import pandas as pd
|
||||
|
||||
from data_pipeline.etl.base import ExtractTransformLoad
|
||||
from data_pipeline.utils import get_module_logger, download_file_from_url
|
||||
|
||||
logger = get_module_logger(__name__)
|
||||
|
||||
|
||||
class CDCPlacesETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "cdc_places"
|
||||
|
||||
self.CDC_PLACES_URL = "https://chronicdata.cdc.gov/api/views/cwsq-ngmh/rows.csv?accessType=DOWNLOAD"
|
||||
self.CDC_GEOID_FIELD_NAME = "LocationID"
|
||||
self.CDC_VALUE_FIELD_NAME = "Data_Value"
|
||||
self.CDC_MEASURE_FIELD_NAME = "Measure"
|
||||
|
||||
self.df: pd.DataFrame
|
||||
|
||||
def extract(self) -> None:
|
||||
logger.info("Starting to download 520MB CDC Places file.")
|
||||
file_path = download_file_from_url(
|
||||
file_url=self.CDC_PLACES_URL,
|
||||
download_file_name=self.TMP_PATH
|
||||
/ "cdc_places"
|
||||
/ "census_tract.csv",
|
||||
)
|
||||
|
||||
self.df = pd.read_csv(
|
||||
filepath_or_buffer=file_path,
|
||||
dtype={self.CDC_GEOID_FIELD_NAME: "string"},
|
||||
low_memory=False,
|
||||
)
|
||||
|
||||
def transform(self) -> None:
|
||||
logger.info("Starting CDC Places transform")
|
||||
|
||||
# Rename GEOID field
|
||||
self.df.rename(
|
||||
columns={self.CDC_GEOID_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME},
|
||||
inplace=True,
|
||||
errors="raise",
|
||||
)
|
||||
|
||||
# Note: Puerto Rico not included.
|
||||
self.df = self.df.pivot(
|
||||
index=self.GEOID_TRACT_FIELD_NAME,
|
||||
columns=self.CDC_MEASURE_FIELD_NAME,
|
||||
values=self.CDC_VALUE_FIELD_NAME,
|
||||
)
|
||||
|
||||
# Make the index (the census tract ID) a column, not the index.
|
||||
self.df.reset_index(inplace=True)
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info("Saving CDC Places Data")
|
||||
|
||||
# mkdir census
|
||||
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.df.to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)
|
||||
|
||||
def validate(self) -> None:
|
||||
logger.info("Validating Census ACS Data")
|
||||
|
||||
pass
|
|
@ -33,7 +33,9 @@ class CensusETL(ExtractTransformLoad):
|
|||
self.NATIONAL_CBG_CSV_PATH = self.CSV_BASE_PATH / "us.csv"
|
||||
self.NATIONAL_CBG_JSON_PATH = self.GEOJSON_BASE_PATH / "us.json"
|
||||
|
||||
def _path_for_fips_file(self, fips_code: str, file_type: GeoFileType) -> Path:
|
||||
def _path_for_fips_file(
|
||||
self, fips_code: str, file_type: GeoFileType
|
||||
) -> Path:
|
||||
"""Get paths for associated geospatial files for the provided FIPS code
|
||||
|
||||
Args:
|
||||
|
@ -93,7 +95,9 @@ class CensusETL(ExtractTransformLoad):
|
|||
None
|
||||
"""
|
||||
shp_file_path = self._path_for_fips_file(fips_code, GeoFileType.SHP)
|
||||
geojson_file_path = self._path_for_fips_file(fips_code, GeoFileType.GEOJSON)
|
||||
geojson_file_path = self._path_for_fips_file(
|
||||
fips_code, GeoFileType.GEOJSON
|
||||
)
|
||||
logger.info(f"Checking if {fips_code} geoJSON file exists ")
|
||||
if not geojson_file_path.is_file():
|
||||
logger.info(
|
||||
|
@ -176,7 +180,9 @@ class CensusETL(ExtractTransformLoad):
|
|||
|
||||
if not self.NATIONAL_CBG_CSV_PATH.is_file():
|
||||
logger.info(f"Creating {self.NATIONAL_CBG_CSV_PATH}")
|
||||
with open(self.NATIONAL_CBG_CSV_PATH, mode="w", newline="") as cbg_csv_file:
|
||||
with open(
|
||||
self.NATIONAL_CBG_CSV_PATH, mode="w", newline=""
|
||||
) as cbg_csv_file:
|
||||
cbg_csv_file_writer = csv.writer(
|
||||
cbg_csv_file,
|
||||
delimiter=",",
|
||||
|
@ -205,7 +211,9 @@ class CensusETL(ExtractTransformLoad):
|
|||
state_gdf = gpd.read_file(file_name)
|
||||
usa_df = usa_df.append(state_gdf)
|
||||
|
||||
usa_df = usa_df.to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
|
||||
usa_df = usa_df.to_crs(
|
||||
"+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs"
|
||||
)
|
||||
logger.info("Writing national geojson file")
|
||||
usa_df.to_file(self.NATIONAL_CBG_JSON_PATH, driver="GeoJSON")
|
||||
|
||||
|
|
|
@ -41,10 +41,10 @@ class CensusACSETL(ExtractTransformLoad):
|
|||
|
||||
self.STATE_MEDIAN_INCOME_FTP_URL = (
|
||||
settings.AWS_JUSTICE40_DATASOURCES_URL
|
||||
+ "/2014_to_2019_state_median_income.zip"
|
||||
+ "/2015_to_2019_state_median_income.zip"
|
||||
)
|
||||
self.STATE_MEDIAN_INCOME_FILE_PATH = (
|
||||
self.TMP_PATH / "2014_to_2019_state_median_income.csv"
|
||||
self.TMP_PATH / "2015_to_2019_state_median_income.csv"
|
||||
)
|
||||
|
||||
def _fips_from_censusdata_censusgeo(
|
||||
|
|
|
@ -8,9 +8,7 @@ logger = get_module_logger(__name__)
|
|||
|
||||
class EJScreenETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
self.EJSCREEN_FTP_URL = (
|
||||
"https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip"
|
||||
)
|
||||
self.EJSCREEN_FTP_URL = "https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip"
|
||||
self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2019_StatePctiles.csv"
|
||||
self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019"
|
||||
self.df: pd.DataFrame
|
||||
|
@ -20,6 +18,7 @@ class EJScreenETL(ExtractTransformLoad):
|
|||
super().extract(
|
||||
self.EJSCREEN_FTP_URL,
|
||||
self.TMP_PATH,
|
||||
verify=False, # EPA EJScreen end point has certificate issues often
|
||||
)
|
||||
|
||||
def transform(self) -> None:
|
||||
|
|
|
@ -35,7 +35,9 @@ class HousingTransportationETL(ExtractTransformLoad):
|
|||
)
|
||||
|
||||
# New file name:
|
||||
tmp_csv_file_path = zip_file_dir / f"htaindex_data_blkgrps_{fips}.csv"
|
||||
tmp_csv_file_path = (
|
||||
zip_file_dir / f"htaindex_data_blkgrps_{fips}.csv"
|
||||
)
|
||||
tmp_df = pd.read_csv(filepath_or_buffer=tmp_csv_file_path)
|
||||
|
||||
dfs.append(tmp_df)
|
||||
|
@ -47,9 +49,9 @@ class HousingTransportationETL(ExtractTransformLoad):
|
|||
|
||||
# Rename and reformat block group ID
|
||||
self.df.rename(columns={"blkgrp": self.GEOID_FIELD_NAME}, inplace=True)
|
||||
self.df[self.GEOID_FIELD_NAME] = self.df[self.GEOID_FIELD_NAME].str.replace(
|
||||
'"', ""
|
||||
)
|
||||
self.df[self.GEOID_FIELD_NAME] = self.df[
|
||||
self.GEOID_FIELD_NAME
|
||||
].str.replace('"', "")
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info("Saving Housing and Transportation Data")
|
||||
|
|
|
@ -9,16 +9,16 @@ class HudHousingETL(ExtractTransformLoad):
|
|||
def __init__(self):
|
||||
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "hud_housing"
|
||||
self.GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
|
||||
self.HOUSING_FTP_URL = (
|
||||
"https://www.huduser.gov/portal/datasets/cp/2012thru2016-140-csv.zip"
|
||||
)
|
||||
self.HOUSING_FTP_URL = "https://www.huduser.gov/portal/datasets/cp/2012thru2016-140-csv.zip"
|
||||
self.HOUSING_ZIP_FILE_DIR = self.TMP_PATH / "hud_housing"
|
||||
|
||||
# We measure households earning less than 80% of HUD Area Median Family Income by county
|
||||
# and paying greater than 30% of their income to housing costs.
|
||||
self.HOUSING_BURDEN_FIELD_NAME = "Housing burden (percent)"
|
||||
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME = "HOUSING_BURDEN_NUMERATOR"
|
||||
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME = "HOUSING_BURDEN_DENOMINATOR"
|
||||
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME = (
|
||||
"HOUSING_BURDEN_DENOMINATOR"
|
||||
)
|
||||
|
||||
# Note: some variable definitions.
|
||||
# HUD-adjusted median family income (HAMFI).
|
||||
|
@ -55,7 +55,9 @@ class HudHousingETL(ExtractTransformLoad):
|
|||
)
|
||||
|
||||
# Rename and reformat block group ID
|
||||
self.df.rename(columns={"geoid": self.GEOID_TRACT_FIELD_NAME}, inplace=True)
|
||||
self.df.rename(
|
||||
columns={"geoid": self.GEOID_TRACT_FIELD_NAME}, inplace=True
|
||||
)
|
||||
|
||||
# The CHAS data has census tract ids such as `14000US01001020100`
|
||||
# Whereas the rest of our data uses, for the same tract, `01001020100`.
|
||||
|
@ -273,7 +275,9 @@ class HudHousingETL(ExtractTransformLoad):
|
|||
# TODO: add small sample size checks
|
||||
self.df[self.HOUSING_BURDEN_FIELD_NAME] = self.df[
|
||||
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME
|
||||
].astype(float) / self.df[self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME].astype(
|
||||
].astype(float) / self.df[
|
||||
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME
|
||||
].astype(
|
||||
float
|
||||
)
|
||||
|
||||
|
|
|
@ -18,7 +18,9 @@ class HudRecapETL(ExtractTransformLoad):
|
|||
self.CSV_PATH = self.DATA_PATH / "dataset" / "hud_recap"
|
||||
|
||||
# Definining some variable names
|
||||
self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME = "hud_recap_priority_community"
|
||||
self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME = (
|
||||
"hud_recap_priority_community"
|
||||
)
|
||||
|
||||
self.df: pd.DataFrame
|
||||
|
||||
|
|
|
@ -8,9 +8,7 @@ logger = get_module_logger(__name__)
|
|||
|
||||
class TreeEquityScoreETL(ExtractTransformLoad):
|
||||
def __init__(self):
|
||||
self.TES_URL = (
|
||||
"https://national-tes-data-share.s3.amazonaws.com/national_tes_share/"
|
||||
)
|
||||
self.TES_URL = "https://national-tes-data-share.s3.amazonaws.com/national_tes_share/"
|
||||
self.TES_CSV = self.TMP_PATH / "tes_2021_data.csv"
|
||||
self.CSV_PATH = self.DATA_PATH / "dataset" / "tree_equity_score"
|
||||
self.df: gpd.GeoDataFrame
|
||||
|
@ -78,8 +76,12 @@ class TreeEquityScoreETL(ExtractTransformLoad):
|
|||
logger.info("Transforming Tree Equity Score Data")
|
||||
tes_state_dfs = []
|
||||
for state in self.states:
|
||||
tes_state_dfs.append(gpd.read_file(f"{self.TMP_PATH}/{state}/{state}.shp"))
|
||||
self.df = gpd.GeoDataFrame(pd.concat(tes_state_dfs), crs=tes_state_dfs[0].crs)
|
||||
tes_state_dfs.append(
|
||||
gpd.read_file(f"{self.TMP_PATH}/{state}/{state}.shp")
|
||||
)
|
||||
self.df = gpd.GeoDataFrame(
|
||||
pd.concat(tes_state_dfs), crs=tes_state_dfs[0].crs
|
||||
)
|
||||
|
||||
def load(self) -> None:
|
||||
logger.info("Saving Tree Equity Score GeoJSON")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue