Issue 675 & 676: Adding life expectancy and DOE energy burden data (#683)

* Adding two new data sources.
Lucas Merrill Brown authored 2021-09-15 09:59:28 -05:00; committed by GitHub
parent fc5ed37fca
commit e94d05882c
10 changed files with 240 additions and 26 deletions


@@ -49,6 +49,16 @@ DATASET_LIST = [
         "module_dir": "census_acs_median_income",
         "class_name": "CensusACSMedianIncomeETL",
     },
+    {
+        "name": "cdc_life_expectancy",
+        "module_dir": "cdc_life_expectancy",
+        "class_name": "CDCLifeExpectancy",
+    },
+    {
+        "name": "doe_energy_burden",
+        "module_dir": "doe_energy_burden",
+        "class_name": "DOEEnergyBurden",
+    },
 ]
 CENSUS_INFO = {
     "name": "census",


@@ -26,7 +26,9 @@ class ScoreETL(ExtractTransformLoad):
         # A few specific field names
         # TODO: clean this up, I name some fields but not others.
         self.UNEMPLOYED_FIELD_NAME: str = "Unemployed civilians (percent)"
-        self.LINGUISTIC_ISOLATION_FIELD_NAME: str = "Linguistic isolation (percent)"
+        self.LINGUISTIC_ISOLATION_FIELD_NAME: str = (
+            "Linguistic isolation (percent)"
+        )
         self.HOUSING_BURDEN_FIELD_NAME: str = "Housing burden (percent)"
         self.POVERTY_FIELD_NAME: str = (
             "Poverty (Less than 200% of federal poverty line)"
@@ -58,6 +60,12 @@ class ScoreETL(ExtractTransformLoad):
             "Percent of individuals < 200% Federal Poverty Line"
         )
 
+        # CDC life expectancy
+        self.LIFE_EXPECTANCY_FIELD_NAME = "Life expectancy (years)"
+
+        # DOE energy burden
+        self.ENERGY_BURDEN_FIELD_NAME = "Energy burden"
+
         # There's another aggregation level (a second level of "buckets").
         self.AGGREGATION_POLLUTION: str = "Pollution Burden"
         self.AGGREGATION_POPULATION: str = "Population Characteristics"
@@ -75,6 +83,8 @@ class ScoreETL(ExtractTransformLoad):
         self.hud_housing_df: pd.DataFrame
         self.cdc_places_df: pd.DataFrame
         self.census_acs_median_incomes_df: pd.DataFrame
+        self.cdc_life_expectancy_df: pd.DataFrame
+        self.doe_energy_burden_df: pd.DataFrame
 
     def data_sets(self) -> list:
         # Define a named tuple that will be used for each data set input.
@@ -166,6 +176,16 @@ class ScoreETL(ExtractTransformLoad):
                 renamed_field=self.MEDIAN_INCOME_FIELD_NAME,
                 bucket=None,
             ),
+            DataSet(
+                input_field=self.LIFE_EXPECTANCY_FIELD_NAME,
+                renamed_field=self.LIFE_EXPECTANCY_FIELD_NAME,
+                bucket=None,
+            ),
+            DataSet(
+                input_field=self.ENERGY_BURDEN_FIELD_NAME,
+                renamed_field=self.ENERGY_BURDEN_FIELD_NAME,
+                bucket=None,
+            ),
             # The following data sets have buckets, because they're used in Score C
             DataSet(
                 input_field="CANCER",
@@ -325,6 +345,26 @@ class ScoreETL(ExtractTransformLoad):
             low_memory=False,
         )
 
+        # Load CDC life expectancy data
+        cdc_life_expectancy_csv = (
+            self.DATA_PATH / "dataset" / "cdc_life_expectancy" / "usa.csv"
+        )
+        self.cdc_life_expectancy_df = pd.read_csv(
+            cdc_life_expectancy_csv,
+            dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
+            low_memory=False,
+        )
+
+        # Load DOE energy burden data
+        doe_energy_burden_csv = (
+            self.DATA_PATH / "dataset" / "doe_energy_burden" / "usa.csv"
+        )
+        self.doe_energy_burden_df = pd.read_csv(
+            doe_energy_burden_csv,
+            dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
+            low_memory=False,
+        )
+
     def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame:
         logger.info("Joining Census Block Group dataframes")
         census_block_group_df = functools.reduce(
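
The dtype={self.GEOID_TRACT_FIELD_NAME: "string"} argument in both new loaders is load-bearing: tract GEOIDs are 11-digit codes whose leading zeros pandas would drop if it inferred an integer column. A self-contained illustration (the column name and values are made up for the example):

import io
import pandas as pd

raw = "GEOID10_TRACT,Life expectancy (years)\n06037930401,81.2\n"

inferred = pd.read_csv(io.StringIO(raw))
as_string = pd.read_csv(io.StringIO(raw), dtype={"GEOID10_TRACT": "string"})

print(inferred["GEOID10_TRACT"].iloc[0])   # 6037930401 -- leading zero lost
print(as_string["GEOID10_TRACT"].iloc[0])  # 06037930401 -- preserved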
@@ -566,21 +606,9 @@ class ScoreETL(ExtractTransformLoad):
         )
         df["Score H"] = df["Score H (communities)"].astype(int)
 
-        # df["80% AMI & 6% high school (communities)"] = (
-        #     (df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.8)
-        #     & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold_2)
-        # )
-        #
-        # df["FPL200>40% & 6% high school (communities)"] = (
-        #     (df[self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME] > 0.40)
-        #     & (df[self.HIGH_SCHOOL_FIELD_NAME] > high_school_cutoff_threshold_2)
-        # )
-
         df["NMTC (communities)"] = (
             (df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.8)
-        ) | (
-            (df[self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME] > 0.20)
-        )
+        ) | (df[self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME] > 0.20)
 
         df["NMTC modified (communities)"] = (
             (df[self.MEDIAN_INCOME_AS_PERCENT_OF_AMI_FIELD_NAME] < 0.8)
@@ -609,6 +637,8 @@ class ScoreETL(ExtractTransformLoad):
         census_tract_dfs = [
             self.hud_housing_df,
             self.cdc_places_df,
+            self.cdc_life_expectancy_df,
+            self.doe_energy_burden_df
         ]
 
         census_tract_df = self._join_tract_dfs(census_tract_dfs)
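
_join_tract_dfs itself is unchanged by this commit. For context, a tract-level join of this shape is typically a reduce over pairwise merges on the tract GEOID, mirroring the _join_cbg_dfs pattern visible above; a hedged sketch (the merge key and how="outer" are assumptions, not repo code):

import functools
import pandas as pd

def join_tract_dfs_sketch(census_tract_dfs: list) -> pd.DataFrame:
    # Outer-merge each tract-level frame on the shared GEOID column so a
    # tract missing from one source keeps its values from the others.
    return functools.reduce(
        lambda left, right: pd.merge(
            left, right, on="GEOID10_TRACT", how="outer"
        ),
        census_tract_dfs,
    )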


@@ -0,0 +1,69 @@
+from pathlib import Path
+
+import pandas as pd
+
+from data_pipeline.etl.base import ExtractTransformLoad
+from data_pipeline.utils import get_module_logger, download_file_from_url
+
+logger = get_module_logger(__name__)
+
+
+class CDCLifeExpectancy(ExtractTransformLoad):
+    def __init__(self):
+        self.FILE_URL: str = "https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Datasets/NVSS/USALEEP/CSV/US_A.CSV"
+        self.OUTPUT_PATH: Path = (
+            self.DATA_PATH / "dataset" / "cdc_life_expectancy"
+        )
+
+        self.TRACT_INPUT_COLUMN_NAME = "Tract ID"
+        self.LIFE_EXPECTANCY_FIELD_NAME = "Life expectancy (years)"
+
+        # Constants for output
+        self.COLUMNS_TO_KEEP = [
+            self.GEOID_TRACT_FIELD_NAME,
+            self.LIFE_EXPECTANCY_FIELD_NAME,
+        ]
+
+        self.raw_df: pd.DataFrame
+        self.output_df: pd.DataFrame
+
+    def extract(self) -> None:
+        logger.info("Starting data download.")
+
+        download_file_name = self.TMP_PATH / "cdc_life_expectancy" / "usa.csv"
+        download_file_from_url(
+            file_url=self.FILE_URL,
+            download_file_name=download_file_name,
+            verify=True,
+        )
+
+        self.raw_df = pd.read_csv(
+            filepath_or_buffer=download_file_name,
+            dtype={
+                # The following need to remain as strings for all of their digits, not get converted to numbers.
+                self.TRACT_INPUT_COLUMN_NAME: "string",
+            },
+            low_memory=False,
+        )
+
+    def transform(self) -> None:
+        logger.info("Starting CDC life expectancy transform.")
+
+        self.output_df = self.raw_df.rename(
+            columns={
+                "e(0)": self.LIFE_EXPECTANCY_FIELD_NAME,
+                self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
+            }
+        )
+
+    def validate(self) -> None:
+        logger.info("Validating CDC Life Expectancy Data")
+        pass
+
+    def load(self) -> None:
+        logger.info("Saving CDC Life Expectancy CSV")
+
+        self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
+        self.output_df[self.COLUMNS_TO_KEEP].to_csv(
+            path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
+        )
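
Taken together, the class follows the base class's extract → transform → validate → load sequence. Driving it by hand looks like this (in practice the runner dispatches it via the DATASET_LIST entry added above; the step-by-step calls below are only illustrative):

etl = CDCLifeExpectancy()
etl.extract()    # download US_A.CSV from the CDC FTP server into TMP_PATH
etl.transform()  # rename "e(0)" and "Tract ID" to the pipeline field names
etl.validate()   # currently only logs; no checks are implemented yet
etl.load()       # write usa.csv under DATA_PATH/dataset/cdc_life_expectancy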


@@ -100,7 +100,9 @@ class CensusACSETL(ExtractTransformLoad):
         ]
 
         # Handle null values for CBG median income, which are `-666666666`.
-        missing_value_count = sum(self.df[self.MEDIAN_INCOME_FIELD_NAME]==-666666666)
+        missing_value_count = sum(
+            self.df[self.MEDIAN_INCOME_FIELD_NAME] == -666666666
+        )
         logger.info(
             f"There are {missing_value_count} ({int(100*missing_value_count/self.df[self.MEDIAN_INCOME_FIELD_NAME].count())}%) values of "
             + f"`{self.MEDIAN_INCOME_FIELD_NAME}` being marked as null values."


@@ -0,0 +1,86 @@
+from pathlib import Path
+
+import pandas as pd
+
+from data_pipeline.config import settings
+from data_pipeline.etl.base import ExtractTransformLoad
+from data_pipeline.utils import get_module_logger, unzip_file_from_url
+
+logger = get_module_logger(__name__)
+
+
+class DOEEnergyBurden(ExtractTransformLoad):
+    def __init__(self):
+        self.DOE_FILE_URL = (
+            settings.AWS_JUSTICE40_DATASOURCES_URL
+            + "/DOE_LEAD_with_EJSCREEN.csv.zip"
+        )
+        self.OUTPUT_PATH: Path = (
+            self.DATA_PATH / "dataset" / "doe_energy_burden"
+        )
+
+        self.TRACT_INPUT_COLUMN_NAME = "GEOID"
+        self.ENERGY_BURDEN_FIELD_NAME = "Energy burden"
+
+        # Constants for output
+        self.COLUMNS_TO_KEEP = [
+            self.GEOID_TRACT_FIELD_NAME,
+            self.ENERGY_BURDEN_FIELD_NAME,
+        ]
+
+        self.raw_df: pd.DataFrame
+        self.output_df: pd.DataFrame
+
+    def extract(self) -> None:
+        logger.info("Starting data download.")
+
+        unzip_file_from_url(
+            file_url=self.DOE_FILE_URL,
+            download_path=self.TMP_PATH,
+            unzipped_file_path=self.TMP_PATH / "doe_energy_burden",
+        )
+
+        self.raw_df = pd.read_csv(
+            filepath_or_buffer=self.TMP_PATH
+            / "doe_energy_burden"
+            / "DOE_LEAD_with_EJSCREEN.csv",
+            # The following need to remain as strings for all of their digits, not get converted to numbers.
+            dtype={
+                self.TRACT_INPUT_COLUMN_NAME: "string",
+            },
+            low_memory=False,
+        )
+
+    def transform(self) -> None:
+        logger.info("Starting DOE energy burden transform.")
+
+        output_df = self.raw_df.rename(
+            columns={
+                "AvgEnergyBurden": self.ENERGY_BURDEN_FIELD_NAME,
+                self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
+            }
+        )
+
+        # Convert energy burden to a fraction, since we represent all other percentages as fractions.
+        output_df[self.ENERGY_BURDEN_FIELD_NAME] = (
+            output_df[self.ENERGY_BURDEN_FIELD_NAME] / 100
+        )
+
+        # Left-pad the tracts with 0s
+        expected_length_of_census_tract_field = 11
+        output_df[self.GEOID_TRACT_FIELD_NAME] = (
+            output_df[self.GEOID_TRACT_FIELD_NAME]
+            .astype(str)
+            .apply(lambda x: x.zfill(expected_length_of_census_tract_field))
+        )
+
+        self.output_df = output_df
+
+    def validate(self) -> None:
+        logger.info("Validating DOE Energy Burden Data")
+        pass
+
+    def load(self) -> None:
+        logger.info("Saving DOE Energy Burden CSV")
+
+        self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
+        self.output_df[self.COLUMNS_TO_KEEP].to_csv(
+            path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
+        )
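
Two transform details worth unpacking: dividing by 100 stores energy burden as a fraction, consistent with the pipeline's other percentage fields, and zfill restores leading zeros that upstream numeric handling may have stripped from 11-character tract GEOIDs. A quick illustration with made-up values:

import pandas as pd

# 32% becomes 0.32, matching the other fraction-valued fields.
print((pd.Series([32.0, 7.5]) / 100).tolist())  # [0.32, 0.075]

# A 10-character GEOID regains its leading zero; 11-character IDs are unchanged.
geoids = pd.Series(["6037930401", "48201223100"])
print(geoids.str.zfill(11).tolist())  # ['06037930401', '48201223100']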


@@ -111,15 +111,29 @@
    "cell_type": "code",
    "execution_count": null,
    "id": "d9968187",
-   "metadata": {},
+   "metadata": {
+    "scrolled": false
+   },
    "outputs": [],
    "source": [
     "# Analyze one field at a time (useful for setting thresholds)\n",
-    "field = \"Percent of individuals < 200% Federal Poverty Line\"\n",
-    "cejst_df[field].describe()\n",
-    "quantile = .8\n",
-    "print(f\"Quantile at {quantile} is {np.nanquantile(a=cejst_df[field], q=quantile)}\")\n",
-    "cejst_df[field].hist()"
+    "\n",
+    "quantile = 0.8\n",
+    "\n",
+    "for field in [\n",
+    "    \"Percent of individuals < 200% Federal Poverty Line\",\n",
+    "    \"Life expectancy (years)\",\n",
+    "    \"Energy burden\",\n",
+    "]:\n",
+    "    print(f\"\\n~~~~Analysis for field `{field}`~~~~\")\n",
+    "    print(cejst_df[field].describe())\n",
+    "    print(\n",
+    "        f\"\\nThere are {cejst_df[field].isnull().sum() * 100 / len(cejst_df):.2f}% of values missing.\"\n",
+    "    )\n",
+    "    print(\n",
+    "        f\"\\nQuantile at {quantile} is {np.nanquantile(a=cejst_df[field], q=quantile)}\"\n",
+    "    )\n",
+    "    print(cejst_df[field].hist())"
    ]
   },
   {
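
One reason the loop uses np.nanquantile rather than np.quantile: the new life-expectancy and energy-burden fields contain missing values, and nanquantile drops NaNs instead of letting one poison the result. A toy check:

import numpy as np

values = [1.0, 2.0, np.nan, 4.0]
print(np.quantile(values, 0.8))         # nan
print(np.nanquantile(a=values, q=0.8))  # 3.2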
@@ -242,7 +256,7 @@
     "        \"priority_communities_field\",\n",
     "        # Note: this field only used by indices defined at the census tract level.\n",
     "        \"other_census_tract_fields_to_keep\",\n",
-    "    ]\n",
+    "    ],\n",
     ")\n",
     "\n",
     "# Define the indices used for CEJST scoring (`census_block_group_indices`) as well as comparison\n",
@@ -287,11 +301,12 @@
     "        method_name=\"Score D (25th percentile)\",\n",
     "        priority_communities_field=\"Score D (top 25th percentile)\",\n",
     "        other_census_tract_fields_to_keep=[],\n",
+    "    ),\n",
     "    Index(\n",
     "        method_name=\"Poverty\",\n",
     "        priority_communities_field=\"Poverty (Less than 200% of federal poverty line) (top 25th percentile)\",\n",
     "        other_census_tract_fields_to_keep=[],\n",
-    "    )\n",
+    "    ),\n",
     "]\n",
     "\n",
     "census_tract_indices = [\n",
@@ -638,7 +653,9 @@
     "    worksheet.set_column(f\"{column_character}:{column_character}\", column_width)\n",
     "\n",
     "    # Add green to red conditional formatting.\n",
-    "    column_ranges = f\"{column_character}2:{column_character}{len(cbg_score_comparison_df)+1}\"\n",
+    "    column_ranges = (\n",
+    "        f\"{column_character}2:{column_character}{len(cbg_score_comparison_df)+1}\"\n",
+    "    )\n",
     "    worksheet.conditional_format(\n",
     "        column_ranges,\n",
     "        # Min: green, max: red.\n",