Tile-baking columns with floating rounds completed (#491)

* Tile-baking columns with floating rounds completed

* completed

* correction on github workflow

* tiles folder no longer needed

* addressed comments

* updating requirements.txt

* poetry lock update

* adding xlsxwriter

* final poetrylock

* updated requirements.txt

* checkpoint

* removed matplotlib

* ignoring pylint too many statements

* reinstated too many statements

* converting data sync to generate score GHA UI-driven
commit 3d8dbb293c by Jorge Escobar, 2021-08-10 15:28:50 -04:00, committed by GitHub
8 changed files with 311 additions and 209 deletions


@@ -27,10 +27,10 @@ class ScoreETL(ExtractTransformLoad):
         self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
         self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
         self.HOUSING_BURDEN_FIELD_NAME = "Housing burden (percent)"
-        self.POVERTY_FIELD_NAME = "Poverty (Less than 200% of federal poverty line)"
-        self.HIGH_SCHOOL_FIELD_NAME = (
-            "Percent individuals age 25 or over with less than high school degree"
+        self.POVERTY_FIELD_NAME = (
+            "Poverty (Less than 200% of federal poverty line)"
         )
+        self.HIGH_SCHOOL_FIELD_NAME = "Percent individuals age 25 or over with less than high school degree"
         self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME = (
             "Median household income (% of state median household income)"
         )
@@ -51,86 +51,14 @@ class ScoreETL(ExtractTransformLoad):
         self.housing_and_transportation_df: pd.DataFrame
         self.hud_housing_df: pd.DataFrame

-    def extract(self) -> None:
-        # EJSCreen csv Load
-        ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
-        self.ejscreen_df = pd.read_csv(
-            ejscreen_csv, dtype={"ID": "string"}, low_memory=False
-        )
-        self.ejscreen_df.rename(columns={"ID": self.GEOID_FIELD_NAME}, inplace=True)
-
-        # Load census data
-        census_csv = self.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
-        self.census_df = pd.read_csv(
-            census_csv,
-            dtype={self.GEOID_FIELD_NAME: "string"},
-            low_memory=False,
-        )
-
-        # Load housing and transportation data
-        housing_and_transportation_index_csv = (
-            self.DATA_PATH / "dataset" / "housing_and_transportation_index" / "usa.csv"
-        )
-        self.housing_and_transportation_df = pd.read_csv(
-            housing_and_transportation_index_csv,
-            dtype={self.GEOID_FIELD_NAME: "string"},
-            low_memory=False,
-        )
-
-        # Load HUD housing data
-        hud_housing_csv = self.DATA_PATH / "dataset" / "hud_housing" / "usa.csv"
-        self.hud_housing_df = pd.read_csv(
-            hud_housing_csv,
-            dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
-            low_memory=False,
-        )
-
-    def transform(self) -> None:
-        logger.info("Transforming Score Data")
-
-        # Join all the data sources that use census block groups
-        census_block_group_dfs = [
-            self.ejscreen_df,
-            self.census_df,
-            self.housing_and_transportation_df,
-        ]
-        census_block_group_df = functools.reduce(
-            lambda left, right: pd.merge(
-                left=left, right=right, on=self.GEOID_FIELD_NAME, how="outer"
-            ),
-            census_block_group_dfs,
-        )
-
-        # Sanity check the join.
-        if len(census_block_group_df[self.GEOID_FIELD_NAME].str.len().unique()) != 1:
-            raise ValueError(
-                f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length."
-            )
-
-        # Join all the data sources that use census tracts
-        # TODO: when there's more than one data source using census tract, reduce/merge them here.
-        census_tract_df = self.hud_housing_df
-
-        # Calculate the tract for the CBG data.
-        census_block_group_df[self.GEOID_TRACT_FIELD_NAME] = census_block_group_df[
-            self.GEOID_FIELD_NAME
-        ].str[0:11]
-
-        self.df = census_block_group_df.merge(
-            census_tract_df, on=self.GEOID_TRACT_FIELD_NAME
-        )
-
-        if len(census_block_group_df) > 220333:
-            raise ValueError("Too many rows in the join.")
-
+    def data_sets(self) -> list:
         # Define a named tuple that will be used for each data set input.
         DataSet = collections.namedtuple(
             typename="DataSet",
             field_names=["input_field", "renamed_field", "bucket"],
         )

-        data_sets = [
+        return [
             # The following data sets have `bucket=None`, because it's not used in the bucket based score ("Score C").
             DataSet(
                 input_field=self.GEOID_FIELD_NAME,
@@ -251,9 +179,94 @@ class ScoreETL(ExtractTransformLoad):
             ),
         ]

+    def extract(self) -> None:
+        # EJSCreen csv Load
+        ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
+        self.ejscreen_df = pd.read_csv(
+            ejscreen_csv, dtype={"ID": "string"}, low_memory=False
+        )
+        self.ejscreen_df.rename(
+            columns={"ID": self.GEOID_FIELD_NAME}, inplace=True
+        )
+
+        # Load census data
+        census_csv = self.DATA_PATH / "dataset" / "census_acs_2019" / "usa.csv"
+        self.census_df = pd.read_csv(
+            census_csv,
+            dtype={self.GEOID_FIELD_NAME: "string"},
+            low_memory=False,
+        )
+
+        # Load housing and transportation data
+        housing_and_transportation_index_csv = (
+            self.DATA_PATH
+            / "dataset"
+            / "housing_and_transportation_index"
+            / "usa.csv"
+        )
+        self.housing_and_transportation_df = pd.read_csv(
+            housing_and_transportation_index_csv,
+            dtype={self.GEOID_FIELD_NAME: "string"},
+            low_memory=False,
+        )
+
+        # Load HUD housing data
+        hud_housing_csv = self.DATA_PATH / "dataset" / "hud_housing" / "usa.csv"
+        self.hud_housing_df = pd.read_csv(
+            hud_housing_csv,
+            dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
+            low_memory=False,
+        )
+
+    def transform(self) -> None:
+        logger.info("Transforming Score Data")
+
+        # Join all the data sources that use census block groups
+        census_block_group_dfs = [
+            self.ejscreen_df,
+            self.census_df,
+            self.housing_and_transportation_df,
+        ]
+        census_block_group_df = functools.reduce(
+            lambda left, right: pd.merge(
+                left=left, right=right, on=self.GEOID_FIELD_NAME, how="outer"
+            ),
+            census_block_group_dfs,
+        )
+
+        # Sanity check the join.
+        if (
+            len(census_block_group_df[self.GEOID_FIELD_NAME].str.len().unique())
+            != 1
+        ):
+            raise ValueError(
+                f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length."
+            )
+
+        # Join all the data sources that use census tracts
+        # TODO: when there's more than one data source using census tract, reduce/merge them here.
+        census_tract_df = self.hud_housing_df
+
+        # Calculate the tract for the CBG data.
+        census_block_group_df[
+            self.GEOID_TRACT_FIELD_NAME
+        ] = census_block_group_df[self.GEOID_FIELD_NAME].str[0:11]
+
+        self.df = census_block_group_df.merge(
+            census_tract_df, on=self.GEOID_TRACT_FIELD_NAME
+        )
+
+        if len(census_block_group_df) > 220333:
+            raise ValueError("Too many rows in the join.")
+
+        # get data sets list
+        data_sets = self.data_sets()

         # Rename columns:
         renaming_dict = {
-            data_set.input_field: data_set.renamed_field for data_set in data_sets
+            data_set.input_field: data_set.renamed_field
+            for data_set in data_sets
         }
         self.df.rename(
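
The chained merge in transform() is a functools.reduce fold: each outer join keeps every block group that appears in any input, and the tract ID is then derived as the first 11 characters of the 12-character block-group GEOID. A minimal, self-contained sketch of the same pattern, using toy frames and a hypothetical "GEOID10" column name (not the real datasets):

import functools

import pandas as pd

# Toy stand-ins for the EJSCREEN, census, and housing/transportation frames.
dfs = [
    pd.DataFrame({"GEOID10": ["010010201001"], "a": [1]}),
    pd.DataFrame({"GEOID10": ["010010201001"], "b": [2]}),
    pd.DataFrame({"GEOID10": ["010010201002"], "c": [3]}),
]

# Fold the list into one frame with successive outer joins; rows present in
# any input survive, with NaN where a source had no match.
merged = functools.reduce(
    lambda left, right: pd.merge(left, right, on="GEOID10", how="outer"),
    dfs,
)

# A census block group ID embeds its tract: the first 11 characters.
merged["GEOID10_TRACT"] = merged["GEOID10"].str[0:11]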
@@ -307,13 +320,6 @@ class ScoreETL(ExtractTransformLoad):
                     self.df[data_set.renamed_field] - min_value
                 ) / (max_value - min_value)

-        # Graph distributions and correlations.
-        min_max_fields = [  # noqa: F841
-            f"{data_set.renamed_field}{self.MIN_MAX_FIELD_SUFFIX}"
-            for data_set in data_sets
-            if data_set.renamed_field != self.GEOID_FIELD_NAME
-        ]
-
         # Calculate score "A" and score "B"
         self.df["Score A"] = self.df[
             [
@@ -322,7 +328,9 @@ class ScoreETL(ExtractTransformLoad):
             ]
         ].mean(axis=1)
         self.df["Score B"] = (
-            self.df["Poverty (Less than 200% of federal poverty line) (percentile)"]
+            self.df[
+                "Poverty (Less than 200% of federal poverty line) (percentile)"
+            ]
             * self.df[
                 "Percent individuals age 25 or over with less than high school degree (percentile)"
             ]
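
Score B, as reflowed above, is an elementwise product of two percentile columns, so a block group only scores high when it ranks high on both indicators. A toy sketch with shortened, hypothetical column names:

import pandas as pd

df = pd.DataFrame(
    {
        "poverty (percentile)": [0.9, 0.2],
        "less than high school (percentile)": [0.8, 0.5],
    }
)

# Elementwise product: 0.9 * 0.8 = 0.72 for the first row, 0.10 for the second.
df["Score B"] = (
    df["poverty (percentile)"] * df["less than high school (percentile)"]
)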
@@ -357,7 +365,8 @@ class ScoreETL(ExtractTransformLoad):
         # Multiply the "Pollution Burden" score and the "Population Characteristics"
         # together to produce the cumulative impact score.
         self.df["Score C"] = (
-            self.df[self.AGGREGATION_POLLUTION] * self.df[self.AGGREGATION_POPULATION]
+            self.df[self.AGGREGATION_POLLUTION]
+            * self.df[self.AGGREGATION_POPULATION]
         )

         if len(census_block_group_df) > 220333:
@@ -372,10 +381,12 @@ class ScoreETL(ExtractTransformLoad):
         ]
         fields_min_max = [
-            f"{field}{self.MIN_MAX_FIELD_SUFFIX}" for field in fields_to_use_in_score
+            f"{field}{self.MIN_MAX_FIELD_SUFFIX}"
+            for field in fields_to_use_in_score
         ]
         fields_percentile = [
-            f"{field}{self.PERCENTILE_FIELD_SUFFIX}" for field in fields_to_use_in_score
+            f"{field}{self.PERCENTILE_FIELD_SUFFIX}"
+            for field in fields_to_use_in_score
         ]

         # Calculate "Score D", which uses min-max normalization
@@ -428,10 +439,10 @@ class ScoreETL(ExtractTransformLoad):
         ) | (self.df["Respiratory hazard " "index"] > 0.75)

         self.df["Score F (communities)"] = (
-            self.df[ami_and_high_school_field_name] & self.df[meets_burden_field_name]
+            self.df[ami_and_high_school_field_name]
+            & self.df[meets_burden_field_name]
         )

     def load(self) -> None:
         logger.info("Saving Score CSV")
@@ -439,6 +450,8 @@ class ScoreETL(ExtractTransformLoad):
         self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)

         # TODO: drop
-        self.df[0:10000].to_csv(self.SCORE_CSV_PATH / "usa-10000.csv", index=False)
+        self.df[0:10000].to_csv(
+            self.SCORE_CSV_PATH / "usa-10000.csv", index=False
+        )
         self.df.to_csv(self.SCORE_CSV_PATH / "usa.csv", index=False)


@@ -41,16 +41,32 @@ class PostScoreETL(ExtractTransformLoad):
         self.TILES_SCORE_COLUMNS = [
             "GEOID10",
+            "State Name",
+            "County Name",
+            "Total population",
             "Score E (percentile)",
             "Score E (top 25th percentile)",
-            "GEOID",
-            "State Abbreviation",
-            "County Name",
+            "Poverty (Less than 200% of federal poverty line)",
+            "Percent individuals age 25 or over with less than high school degree",
+            "Linguistic isolation (percent)",
+            "Unemployed civilians (percent)",
+            "Housing burden (percent)",
         ]
         self.TILES_SCORE_CSV_PATH = self.SCORE_CSV_PATH / "tiles"
         self.TILES_SCORE_CSV = self.TILES_SCORE_CSV_PATH / "usa.csv"
+
+        # These are the columns to round floats to 2 decimals
+        self.TILES_SCORE_FLOAT_COLUMNS = [
+            "Score E (percentile)",
+            "Score E (top 25th percentile)",
+            "Poverty (Less than 200% of federal poverty line)",
+            "Percent individuals age 25 or over with less than high school degree",
+            "Linguistic isolation (percent)",
+            "Unemployed civilians (percent)",
+            "Housing burden (percent)",
+        ]
+        self.TILES_ROUND_NUM_DECIMALS = 2

         self.DOWNLOADABLE_SCORE_INDICATORS_BASIC = [
             "Percent individuals age 25 or over with less than high school degree",
             "Linguistic isolation (percent)",
@@ -106,7 +122,10 @@ class PostScoreETL(ExtractTransformLoad):
         self.counties_df = pd.read_csv(
             self.CENSUS_COUNTIES_TXT,
             sep="\t",
-            dtype={"GEOID": "string", "USPS": "string"},
+            dtype={
+                "GEOID": "string",
+                "USPS": "string",
+            },
             low_memory=False,
             encoding="latin-1",
         )
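
The explicit dtype overrides here guard against pandas' type inference: FIPS-based GEOID codes carry leading zeros that would be lost if the column were inferred as integers. A quick illustration with an assumed one-row CSV:

import io

import pandas as pd

raw = "GEOID\n01001\n"

# Inferred dtype is int64, so the leading zero disappears: 1001
print(pd.read_csv(io.StringIO(raw))["GEOID"].iloc[0])

# Read as a string, the FIPS code survives intact: "01001"
print(pd.read_csv(io.StringIO(raw), dtype={"GEOID": "string"})["GEOID"].iloc[0])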
@@ -115,7 +134,10 @@ class PostScoreETL(ExtractTransformLoad):
         self.states_df = pd.read_csv(
             self.STATE_CSV, dtype={"fips": "string", "state_code": "string"}
         )
-        self.score_df = pd.read_csv(self.FULL_SCORE_CSV, dtype={"GEOID10": "string"})
+        self.score_df = pd.read_csv(
+            self.FULL_SCORE_CSV,
+            dtype={"GEOID10": "string", "Total population": "int64"},
+        )

     def transform(self) -> None:
         logger.info("Transforming data sources for Score + County CSV")
@@ -165,13 +187,22 @@ class PostScoreETL(ExtractTransformLoad):
         # merge census cbgs with score
         merged_df = cbg_usa_df.merge(
-            self.score_county_state_merged, on="GEOID10", how="left"
+            self.score_county_state_merged,
+            on="GEOID10",
+            how="left",
         )

+        # recast population to integer
+        merged_df["Total population"] = (
+            merged_df["Total population"].fillna(0.0).astype(int)
+        )
+
         # list the null score cbgs
         null_cbg_df = merged_df[merged_df["Score E (percentile)"].isnull()]

         # subtract data sets
         # this follows the XOR pattern outlined here:
         # https://stackoverflow.com/a/37313953
         removed_df = pd.concat([merged_df, null_cbg_df, null_cbg_df]).drop_duplicates(
             keep=False
         )
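
The concat/drop_duplicates idiom above computes a set difference: every row of null_cbg_df appears at least twice in the stacked frame, so keep=False drops all of its occurrences and leaves merged_df minus null_cbg_df. A self-contained sketch with toy data:

import pandas as pd

merged_df = pd.DataFrame(
    {"GEOID10": ["a", "b", "c"], "Score E (percentile)": [0.9, None, 0.3]}
)
null_cbg_df = merged_df[merged_df["Score E (percentile)"].isnull()]

# Stack the full frame plus the null rows twice; keep=False removes every
# duplicated row, so only block groups with a score remain ("a" and "c").
removed_df = pd.concat(
    [merged_df, null_cbg_df, null_cbg_df]
).drop_duplicates(keep=False)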
@@ -188,9 +219,14 @@ class PostScoreETL(ExtractTransformLoad):
     def _save_tile_csv(self):
         logger.info("Saving Tile Score CSV")

         # TODO: check which are the columns we'll use
         # Related to: https://github.com/usds/justice40-tool/issues/302
         score_tiles = self.score_county_state_merged[self.TILES_SCORE_COLUMNS]
+        decimals = pd.Series(
+            [self.TILES_ROUND_NUM_DECIMALS] * len(self.TILES_SCORE_FLOAT_COLUMNS),
+            index=self.TILES_SCORE_FLOAT_COLUMNS,
+        )
+        score_tiles = score_tiles.round(decimals)
         self.TILES_SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
         score_tiles.to_csv(self.TILES_SCORE_CSV, index=False)
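
DataFrame.round accepts a Series mapping column names to decimal places, which is how the decimals Series above rounds only the float columns while leaving identifiers untouched. A compact sketch with assumed values:

import pandas as pd

score_tiles = pd.DataFrame(
    {
        "GEOID10": ["010010201001"],
        "Score E (percentile)": [0.87654],
        "Housing burden (percent)": [0.12345],
    }
)

float_columns = ["Score E (percentile)", "Housing burden (percent)"]

# One entry per float column, each set to 2 decimal places; columns absent
# from the Series (e.g. GEOID10) are left as-is.
decimals = pd.Series([2] * len(float_columns), index=float_columns)
score_tiles = score_tiles.round(decimals)  # 0.88, 0.12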
@@ -210,7 +246,10 @@ class PostScoreETL(ExtractTransformLoad):
         downloadable_tiles.to_excel(self.DOWNLOADABLE_SCORE_EXCEL, index=False)

         logger.info("Compressing files")
-        files_to_compress = [self.DOWNLOADABLE_SCORE_CSV, self.DOWNLOADABLE_SCORE_EXCEL]
+        files_to_compress = [
+            self.DOWNLOADABLE_SCORE_CSV,
+            self.DOWNLOADABLE_SCORE_EXCEL,
+        ]
         with zipfile.ZipFile(self.DOWNLOADABLE_SCORE_ZIP, "w") as zf:
             for f in files_to_compress:
                 zf.write(f, arcname=Path(f).name, compress_type=compression)