Data directory should adopt standard Poetry-suggested python package structure (#457)

* Fixes #456 - Our data directory should adopt standard python package structure
* a few missed references
* updating readme
* updating requirements
* Running Black
* Fixes for flake8
* updating pylint
This commit is contained in:
Nat Hillard 2021-08-05 15:35:54 -04:00 committed by GitHub
commit c1568e87c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
61 changed files with 1273 additions and 1256 deletions

View file

@ -0,0 +1,72 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
from data_pipeline.config import settings
logger = get_module_logger(__name__)
class CalEnviroScreenETL(ExtractTransformLoad):
def __init__(self):
self.CALENVIROSCREEN_FTP_URL = (
settings.AWS_JUSTICE40_DATASOURCES_URL + "/CalEnviroScreen_4.0_2021.zip"
)
self.CALENVIROSCREEN_CSV = self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4"
# Definining some variable names
self.CALENVIROSCREEN_SCORE_FIELD_NAME = "calenviroscreen_score"
self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME = "calenviroscreen_percentile"
self.CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD_NAME = (
"calenviroscreen_priority_community"
)
# Choosing constants.
# None of these numbers are final, but just for the purposes of comparison.
self.CALENVIROSCREEN_PRIORITY_COMMUNITY_THRESHOLD = 75
self.df: pd.DataFrame
def extract(self) -> None:
logger.info("Downloading CalEnviroScreen Data")
super().extract(
self.CALENVIROSCREEN_FTP_URL,
self.TMP_PATH,
)
def transform(self) -> None:
logger.info("Transforming CalEnviroScreen Data")
# Data from https://calenviroscreen-oehha.hub.arcgis.com/#Data, specifically:
# https://oehha.ca.gov/media/downloads/calenviroscreen/document/calenviroscreen40resultsdatadictionaryd12021.zip
# Load comparison index (CalEnviroScreen 4)
self.df = pd.read_csv(
self.CALENVIROSCREEN_CSV, dtype={"Census Tract": "string"}
)
self.df.rename(
columns={
"Census Tract": self.GEOID_TRACT_FIELD_NAME,
"DRAFT CES 4.0 Score": self.CALENVIROSCREEN_SCORE_FIELD_NAME,
"DRAFT CES 4.0 Percentile": self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME,
},
inplace=True,
)
# Add a leading "0" to the Census Tract to match our format in other data frames.
self.df[self.GEOID_TRACT_FIELD_NAME] = (
"0" + self.df[self.GEOID_TRACT_FIELD_NAME]
)
# Calculate the top K% of prioritized communities
self.df[self.CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD_NAME] = (
self.df[self.CALENVIROSCREEN_PERCENTILE_FIELD_NAME]
>= self.CALENVIROSCREEN_PRIORITY_COMMUNITY_THRESHOLD
)
def load(self) -> None:
logger.info("Saving CalEnviroScreen CSV")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.CSV_PATH / "data06.csv", index=False)

View file

@ -0,0 +1,124 @@
import csv
import json
import os
from pathlib import Path
import geopandas as gpd
from data_pipeline.utils import get_module_logger, unzip_file_from_url
from .etl_utils import get_state_fips_codes
logger = get_module_logger(__name__)
def download_census_csvs(data_path: Path) -> None:
"""Download all census shape files from the Census FTP and extract the geojson
to generate national and by state Census Block Group CSVs and GeoJSONs
Args:
data_path (pathlib.Path): Name of the directory where the files and directories will
be created
Returns:
None
"""
# the fips_states_2010.csv is generated from data here
# https://www.census.gov/geographies/reference-files/time-series/geo/tallies.html
state_fips_codes = get_state_fips_codes(data_path)
geojson_dir_path = data_path / "census" / "geojson"
for fips in state_fips_codes:
# check if file exists
shp_file_path = data_path / "census" / "shp" / fips / f"tl_2010_{fips}_bg10.shp"
logger.info(f"Checking if {fips} file exists")
if not os.path.isfile(shp_file_path):
logger.info(f"Downloading and extracting {fips} shape file")
# 2020 tiger data is here: https://www2.census.gov/geo/tiger/TIGER2020/BG/
# But using 2010 for now
cbg_state_url = f"https://www2.census.gov/geo/tiger/TIGER2010/BG/2010/tl_2010_{fips}_bg10.zip"
unzip_file_from_url(
cbg_state_url,
data_path / "tmp",
data_path / "census" / "shp" / fips,
)
cmd = (
"ogr2ogr -f GeoJSON data/census/geojson/"
+ fips
+ ".json data/census/shp/"
+ fips
+ "/tl_2010_"
+ fips
+ "_bg10.shp"
)
os.system(cmd)
# generate CBG CSV table for pandas
## load in memory
cbg_national = [] # in-memory global list
cbg_per_state: dict = {} # in-memory dict per state
for file in os.listdir(geojson_dir_path):
if file.endswith(".json"):
logger.info(f"Ingesting geoid10 for file {file}")
with open(geojson_dir_path / file) as f:
geojson = json.load(f)
for feature in geojson["features"]:
geoid10 = feature["properties"]["GEOID10"]
cbg_national.append(str(geoid10))
geoid10_state_id = geoid10[:2]
if not cbg_per_state.get(geoid10_state_id):
cbg_per_state[geoid10_state_id] = []
cbg_per_state[geoid10_state_id].append(geoid10)
csv_dir_path = data_path / "census" / "csv"
## write to individual state csv
for state_id in cbg_per_state:
geoid10_list = cbg_per_state[state_id]
with open(
csv_dir_path / f"{state_id}.csv", mode="w", newline=""
) as cbg_csv_file:
cbg_csv_file_writer = csv.writer(
cbg_csv_file,
delimiter=",",
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)
for geoid10 in geoid10_list:
cbg_csv_file_writer.writerow(
[
geoid10,
]
)
## write US csv
with open(csv_dir_path / "us.csv", mode="w", newline="") as cbg_csv_file:
cbg_csv_file_writer = csv.writer(
cbg_csv_file,
delimiter=",",
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)
for geoid10 in cbg_national:
cbg_csv_file_writer.writerow(
[
geoid10,
]
)
## create national geojson
logger.info("Generating national geojson file")
usa_df = gpd.GeoDataFrame()
for file_name in geojson_dir_path.rglob("*.json"):
logger.info(f"Ingesting {file_name}")
state_gdf = gpd.read_file(file_name)
usa_df = usa_df.append(state_gdf)
usa_df = usa_df.to_crs("+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
logger.info("Writing national geojson file")
usa_df.to_file(geojson_dir_path / "us.json", driver="GeoJSON")
logger.info("Census block groups downloading complete")

View file

@ -0,0 +1,71 @@
import csv
import os
from pathlib import Path
import pandas as pd
from data_pipeline.config import settings
from data_pipeline.utils import (
get_module_logger,
remove_all_dirs_from_dir,
remove_files_from_dir,
unzip_file_from_url,
)
logger = get_module_logger(__name__)
def reset_data_directories(data_path: Path) -> None:
census_data_path = data_path / "census"
# csv
csv_path = census_data_path / "csv"
remove_files_from_dir(csv_path, ".csv")
# geojson
geojson_path = census_data_path / "geojson"
remove_files_from_dir(geojson_path, ".json")
# shp
shp_path = census_data_path / "shp"
remove_all_dirs_from_dir(shp_path)
def get_state_fips_codes(data_path: Path) -> list:
fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"
# check if file exists
if not os.path.isfile(fips_csv_path):
logger.info("Downloading fips from S3 repository")
unzip_file_from_url(
settings.AWS_JUSTICE40_DATASOURCES_URL + "/fips_states_2010.zip",
data_path / "tmp",
data_path / "census" / "csv",
)
fips_state_list = []
with open(fips_csv_path) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")
line_count = 0
for row in csv_reader:
if line_count == 0:
line_count += 1
else:
fips = row[0].strip()
fips_state_list.append(fips)
return fips_state_list
def get_state_information(data_path: Path) -> pd.DataFrame:
"""Load the full state file as a dataframe.
Useful because of the state regional information.
"""
fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"
df = pd.read_csv(fips_csv_path)
# Left pad the FIPS codes with 0s
df["fips"] = df["fips"].astype(str).apply(lambda x: x.zfill(2))
return df

View file

@ -0,0 +1,103 @@
import pandas as pd
import censusdata
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
class CensusACSETL(ExtractTransformLoad):
def __init__(self):
self.ACS_YEAR = 2019
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME = "Linguistic isolation (total)"
self.LINGUISTIC_ISOLATION_FIELDS = [
"C16002_001E",
"C16002_004E",
"C16002_007E",
"C16002_010E",
"C16002_013E",
]
self.df: pd.DataFrame
def _fips_from_censusdata_censusgeo(self, censusgeo: censusdata.censusgeo) -> str:
"""Create a FIPS code from the proprietary censusgeo index."""
fips = "".join([value for (key, value) in censusgeo.params()])
return fips
def extract(self) -> None:
dfs = []
for fips in get_state_fips_codes(self.DATA_PATH):
logger.info(f"Downloading data for state/territory with FIPS code {fips}")
dfs.append(
censusdata.download(
src="acs5",
year=self.ACS_YEAR,
geo=censusdata.censusgeo(
[("state", fips), ("county", "*"), ("block group", "*")]
),
var=[
# Emploment fields
"B23025_005E",
"B23025_003E",
]
+ self.LINGUISTIC_ISOLATION_FIELDS,
)
)
self.df = pd.concat(dfs)
self.df[self.GEOID_FIELD_NAME] = self.df.index.to_series().apply(
func=self._fips_from_censusdata_censusgeo
)
def transform(self) -> None:
logger.info("Starting Census ACS Transform")
# Calculate percent unemployment.
# TODO: remove small-sample data that should be `None` instead of a high-variance fraction.
self.df[self.UNEMPLOYED_FIELD_NAME] = self.df.B23025_005E / self.df.B23025_003E
# Calculate linguistic isolation.
individual_limited_english_fields = [
"C16002_004E",
"C16002_007E",
"C16002_010E",
"C16002_013E",
]
self.df[self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME] = self.df[
individual_limited_english_fields
].sum(axis=1, skipna=True)
self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME] = (
self.df[self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME].astype(float)
/ self.df["C16002_001E"]
)
self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME].describe()
def load(self) -> None:
logger.info("Saving Census ACS Data")
# mkdir census
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
columns_to_include = [
self.GEOID_FIELD_NAME,
self.UNEMPLOYED_FIELD_NAME,
self.LINGUISTIC_ISOLATION_FIELD_NAME,
]
self.df[columns_to_include].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
def validate(self) -> None:
logger.info("Validating Census ACS Data")
pass

View file

@ -0,0 +1,38 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
class EJScreenETL(ExtractTransformLoad):
def __init__(self):
self.EJSCREEN_FTP_URL = (
"https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip"
)
self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2019_StatePctiles.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019"
self.df: pd.DataFrame
def extract(self) -> None:
logger.info("Downloading EJScreen Data")
super().extract(
self.EJSCREEN_FTP_URL, self.TMP_PATH,
)
def transform(self) -> None:
logger.info("Transforming EJScreen Data")
self.df = pd.read_csv(
self.EJSCREEN_CSV,
dtype={"ID": "string"},
# EJSCREEN writes the word "None" for NA data.
na_values=["None"],
low_memory=False,
)
def load(self) -> None:
logger.info("Saving EJScreen CSV")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.CSV_PATH / "usa.csv", index=False)

View file

@ -0,0 +1,58 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
from data_pipeline.utils import get_module_logger, unzip_file_from_url
logger = get_module_logger(__name__)
class HousingTransportationETL(ExtractTransformLoad):
def __init__(self):
self.HOUSING_FTP_URL = (
"https://htaindex.cnt.org/download/download.php?focus=blkgrp&geoid="
)
self.OUTPUT_PATH = (
self.DATA_PATH / "dataset" / "housing_and_transportation_index"
)
self.df: pd.DataFrame
def extract(self) -> None:
# Download each state / territory individually
dfs = []
zip_file_dir = self.TMP_PATH / "housing_and_transportation_index"
for fips in get_state_fips_codes(self.DATA_PATH):
logger.info(
f"Downloading housing data for state/territory with FIPS code {fips}"
)
# Puerto Rico has no data, so skip
if fips == "72":
continue
unzip_file_from_url(
f"{self.HOUSING_FTP_URL}{fips}", self.TMP_PATH, zip_file_dir
)
# New file name:
tmp_csv_file_path = zip_file_dir / f"htaindex_data_blkgrps_{fips}.csv"
tmp_df = pd.read_csv(filepath_or_buffer=tmp_csv_file_path)
dfs.append(tmp_df)
self.df = pd.concat(dfs)
def transform(self) -> None:
logger.info("Transforming Housing and Transportation Data")
# Rename and reformat block group ID
self.df.rename(columns={"blkgrp": self.GEOID_FIELD_NAME}, inplace=True)
self.df[self.GEOID_FIELD_NAME] = self.df[self.GEOID_FIELD_NAME].str.replace(
'"', ""
)
def load(self) -> None:
logger.info("Saving Housing and Transportation Data")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)

View file

@ -0,0 +1,289 @@
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
class HudHousingETL(ExtractTransformLoad):
def __init__(self):
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / "hud_housing"
self.GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
self.HOUSING_FTP_URL = (
"https://www.huduser.gov/portal/datasets/cp/2012thru2016-140-csv.zip"
)
self.HOUSING_ZIP_FILE_DIR = self.TMP_PATH / "hud_housing"
# We measure households earning less than 80% of HUD Area Median Family Income by county
# and paying greater than 30% of their income to housing costs.
self.HOUSING_BURDEN_FIELD_NAME = "Housing burden (percent)"
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME = "HOUSING_BURDEN_NUMERATOR"
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME = "HOUSING_BURDEN_DENOMINATOR"
# Note: some variable definitions.
# HUD-adjusted median family income (HAMFI).
# The four housing problems are:
# - incomplete kitchen facilities,
# - incomplete plumbing facilities,
# - more than 1 person per room,
# - cost burden greater than 30%.
# Table 8 is the desired table.
self.df: pd.DataFrame
def extract(self) -> None:
logger.info("Extracting HUD Housing Data")
super().extract(
self.HOUSING_FTP_URL, self.HOUSING_ZIP_FILE_DIR,
)
def transform(self) -> None:
logger.info("Transforming HUD Housing Data")
# New file name:
tmp_csv_file_path = (
self.HOUSING_ZIP_FILE_DIR
/ "2012thru2016-140-csv"
/ "2012thru2016-140-csv"
/ "140"
/ "Table8.csv"
)
self.df = pd.read_csv(filepath_or_buffer=tmp_csv_file_path, encoding="latin-1",)
# Rename and reformat block group ID
self.df.rename(columns={"geoid": self.GEOID_TRACT_FIELD_NAME}, inplace=True)
# The CHAS data has census tract ids such as `14000US01001020100`
# Whereas the rest of our data uses, for the same tract, `01001020100`.
# the characters before `US`:
self.df[self.GEOID_TRACT_FIELD_NAME] = self.df[
self.GEOID_TRACT_FIELD_NAME
].str.replace(r"^.*?US", "", regex=True)
# Calculate housing burden
# This is quite a number of steps. It does not appear to be accessible nationally in a simpler format, though.
# See "CHAS data dictionary 12-16.xlsx"
# Owner occupied numerator fields
OWNER_OCCUPIED_NUMERATOR_FIELDS = [
# Column Name
# Line_Type
# Tenure
# Household income
# Cost burden
# Facilities
"T8_est7",
# Subtotal
# Owner occupied
# less than or equal to 30% of HAMFI
# greater than 30% but less than or equal to 50%
# All
"T8_est10",
# Subtotal
# Owner occupied
# less than or equal to 30% of HAMFI
# greater than 50%
# All
"T8_est20",
# Subtotal
# Owner occupied
# greater than 30% but less than or equal to 50% of HAMFI
# greater than 30% but less than or equal to 50%
# All
"T8_est23",
# Subtotal
# Owner occupied
# greater than 30% but less than or equal to 50% of HAMFI
# greater than 50%
# All
"T8_est33",
# Subtotal
# Owner occupied
# greater than 50% but less than or equal to 80% of HAMFI
# greater than 30% but less than or equal to 50%
# All
"T8_est36",
# Subtotal
# Owner occupied
# greater than 50% but less than or equal to 80% of HAMFI
# greater than 50%
# All
]
# These rows have the values where HAMFI was not computed, b/c of no or negative income.
OWNER_OCCUPIED_NOT_COMPUTED_FIELDS = [
# Column Name
# Line_Type
# Tenure
# Household income
# Cost burden
# Facilities
"T8_est13",
# Subtotal
# Owner occupied
# less than or equal to 30% of HAMFI
# not computed (no/negative income)
# All
"T8_est26",
# Subtotal
# Owner occupied
# greater than 30% but less than or equal to 50% of HAMFI
# not computed (no/negative income)
# All
"T8_est39",
# Subtotal
# Owner occupied
# greater than 50% but less than or equal to 80% of HAMFI
# not computed (no/negative income)
# All
"T8_est52",
# Subtotal
# Owner occupied
# greater than 80% but less than or equal to 100% of HAMFI
# not computed (no/negative income)
# All
"T8_est65",
# Subtotal
# Owner occupied
# greater than 100% of HAMFI
# not computed (no/negative income)
# All
]
OWNER_OCCUPIED_POPULATION_FIELD = "T8_est2"
# Subtotal
# Owner occupied
# All
# All
# All
# Renter occupied numerator fields
RENTER_OCCUPIED_NUMERATOR_FIELDS = [
# Column Name
# Line_Type
# Tenure
# Household income
# Cost burden
# Facilities
"T8_est73",
# Subtotal
# Renter occupied
# less than or equal to 30% of HAMFI
# greater than 30% but less than or equal to 50%
# All
"T8_est76",
# Subtotal
# Renter occupied
# less than or equal to 30% of HAMFI
# greater than 50%
# All
"T8_est86",
# Subtotal
# Renter occupied
# greater than 30% but less than or equal to 50% of HAMFI
# greater than 30% but less than or equal to 50%
# All
"T8_est89",
# Subtotal
# Renter occupied
# greater than 30% but less than or equal to 50% of HAMFI
# greater than 50%
# All
"T8_est99",
# Subtotal
# Renter occupied greater than 50% but less than or equal to 80% of HAMFI
# greater than 30% but less than or equal to 50%
# All
"T8_est102",
# Subtotal
# Renter occupied
# greater than 50% but less than or equal to 80% of HAMFI
# greater than 50%
# All
]
# These rows have the values where HAMFI was not computed, b/c of no or negative income.
RENTER_OCCUPIED_NOT_COMPUTED_FIELDS = [
# Column Name
# Line_Type
# Tenure
# Household income
# Cost burden
# Facilities
"T8_est79",
# Subtotal
# Renter occupied less than or equal to 30% of HAMFI
# not computed (no/negative income)
# All
"T8_est92",
# Subtotal
# Renter occupied greater than 30% but less than or equal to 50% of HAMFI
# not computed (no/negative income)
# All
"T8_est105",
# Subtotal
# Renter occupied
# greater than 50% but less than or equal to 80% of HAMFI
# not computed (no/negative income)
# All
"T8_est118",
# Subtotal
# Renter occupied greater than 80% but less than or equal to 100% of HAMFI
# not computed (no/negative income)
# All
"T8_est131",
# Subtotal
# Renter occupied
# greater than 100% of HAMFI
# not computed (no/negative income)
# All
]
# T8_est68 Subtotal Renter occupied All All All
RENTER_OCCUPIED_POPULATION_FIELD = "T8_est68"
# Math:
# (
# # of Owner Occupied Units Meeting Criteria
# + # of Renter Occupied Units Meeting Criteria
# )
# divided by
# (
# Total # of Owner Occupied Units
# + Total # of Renter Occupied Units
# - # of Owner Occupied Units with HAMFI Not Computed
# - # of Renter Occupied Units with HAMFI Not Computed
# )
self.df[self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME] = self.df[
OWNER_OCCUPIED_NUMERATOR_FIELDS
].sum(axis=1) + self.df[RENTER_OCCUPIED_NUMERATOR_FIELDS].sum(axis=1)
self.df[self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME] = (
self.df[OWNER_OCCUPIED_POPULATION_FIELD]
+ self.df[RENTER_OCCUPIED_POPULATION_FIELD]
- self.df[OWNER_OCCUPIED_NOT_COMPUTED_FIELDS].sum(axis=1)
- self.df[RENTER_OCCUPIED_NOT_COMPUTED_FIELDS].sum(axis=1)
)
# TODO: add small sample size checks
self.df[self.HOUSING_BURDEN_FIELD_NAME] = self.df[
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME
].astype(float) / self.df[self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME].astype(
float
)
def load(self) -> None:
logger.info("Saving HUD Housing Data")
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
# Drop unnecessary fields
self.df[
[
self.GEOID_TRACT_FIELD_NAME,
self.HOUSING_BURDEN_NUMERATOR_FIELD_NAME,
self.HOUSING_BURDEN_DENOMINATOR_FIELD_NAME,
self.HOUSING_BURDEN_FIELD_NAME,
]
].to_csv(path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False)

View file

@ -0,0 +1,64 @@
import pandas as pd
import requests
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
class HudRecapETL(ExtractTransformLoad):
def __init__(self):
# pylint: disable=line-too-long
self.HUD_RECAP_CSV_URL = "https://opendata.arcgis.com/api/v3/datasets/56de4edea8264fe5a344da9811ef5d6e_0/downloads/data?format=csv&spatialRefId=4326" # noqa: E501
self.HUD_RECAP_CSV = (
self.TMP_PATH
/ "Racially_or_Ethnically_Concentrated_Areas_of_Poverty__R_ECAPs_.csv"
)
self.CSV_PATH = self.DATA_PATH / "dataset" / "hud_recap"
# Definining some variable names
self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME = "hud_recap_priority_community"
self.df: pd.DataFrame
def extract(self) -> None:
logger.info("Downloading HUD Recap Data")
download = requests.get(self.HUD_RECAP_CSV_URL, verify=None)
file_contents = download.content
csv_file = open(self.HUD_RECAP_CSV, "wb")
csv_file.write(file_contents)
csv_file.close()
def transform(self) -> None:
logger.info("Transforming HUD Recap Data")
# Load comparison index (CalEnviroScreen 4)
self.df = pd.read_csv(self.HUD_RECAP_CSV, dtype={"GEOID": "string"})
self.df.rename(
columns={
"GEOID": self.GEOID_TRACT_FIELD_NAME,
# Interestingly, there's no data dictionary for the RECAP data that I could find.
# However, this site (http://www.schousing.com/library/Tax%20Credit/2020/QAP%20Instructions%20(2).pdf)
# suggests:
# "If RCAP_Current for the tract in which the site is located is 1, the tract is an R/ECAP. If RCAP_Current is 0, it is not."
"RCAP_Current": self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME,
},
inplace=True,
)
# Convert to boolean
self.df[self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME] = self.df[
self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME
].astype("bool")
self.df[self.HUD_RECAP_PRIORITY_COMMUNITY_FIELD_NAME].value_counts()
self.df.sort_values(by=self.GEOID_TRACT_FIELD_NAME, inplace=True)
def load(self) -> None:
logger.info("Saving HUD Recap CSV")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.CSV_PATH / "usa.csv", index=False)

View file

@ -0,0 +1,3 @@
# Tree Equity Score
The Tree Equity Score was built by American Forest to assess how equitably trees were planted in a city. More information, checkout [https://treeequityscore.org](https://treeequityscore.org).

View file

@ -0,0 +1,87 @@
import geopandas as gpd
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
logger = get_module_logger(__name__)
class TreeEquityScoreETL(ExtractTransformLoad):
def __init__(self):
self.TES_URL = (
"https://national-tes-data-share.s3.amazonaws.com/national_tes_share/"
)
self.TES_CSV = self.TMP_PATH / "tes_2021_data.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "tree_equity_score"
self.df: gpd.GeoDataFrame
self.states = [
"al",
"az",
"ar",
"ca",
"co",
"ct",
"de",
"dc",
"fl",
"ga",
"id",
"il",
"in",
"ia",
"ks",
"ky",
"la",
"me",
"md",
"ma",
"mi",
"mn",
"ms",
"mo",
"mt",
"ne",
"nv",
"nh",
"nj",
"nm",
"ny",
"nc",
"nd",
"oh",
"ok",
"or",
"pa",
"ri",
"sc",
"sd",
"tn",
"tx",
"ut",
"vt",
"va",
"wa",
"wv",
"wi",
"wy",
]
def extract(self) -> None:
logger.info("Downloading Tree Equity Score Data")
for state in self.states:
super().extract(
f"{self.TES_URL}{state}.zip.zip", f"{self.TMP_PATH}/{state}",
)
def transform(self) -> None:
logger.info("Transforming Tree Equity Score Data")
tes_state_dfs = []
for state in self.states:
tes_state_dfs.append(gpd.read_file(f"{self.TMP_PATH}/{state}/{state}.shp"))
self.df = gpd.GeoDataFrame(pd.concat(tes_state_dfs), crs=tes_state_dfs[0].crs)
def load(self) -> None:
logger.info("Saving Tree Equity Score GeoJSON")
# write nationwide csv
self.CSV_PATH.mkdir(parents=True, exist_ok=True)
self.df.to_file(self.CSV_PATH / "tes_conus.geojson", driver="GeoJSON")