County Names for Score #188 (#347)

* starting PR

* completed feature

* checkpoint

* adding new FIPS codes and updating counties to 2010

* updated sources to 2010 - 2019

* more cleanup

* creating tiles score csv
Jorge Escobar 2021-07-15 13:34:08 -04:00 committed by GitHub
commit 0316906a69
8 changed files with 425 additions and 54 deletions


@@ -1,6 +1,7 @@
 import importlib
-from etl.score.etl import ScoreETL
+from etl.score.etl_score import ScoreETL
+from etl.score.etl_score_post import PostScoreETL
 
 
 def etl_runner(dataset_to_run: str = None) -> None:
@@ -20,7 +21,11 @@ def etl_runner(dataset_to_run: str = None) -> None:
             "module_dir": "census_acs",
             "class_name": "CensusACSETL",
         },
-        {"name": "ejscreen", "module_dir": "ejscreen", "class_name": "EJScreenETL"},
+        {
+            "name": "ejscreen",
+            "module_dir": "ejscreen",
+            "class_name": "EJScreenETL",
+        },
         {
             "name": "housing_and_transportation",
             "module_dir": "housing_and_transportation",
@@ -36,12 +41,17 @@ def etl_runner(dataset_to_run: str = None) -> None:
             "module_dir": "calenviroscreen",
             "class_name": "CalEnviroScreenETL",
         },
-        {"name": "hud_recap", "module_dir": "hud_recap", "class_name": "HudRecapETL"},
+        {
+            "name": "hud_recap",
+            "module_dir": "hud_recap",
+            "class_name": "HudRecapETL",
+        },
     ]
 
     if dataset_to_run:
         dataset_element = next(
-            (item for item in dataset_list if item["name"] == dataset_to_run), None
+            (item for item in dataset_list if item["name"] == dataset_to_run),
+            None,
         )
         if not dataset_list:
             raise ValueError("Invalid dataset name")
@@ -51,7 +61,9 @@ def etl_runner(dataset_to_run: str = None) -> None:
     # Run the ETLs for the dataset_list
     for dataset in dataset_list:
-        etl_module = importlib.import_module(f"etl.sources.{dataset['module_dir']}.etl")
+        etl_module = importlib.import_module(
+            f"etl.sources.{dataset['module_dir']}.etl"
+        )
         etl_class = getattr(etl_module, dataset["class_name"])
         etl_instance = etl_class()
@@ -80,16 +92,19 @@ def score_generate() -> None:
     Returns:
         None
     """
-    score = ScoreETL()
-    # run extract
-    score.extract()
+    # Score Gen
+    score_gen = ScoreETL()
+    score_gen.extract()
+    score_gen.transform()
+    score_gen.load()
 
-    # run transform
-    score.transform()
-
-    # run load
-    score.load()
+    # Post Score Processing
+    score_post = PostScoreETL()
+    score_post.extract()
+    score_post.transform()
+    score_post.load()
+    score_post.cleanup()
 
 
 def _find_dataset_index(dataset_list, key, value):
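
After this change, score generation runs as two explicit passes: ScoreETL builds the full score CSV, then PostScoreETL joins county and state names onto it and writes the tile CSV. A minimal sketch of driving the pipeline by hand, assuming the functions above live in an importable etl.runner module (the actual file path is not shown in this diff):

    from etl.runner import etl_runner, score_generate

    # Run a single source ETL by name; the name must match a "name" key
    # in dataset_list, e.g. "ejscreen", "census_acs", or "hud_recap".
    etl_runner(dataset_to_run="ejscreen")

    # Or run every registered ETL, then build the score and tile CSVs.
    etl_runner()
    score_generate()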


@@ -28,10 +28,10 @@ class ScoreETL(ExtractTransformLoad):
         self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
         self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
         self.HOUSING_BURDEN_FIELD_NAME = "Housing burden (percent)"
-        self.POVERTY_FIELD_NAME = "Poverty (Less than 200% of federal poverty line)"
-        self.HIGH_SCHOOL_FIELD_NAME = (
-            "Percent individuals age 25 or over with less than high school degree"
+        self.POVERTY_FIELD_NAME = (
+            "Poverty (Less than 200% of federal poverty line)"
         )
+        self.HIGH_SCHOOL_FIELD_NAME = "Percent individuals age 25 or over with less than high school degree"
 
         # There's another aggregation level (a second level of "buckets").
         self.AGGREGATION_POLLUTION = "Pollution Burden"
@@ -40,7 +40,7 @@
         self.PERCENTILE_FIELD_SUFFIX = " (percentile)"
         self.MIN_MAX_FIELD_SUFFIX = " (min-max normalized)"
 
-        self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
+        self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv" / "full"
 
         # dataframes
         self.df: pd.DataFrame
@@ -51,21 +51,28 @@
     def extract(self) -> None:
         # EJSCreen csv Load
-        ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2020" / "usa.csv"
+        ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
         self.ejscreen_df = pd.read_csv(
             ejscreen_csv, dtype={"ID": "string"}, low_memory=False
         )
-        self.ejscreen_df.rename(columns={"ID": self.GEOID_FIELD_NAME}, inplace=True)
+        self.ejscreen_df.rename(
+            columns={"ID": self.GEOID_FIELD_NAME}, inplace=True
+        )
 
         # Load census data
         census_csv = self.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
         self.census_df = pd.read_csv(
-            census_csv, dtype={self.GEOID_FIELD_NAME: "string"}, low_memory=False
+            census_csv,
+            dtype={self.GEOID_FIELD_NAME: "string"},
+            low_memory=False,
         )
 
         # Load housing and transportation data
         housing_and_transportation_index_csv = (
-            self.DATA_PATH / "dataset" / "housing_and_transportation_index" / "usa.csv"
+            self.DATA_PATH
+            / "dataset"
+            / "housing_and_transportation_index"
+            / "usa.csv"
         )
         self.housing_and_transportation_df = pd.read_csv(
             housing_and_transportation_index_csv,
@@ -99,7 +106,10 @@
         )
 
         # Sanity check the join.
-        if len(census_block_group_df[self.GEOID_FIELD_NAME].str.len().unique()) != 1:
+        if (
+            len(census_block_group_df[self.GEOID_FIELD_NAME].str.len().unique())
+            != 1
+        ):
             raise ValueError(
                 f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length."
             )
@@ -109,9 +119,9 @@
         census_tract_df = self.hud_housing_df
 
         # Calculate the tract for the CBG data.
-        census_block_group_df[self.GEOID_TRACT_FIELD_NAME] = census_block_group_df[
-            self.GEOID_FIELD_NAME
-        ].str[0:11]
+        census_block_group_df[
+            self.GEOID_TRACT_FIELD_NAME
+        ] = census_block_group_df[self.GEOID_FIELD_NAME].str[0:11]
 
         self.df = census_block_group_df.merge(
             census_tract_df, on=self.GEOID_TRACT_FIELD_NAME
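
Context for the str[0:11] slice above: a census block group GEOID is 12 characters (2 state + 3 county + 6 tract + 1 block group), so its first 11 characters are exactly the parent tract's GEOID, and its first 5 are the county FIPS code that the new post-score step joins county names on. A standalone illustration:

    cbg_geoid = "010010201001"     # a block group in Autauga County, AL
    tract_geoid = cbg_geoid[0:11]  # "01001020100" = state + county + tract
    county_fips = cbg_geoid[0:5]   # "01001" = state + county
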
@@ -122,7 +132,8 @@
         # Define a named tuple that will be used for each data set input.
         DataSet = collections.namedtuple(
-            typename="DataSet", field_names=["input_field", "renamed_field", "bucket"]
+            typename="DataSet",
+            field_names=["input_field", "renamed_field", "bucket"],
         )
 
         data_sets = [
@@ -139,7 +150,9 @@
                 bucket=None,
             ),
             DataSet(
-                input_field="ACSTOTPOP", renamed_field="Total population", bucket=None
+                input_field="ACSTOTPOP",
+                renamed_field="Total population",
+                bucket=None,
             ),
             # The following data sets have buckets, because they're used in the score
             DataSet(
@@ -163,7 +176,9 @@
                 bucket=self.BUCKET_EXPOSURES,
             ),
             DataSet(
-                input_field="OZONE", renamed_field="Ozone", bucket=self.BUCKET_EXPOSURES
+                input_field="OZONE",
+                renamed_field="Ozone",
+                bucket=self.BUCKET_EXPOSURES,
             ),
             DataSet(
                 input_field="PTRAF",
@@ -239,7 +254,8 @@
         # Rename columns:
         renaming_dict = {
-            data_set.input_field: data_set.renamed_field for data_set in data_sets
+            data_set.input_field: data_set.renamed_field
+            for data_set in data_sets
         }
 
         self.df.rename(
@@ -308,7 +324,9 @@
             ]
         ].mean(axis=1)
 
         self.df["Score B"] = (
-            self.df["Poverty (Less than 200% of federal poverty line) (percentile)"]
+            self.df[
+                "Poverty (Less than 200% of federal poverty line) (percentile)"
+            ]
             * self.df[
                 "Percent individuals age 25 or over with less than high school degree (percentile)"
             ]
@@ -337,7 +355,8 @@
         # Multiply the "Pollution Burden" score and the "Population Characteristics" together to produce the cumulative impact score.
         self.df["Score C"] = (
-            self.df[self.AGGREGATION_POLLUTION] * self.df[self.AGGREGATION_POPULATION]
+            self.df[self.AGGREGATION_POLLUTION]
+            * self.df[self.AGGREGATION_POPULATION]
         )
 
         if len(census_block_group_df) > 220333:
@@ -352,10 +371,12 @@
         ]
         fields_min_max = [
-            f"{field}{self.MIN_MAX_FIELD_SUFFIX}" for field in fields_to_use_in_score
+            f"{field}{self.MIN_MAX_FIELD_SUFFIX}"
+            for field in fields_to_use_in_score
         ]
         fields_percentile = [
-            f"{field}{self.PERCENTILE_FIELD_SUFFIX}" for field in fields_to_use_in_score
+            f"{field}{self.PERCENTILE_FIELD_SUFFIX}"
+            for field in fields_to_use_in_score
         ]
 
         # Calculate "Score D", which uses min-max normalization
@@ -367,7 +388,13 @@
         self.df[fields_min_max].corr()
 
         # Create percentiles for the scores
-        for score_field in ["Score A", "Score B", "Score C", "Score D", "Score E"]:
+        for score_field in [
+            "Score A",
+            "Score B",
+            "Score C",
+            "Score D",
+            "Score E",
+        ]:
             self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = self.df[
                 score_field
             ].rank(pct=True)
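
As a refresher on the percentile step, rank(pct=True) maps each value to its percentile rank in (0, 1], with ties sharing the average rank; a small self-contained example:

    import pandas as pd

    df = pd.DataFrame({"Score A": [10.0, 20.0, 20.0, 40.0]})
    df["Score A (percentile)"] = df["Score A"].rank(pct=True)
    # Ranks are 1, 2.5, 2.5, and 4 out of 4 rows, so the percentiles
    # come out as 0.25, 0.625, 0.625, and 1.0.
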
@@ -376,14 +403,8 @@
         )
 
     def load(self) -> None:
-        logger.info(f"Saving Score CSVs")
+        logger.info(f"Saving Score CSV")
 
         # write nationwide csv
         self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
         self.df.to_csv(self.SCORE_CSV_PATH / f"usa.csv", index=False)
-
-        # write per state csvs
-        for states_fips in get_state_fips_codes(self.DATA_PATH):
-            logger.info(f"Generating data{states_fips} csv")
-            df1 = self.df[self.df["GEOID10"].str[:2] == states_fips]
-            # we need to name the file data01.csv for ogr2ogr csv merge to work
-            df1.to_csv(self.SCORE_CSV_PATH / f"data{states_fips}.csv", index=False)


@@ -0,0 +1,112 @@
+import pandas as pd
+
+from etl.base import ExtractTransformLoad
+from utils import get_module_logger
+
+logger = get_module_logger(__name__)
+
+
+class PostScoreETL(ExtractTransformLoad):
+    """
+    A class used to instantiate an ETL object to retrieve and process data from
+    datasets.
+    """
+
+    def __init__(self):
+        self.CENSUS_COUNTIES_ZIP_URL = "https://www2.census.gov/geo/docs/maps-data/data/gazetteer/Gaz_counties_national.zip"
+        self.CENSUS_COUNTIES_TXT = self.TMP_PATH / "Gaz_counties_national.txt"
+        self.CENSUS_COUNTIES_COLS = ["USPS", "GEOID", "NAME"]
+        self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
+        self.STATE_CSV = (
+            self.DATA_PATH / "census" / "csv" / "fips_states_2010.csv"
+        )
+        self.SCORE_CSV = self.SCORE_CSV_PATH / "full" / "usa.csv"
+        self.COUNTY_SCORE_CSV = self.SCORE_CSV_PATH / "full" / "usa-county.csv"
+
+        self.TILES_SCORE_COLUMNS = [
+            "GEOID10",
+            "Score E (percentile)",
+            "Score E (top 25th percentile)",
+            "GEOID",
+            "State Abbreviation",
+            "County Name",
+        ]
+        self.TILES_SCORE_CSV_PATH = self.SCORE_CSV_PATH / "tiles"
+        self.TILES_SCORE_CSV = self.TILES_SCORE_CSV_PATH / "usa.csv"
+
+        self.counties_df: pd.DataFrame
+        self.states_df: pd.DataFrame
+        self.score_df: pd.DataFrame
+        self.score_county_state_merged: pd.DataFrame
+        self.score_for_tiles: pd.DataFrame
+
+    def extract(self) -> None:
+        super().extract(
+            self.CENSUS_COUNTIES_ZIP_URL,
+            self.TMP_PATH,
+        )
+        logger.info(f"Reading Counties CSV")
+        self.counties_df = pd.read_csv(
+            self.CENSUS_COUNTIES_TXT,
+            sep="\t",
+            dtype={"GEOID": "string", "USPS": "string"},
+            low_memory=False,
+            encoding="latin-1",
+        )
+
+        logger.info(f"Reading States CSV")
+        self.states_df = pd.read_csv(
+            self.STATE_CSV, dtype={"fips": "string", "state_code": "string"}
+        )
+        self.score_df = pd.read_csv(self.SCORE_CSV, dtype={"GEOID10": "string"})
+
+    def transform(self) -> None:
+        logger.info(f"Transforming data sources for Score + County CSV")
+
+        # rename some of the columns to prepare for merge
+        self.counties_df = self.counties_df[["USPS", "GEOID", "NAME"]]
+        self.counties_df.rename(
+            columns={"USPS": "State Abbreviation", "NAME": "County Name"},
+            inplace=True,
+        )
+
+        # remove unnecessary columns
+        self.states_df.rename(
+            columns={
+                "fips": "State Code",
+                "state_name": "State Name",
+                "state_abbreviation": "State Abbreviation",
+            },
+            inplace=True,
+        )
+        self.states_df.drop(["region", "division"], axis=1, inplace=True)
+
+        # add the tract level column
+        self.score_df["GEOID"] = self.score_df.GEOID10.str[:5]
+
+        # merge state and counties
+        county_state_merged = self.counties_df.join(
+            self.states_df, rsuffix=" Other"
+        )
+        del county_state_merged["State Abbreviation Other"]
+
+        # merge county and score
+        self.score_county_state_merged = self.score_df.join(
+            county_state_merged, rsuffix="_OTHER"
+        )
+        del self.score_county_state_merged["GEOID_OTHER"]
+
+    def load(self) -> None:
+        logger.info(f"Saving Score + County CSV")
+        self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
+        # self.score_county_state_merged.to_csv(
+        #     self.COUNTY_SCORE_CSV, index=False
+        # )
+
+        logger.info(f"Saving Tile Score CSV")
+        # TODO: check which are the columns we'll use
+        # Related to: https://github.com/usds/justice40-tool/issues/302
+        score_tiles = self.score_county_state_merged[self.TILES_SCORE_COLUMNS]
+        self.TILES_SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
+        score_tiles.to_csv(self.TILES_SCORE_CSV, index=False)
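
The post-score step can also be exercised on its own, provided the full score CSV (data/score/csv/full/usa.csv) already exists; a minimal sketch, assuming the import path used by the runner above:

    from etl.score.etl_score_post import PostScoreETL

    post = PostScoreETL()
    post.extract()    # download the Census gazetteer counties file, read inputs
    post.transform()  # join county names and state abbreviations onto the score
    post.load()       # write data/score/csv/tiles/usa.csv
    post.cleanup()    # inherited temp-file cleanup, as called by score_generate()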


@@ -11,10 +11,14 @@ logger = get_module_logger(__name__)
 class CensusACSETL(ExtractTransformLoad):
     def __init__(self):
         self.ACS_YEAR = 2019
-        self.OUTPUT_PATH = self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
+        self.OUTPUT_PATH = (
+            self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
+        )
         self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
         self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
-        self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME = "Linguistic isolation (total)"
+        self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME = (
+            "Linguistic isolation (total)"
+        )
         self.LINGUISTIC_ISOLATION_FIELDS = [
             "C16002_001E",
             "C16002_004E",
@@ -24,7 +28,9 @@ class CensusACSETL(ExtractTransformLoad):
         ]
 
         self.df: pd.DataFrame
 
-    def _fips_from_censusdata_censusgeo(self, censusgeo: censusdata.censusgeo) -> str:
+    def _fips_from_censusdata_censusgeo(
+        self, censusgeo: censusdata.censusgeo
+    ) -> str:
         """Create a FIPS code from the proprietary censusgeo index."""
         fips = "".join([value for (key, value) in censusgeo.params()])
         return fips
@@ -32,7 +38,9 @@
     def extract(self) -> None:
         dfs = []
         for fips in get_state_fips_codes(self.DATA_PATH):
-            logger.info(f"Downloading data for state/territory with FIPS code {fips}")
+            logger.info(
+                f"Downloading data for state/territory with FIPS code {fips}"
+            )
 
             dfs.append(
                 censusdata.download(
@@ -61,7 +69,9 @@
         # Calculate percent unemployment.
         # TODO: remove small-sample data that should be `None` instead of a high-variance fraction.
-        self.df[self.UNEMPLOYED_FIELD_NAME] = self.df.B23025_005E / self.df.B23025_003E
+        self.df[self.UNEMPLOYED_FIELD_NAME] = (
+            self.df.B23025_005E / self.df.B23025_003E
+        )
 
         # Calculate linguistic isolation.
         individual_limited_english_fields = [
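
For a sense of what the download loop above fetches, here is a hypothetical single-state pull using the censusdata package: ACS 5-year estimates, 2019 vintage, with the unemployment numerator and denominator (table B23025) at block-group level. The exact geography arguments are an assumption, since the diff truncates the censusdata.download() call:

    import censusdata

    # Hypothetical pull for Alabama (FIPS "01"), mirroring one loop iteration.
    df = censusdata.download(
        "acs5",
        2019,
        censusdata.censusgeo(
            [("state", "01"), ("county", "*"), ("block group", "*")]
        ),
        ["B23025_005E", "B23025_003E"],
    )
    # Percent unemployment = unemployed civilians / civilian labor force.
    df["Unemployed civilians (percent)"] = df.B23025_005E / df.B23025_003E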


@@ -8,11 +8,9 @@ logger = get_module_logger(__name__)
 class EJScreenETL(ExtractTransformLoad):
     def __init__(self):
-        self.EJSCREEN_FTP_URL = (
-            "https://gaftp.epa.gov/EJSCREEN/2020/EJSCREEN_2020_StatePctile.csv.zip"
-        )
-        self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2020_StatePctile.csv"
-        self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2020"
+        self.EJSCREEN_FTP_URL = "https://gaftp.epa.gov/EJSCREEN/2019/EJSCREEN_2019_StatePctile.csv.zip"
+        self.EJSCREEN_CSV = self.TMP_PATH / "EJSCREEN_2019_StatePctile.csv"
+        self.CSV_PATH = self.DATA_PATH / "dataset" / "ejscreen_2019"
         self.df: pd.DataFrame
 
     def extract(self) -> None: