Analysis by region (#385)

* Adding regional comparisons

* Small ETL fixes
This commit is contained in:
Lucas Merrill Brown 2021-07-26 08:02:25 -07:00 committed by GitHub
parent 81290ce672
commit 67b39475f7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 440 additions and 158 deletions

View file

@ -394,12 +394,14 @@ class ScoreETL(ExtractTransformLoad):
"Score C", "Score C",
"Score D", "Score D",
"Score E", "Score E",
"Poverty (Less than 200% of federal poverty line)",
]: ]:
self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = self.df[ self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = self.df[score_field].rank(pct=True)
score_field
].rank(pct=True) for threshold in [0.25, 0.3, 0.35, 0.4]:
self.df[f"{score_field} (top 25th percentile)"] = ( fraction_converted_to_percent = int(100 * threshold)
self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] >= 0.75 self.df[f"{score_field} (top {fraction_converted_to_percent}th percentile)"] = (
self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] >= 1 - threshold
) )
def load(self) -> None: def load(self) -> None:

View file

@ -8,7 +8,7 @@ logger = get_module_logger(__name__)
class CalEnviroScreenETL(ExtractTransformLoad): class CalEnviroScreenETL(ExtractTransformLoad):
def __init__(self): def __init__(self):
self.CALENVIROSCREEN_FTP_URL = "https://justice40-data.s3.amazonaws.com/CalEnviroScreen/CalEnviroScreen_4.0_2021.zip" self.CALENVIROSCREEN_FTP_URL = "https://justice40-data.s3.amazonaws.com/data-sources/CalEnviroScreen_4.0_2021.zip"
self.CALENVIROSCREEN_CSV = self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv" self.CALENVIROSCREEN_CSV = self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv"
self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4" self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4"

View file

@ -1,5 +1,6 @@
from pathlib import Path from pathlib import Path
import csv import csv
import pandas as pd
import os import os
from config import settings from config import settings
@ -53,3 +54,18 @@ def get_state_fips_codes(data_path: Path) -> list:
fips = row[0].strip() fips = row[0].strip()
fips_state_list.append(fips) fips_state_list.append(fips)
return fips_state_list return fips_state_list
def get_state_information(data_path: Path) -> pd.DataFrame:
    """Load the full state file as a dataframe.

    Useful because of the state regional information.

    Args:
        data_path: Root data directory. The file is read from
            ``census/csv/fips_states_2010.csv`` beneath it.

    Returns:
        The contents of the CSV, with the ``fips`` column converted to
        strings that are left-padded with zeros to a width of two.
    """
    fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"
    df = pd.read_csv(fips_csv_path)

    # Left pad the FIPS codes with 0s. Purely numeric codes are parsed as
    # integers by `read_csv`, which drops the leading zero (e.g. "01" -> 1),
    # so they are converted back to two-character strings here.
    df["fips"] = df["fips"].astype(str).str.zfill(2)

    return df

View file

@ -44,8 +44,6 @@ class HousingTransportationETL(ExtractTransformLoad):
self.df = pd.concat(dfs) self.df = pd.concat(dfs)
self.df.head()
def transform(self) -> None: def transform(self) -> None:
logger.info(f"Transforming Housing and Transportation Data") logger.info(f"Transforming Housing and Transportation Data")

View file

@ -33,7 +33,7 @@ class HudRecapETL(ExtractTransformLoad):
logger.info(f"Transforming HUD Recap Data") logger.info(f"Transforming HUD Recap Data")
# Load comparison index (CalEnviroScreen 4) # Load comparison index (CalEnviroScreen 4)
self.df = pd.read_csv(self.HUD_RECAP_CSV, dtype={"Census Tract": "string"}) self.df = pd.read_csv(self.HUD_RECAP_CSV, dtype={"GEOID": "string"})
self.df.rename( self.df.rename(
columns={ columns={

View file

@ -3,7 +3,7 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "54615cef", "id": "93c7b73b",
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -17,6 +17,7 @@
"import collections\n", "import collections\n",
"import functools\n", "import functools\n",
"import IPython\n", "import IPython\n",
"import itertools\n",
"import numpy as np\n", "import numpy as np\n",
"import os\n", "import os\n",
"import pandas as pd\n", "import pandas as pd\n",
@ -37,6 +38,8 @@
" sys.path.append(module_path)\n", " sys.path.append(module_path)\n",
"\n", "\n",
"from utils import remove_all_from_dir, get_excel_column_name\n", "from utils import remove_all_from_dir, get_excel_column_name\n",
"from etl.sources.census.etl_utils import get_state_information\n",
"\n",
"\n", "\n",
"# Turn on TQDM for pandas so that we can have progress bars when running `apply`.\n", "# Turn on TQDM for pandas so that we can have progress bars when running `apply`.\n",
"tqdm_notebook.pandas()" "tqdm_notebook.pandas()"
@ -45,7 +48,7 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "49a63129", "id": "881424fd",
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -56,8 +59,8 @@
"\n", "\n",
"# Set some global parameters\n", "# Set some global parameters\n",
"DATA_DIR = pathlib.Path.cwd().parent / \"data\"\n", "DATA_DIR = pathlib.Path.cwd().parent / \"data\"\n",
"TEMP_DATA_DIR = pathlib.Path.cwd().parent / \"data\" / \"tmp\"\n", "TEMP_DATA_DIR = DATA_DIR / \"tmp\"\n",
"COMPARISON_OUTPUTS_DIR = TEMP_DATA_DIR / \"comparison_outputs\"\n", "COMPARISON_OUTPUTS_DIR = DATA_DIR / \"comparison_outputs\"\n",
"\n", "\n",
"# Make the dirs if they don't exist\n", "# Make the dirs if they don't exist\n",
"TEMP_DATA_DIR.mkdir(parents=True, exist_ok=True)\n", "TEMP_DATA_DIR.mkdir(parents=True, exist_ok=True)\n",
@ -71,6 +74,7 @@
"GEOID_FIELD_NAME = \"GEOID10\"\n", "GEOID_FIELD_NAME = \"GEOID10\"\n",
"GEOID_TRACT_FIELD_NAME = \"GEOID10_TRACT\"\n", "GEOID_TRACT_FIELD_NAME = \"GEOID10_TRACT\"\n",
"GEOID_STATE_FIELD_NAME = \"GEOID10_STATE\"\n", "GEOID_STATE_FIELD_NAME = \"GEOID10_STATE\"\n",
"COUNTRY_FIELD_NAME = \"Country\"\n",
"CENSUS_BLOCK_GROUP_POPULATION_FIELD = \"Total population\"\n", "CENSUS_BLOCK_GROUP_POPULATION_FIELD = \"Total population\"\n",
"\n", "\n",
"CEJST_SCORE_FIELD = \"cejst_score\"\n", "CEJST_SCORE_FIELD = \"cejst_score\"\n",
@ -84,29 +88,16 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "2b26dccf", "id": "c5f3eaa5",
"metadata": { "metadata": {
"scrolled": true "scrolled": false
}, },
"outputs": [], "outputs": [],
"source": [ "source": [
"# Load CEJST score data\n", "# Load CEJST score data\n",
"cejst_data_path = DATA_DIR / \"score\" / \"csv\" / \"usa.csv\"\n", "cejst_data_path = DATA_DIR / \"score\" / \"csv\" / \"full\" / \"usa.csv\"\n",
"cejst_df = pd.read_csv(cejst_data_path, dtype={GEOID_FIELD_NAME: \"string\"})\n", "cejst_df = pd.read_csv(cejst_data_path, dtype={GEOID_FIELD_NAME: \"string\"})\n",
"\n", "\n",
"# score_used = \"Score A\"\n",
"\n",
"# # Rename unclear name \"id\" to \"census_block_group_id\", as well as other renamings.\n",
"# cejst_df.rename(\n",
"# columns={\n",
"# \"Total population\": CENSUS_BLOCK_GROUP_POPULATION_FIELD,\n",
"# score_used: CEJST_SCORE_FIELD,\n",
"# f\"{score_used} (percentile)\": CEJST_PERCENTILE_FIELD,\n",
"# },\n",
"# inplace=True,\n",
"# errors=\"raise\",\n",
"# )\n",
"\n",
"# Create the CBG's Census Tract ID by dropping the last number from the FIPS CODE of the CBG.\n", "# Create the CBG's Census Tract ID by dropping the last number from the FIPS CODE of the CBG.\n",
"# The CBG ID is the last one character.\n", "# The CBG ID is the last one character.\n",
"# For more information, see https://www.census.gov/programs-surveys/geography/guidance/geo-identifiers.html.\n", "# For more information, see https://www.census.gov/programs-surveys/geography/guidance/geo-identifiers.html.\n",
@ -124,7 +115,7 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "08962382", "id": "a2448dcd",
"metadata": { "metadata": {
"scrolled": false "scrolled": false
}, },
@ -151,7 +142,7 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "42bd28d4", "id": "f612a86a",
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -169,7 +160,7 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "d77cd872", "id": "4ee6e6ee",
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -197,7 +188,7 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "813e5656", "id": "70d76fbc",
"metadata": { "metadata": {
"scrolled": false "scrolled": false
}, },
@ -227,30 +218,87 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "8a801121", "id": "558a2cc1",
"metadata": { "metadata": {},
"scrolled": true
},
"outputs": [], "outputs": [],
"source": [ "source": [
"cejst_priority_communities_fields = [\n", "# Define a namedtuple for indices.\n",
" \"Score A (top 25th percentile)\",\n", "Index = collections.namedtuple(\n",
" \"Score B (top 25th percentile)\",\n", " typename=\"Index\",\n",
" \"Score C (top 25th percentile)\",\n", " field_names=[\n",
" \"Score D (top 25th percentile)\",\n", " \"method_name\",\n",
" \"Score E (top 25th percentile)\",\n", " \"priority_communities_field\",\n",
" # Note: this field only used by indices defined at the census tract level.\n",
" \"other_census_tract_fields_to_keep\",\n",
" ],\n",
")\n",
"\n",
"# Define the indices used for CEJST scoring (`census_block_group_indices`) as well as comparison\n",
"# (`census_tract_indices`).\n",
"census_block_group_indices = [\n",
" Index(\n",
" method_name=\"Score A\",\n",
" priority_communities_field=\"Score A (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Score B\",\n",
" priority_communities_field=\"Score B (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Score C\",\n",
" priority_communities_field=\"Score C (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Score D (25th percentile)\",\n",
" priority_communities_field=\"Score D (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Score D (30th percentile)\",\n",
" priority_communities_field=\"Score D (top 30th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Score D (35th percentile)\",\n",
" priority_communities_field=\"Score D (top 35th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Score D (40th percentile)\",\n",
" priority_communities_field=\"Score D (top 40th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Poverty\",\n",
" priority_communities_field=\"Poverty (Less than 200% of federal poverty line) (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
"]\n", "]\n",
"\n", "\n",
"comparison_priority_communities_fields = [\n", "census_tract_indices = [\n",
" \"calenviroscreen_priority_community\",\n", " Index(\n",
" \"hud_recap_priority_community\",\n", " method_name=\"CalEnviroScreen 4.0\",\n",
" priority_communities_field=\"calenviroscreen_priority_community\",\n",
" other_census_tract_fields_to_keep=[\n",
" CALENVIROSCREEN_SCORE_FIELD,\n",
" CALENVIROSCREEN_PERCENTILE_FIELD,\n",
" ],\n",
" ),\n",
" Index(\n",
" method_name=\"HUD RECAP\",\n",
" priority_communities_field=\"hud_recap_priority_community\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
"]" "]"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "9fef0da9", "id": "5b71b2ab",
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -271,17 +319,41 @@
" df[priority_communities_field] * df[CENSUS_BLOCK_GROUP_POPULATION_FIELD]\n", " df[priority_communities_field] * df[CENSUS_BLOCK_GROUP_POPULATION_FIELD]\n",
" )\n", " )\n",
"\n", "\n",
" def calculate_state_comparison(frame: pd.DataFrame) -> pd.DataFrame:\n", " def calculate_state_comparison(\n",
" frame: pd.DataFrame, geography_field: str\n",
" ) -> pd.DataFrame:\n",
" \"\"\"\n", " \"\"\"\n",
" This method will be applied to a `group_by` object. Inherits some parameters from outer scope.\n", " This method will be applied to a `group_by` object. Inherits some parameters from outer scope.\n",
" \"\"\"\n",
" state_id = frame[GEOID_STATE_FIELD_NAME].unique()[0]\n",
"\n", "\n",
" \"\"\"\n",
" summary_dict = {}\n", " summary_dict = {}\n",
" summary_dict[COUNTRY_FIELD_NAME] = frame[COUNTRY_FIELD_NAME].unique()[0]\n",
"\n",
" if geography_field == COUNTRY_FIELD_NAME:\n",
" summary_dict[GEOID_STATE_FIELD_NAME] = \"00\"\n",
" summary_dict[\"Geography name\"] = \"(Entire USA)\"\n",
"\n",
" if geography_field == GEOID_STATE_FIELD_NAME:\n",
" state_id = frame[GEOID_STATE_FIELD_NAME].unique()[0]\n",
" summary_dict[GEOID_STATE_FIELD_NAME] = state_id\n", " summary_dict[GEOID_STATE_FIELD_NAME] = state_id\n",
" summary_dict[\"State name\"] = us.states.lookup(state_id).name\n", " summary_dict[\"Geography name\"] = us.states.lookup(state_id).name\n",
" summary_dict[\"Total CBGs in state\"] = len(frame)\n", "\n",
" summary_dict[\"Total population in state\"] = frame[\n", " # Also add region information\n",
" region_id = frame[\"region\"].unique()[0]\n",
" summary_dict[\"region\"] = region_id\n",
"\n",
" if geography_field == \"region\":\n",
" region_id = frame[\"region\"].unique()[0]\n",
" summary_dict[\"region\"] = region_id\n",
" summary_dict[\"Geography name\"] = region_id\n",
"\n",
" if geography_field == \"division\":\n",
" division_id = frame[\"division\"].unique()[0]\n",
" summary_dict[\"division\"] = division_id\n",
" summary_dict[\"Geography name\"] = division_id\n",
"\n",
" summary_dict[\"Total CBGs in geography\"] = len(frame)\n",
" summary_dict[\"Total population in geography\"] = frame[\n",
" CENSUS_BLOCK_GROUP_POPULATION_FIELD\n", " CENSUS_BLOCK_GROUP_POPULATION_FIELD\n",
" ].sum()\n", " ].sum()\n",
"\n", "\n",
@ -297,24 +369,68 @@
" # Calculate some combinations of other variables.\n", " # Calculate some combinations of other variables.\n",
" summary_dict[f\"{priority_communities_field} (percent CBGs)\"] = (\n", " summary_dict[f\"{priority_communities_field} (percent CBGs)\"] = (\n",
" summary_dict[f\"{priority_communities_field} (total CBGs)\"]\n", " summary_dict[f\"{priority_communities_field} (total CBGs)\"]\n",
" / summary_dict[\"Total CBGs in state\"]\n", " / summary_dict[\"Total CBGs in geography\"]\n",
" )\n", " )\n",
"\n", "\n",
" summary_dict[f\"{priority_communities_field} (percent population)\"] = (\n", " summary_dict[f\"{priority_communities_field} (percent population)\"] = (\n",
" summary_dict[f\"{priority_communities_field}{POPULATION_SUFFIX}\"]\n", " summary_dict[f\"{priority_communities_field}{POPULATION_SUFFIX}\"]\n",
" / summary_dict[\"Total population in state\"]\n", " / summary_dict[\"Total population in geography\"]\n",
" )\n", " )\n",
"\n", "\n",
" df = pd.DataFrame(summary_dict, index=[0])\n", " df = pd.DataFrame(summary_dict, index=[0])\n",
"\n", "\n",
" return df\n", " return df\n",
"\n", "\n",
" grouped_df = df.groupby(GEOID_STATE_FIELD_NAME)\n", " # Add a field for country so we can do aggregations across the entire country.\n",
" df[COUNTRY_FIELD_NAME] = \"USA\"\n",
"\n",
" # First, run the comparison by the whole country\n",
" usa_grouped_df = df.groupby(COUNTRY_FIELD_NAME)\n",
"\n", "\n",
" # Run the comparison function on the groups.\n", " # Run the comparison function on the groups.\n",
" state_distribution_df = grouped_df.progress_apply(calculate_state_comparison)\n", " usa_distribution_df = usa_grouped_df.progress_apply(\n",
" lambda frame: calculate_state_comparison(\n",
" frame, geography_field=COUNTRY_FIELD_NAME\n",
" )\n",
" )\n",
"\n", "\n",
" return state_distribution_df\n", " # Next, run the comparison by state\n",
" state_grouped_df = df.groupby(GEOID_STATE_FIELD_NAME)\n",
"\n",
" # Run the comparison function on the groups.\n",
" state_distribution_df = state_grouped_df.progress_apply(\n",
" lambda frame: calculate_state_comparison(\n",
" frame, geography_field=GEOID_STATE_FIELD_NAME\n",
" )\n",
" )\n",
"\n",
" # Next, run the comparison by region\n",
" region_grouped_df = df.groupby(\"region\")\n",
"\n",
" # Run the comparison function on the groups.\n",
" region_distribution_df = region_grouped_df.progress_apply(\n",
" lambda frame: calculate_state_comparison(frame, geography_field=\"region\")\n",
" )\n",
"\n",
" # Next, run the comparison by division\n",
" division_grouped_df = df.groupby(\"division\")\n",
"\n",
" # Run the comparison function on the groups.\n",
" division_distribution_df = division_grouped_df.progress_apply(\n",
" lambda frame: calculate_state_comparison(frame, geography_field=\"division\")\n",
" )\n",
"\n",
" # Combine the three\n",
" combined_df = pd.concat(\n",
" [\n",
" usa_distribution_df,\n",
" state_distribution_df,\n",
" region_distribution_df,\n",
" division_distribution_df,\n",
" ]\n",
" )\n",
"\n",
" return combined_df\n",
"\n", "\n",
"\n", "\n",
"def write_state_distribution_excel(\n", "def write_state_distribution_excel(\n",
@ -335,12 +451,31 @@
" 0, 0, state_distribution_df.shape[0], state_distribution_df.shape[1]\n", " 0, 0, state_distribution_df.shape[0], state_distribution_df.shape[1]\n",
" )\n", " )\n",
"\n", "\n",
" # Set a width parameter for all columns\n",
" # Note: this is parameterized because every call to `set_column` requires setting the width.\n",
" column_width = 15\n",
"\n",
" for column in state_distribution_df.columns:\n", " for column in state_distribution_df.columns:\n",
" # Special formatting for columns that capture the percent of population considered priority.\n",
" if \"(percent population)\" in column:\n",
" # Turn the column index into excel ranges (e.g., column #95 is \"CR\" and the range may be \"CR2:CR53\").\n", " # Turn the column index into excel ranges (e.g., column #95 is \"CR\" and the range may be \"CR2:CR53\").\n",
" column_index = state_distribution_df.columns.get_loc(column)\n", " column_index = state_distribution_df.columns.get_loc(column)\n",
" column_character = get_excel_column_name(column_index)\n", " column_character = get_excel_column_name(column_index)\n",
"\n",
" # Set all columns to larger width\n",
" worksheet.set_column(f\"{column_character}:{column_character}\", column_width)\n",
"\n",
" # Special formatting for all percent columns\n",
" # Note: we can't just search for `percent`, because that's included in the word `percentile`.\n",
" if \"percent \" in column or \"(percent)\" in column:\n",
" # Make these columns percentages.\n",
" percentage_format = workbook.add_format({\"num_format\": \"0%\"})\n",
" worksheet.set_column(\n",
" f\"{column_character}:{column_character}\",\n",
" column_width,\n",
" percentage_format,\n",
" )\n",
"\n",
" # Special formatting for columns that capture the percent of population considered priority.\n",
" if \"(percent population)\" in column:\n",
" column_ranges = (\n", " column_ranges = (\n",
" f\"{column_character}2:{column_character}{len(state_distribution_df)+1}\"\n", " f\"{column_character}2:{column_character}{len(state_distribution_df)+1}\"\n",
" )\n", " )\n",
@ -356,21 +491,33 @@
" },\n", " },\n",
" )\n", " )\n",
"\n", "\n",
" # TODO: text wrapping not working, fix.\n", " header_format = workbook.add_format(\n",
" text_wrap = workbook.add_format({\"text_wrap\": True})\n", " {\"bold\": True, \"text_wrap\": True, \"valign\": \"bottom\"}\n",
"\n",
" # Make these columns wide enough that you can read them.\n",
" worksheet.set_column(\n",
" f\"{column_character}:{column_character}\", 40, text_wrap\n",
" )\n", " )\n",
"\n", "\n",
" # Overwrite both the value and the format of each header cell\n",
" # This is because xlsxwriter / pandas has a known bug where it can't wrap text for a dataframe.\n",
" # See https://stackoverflow.com/questions/42562977/xlsxwriter-text-wrap-not-working.\n",
" for col_num, value in enumerate(state_distribution_df.columns.values):\n",
" worksheet.write(0, col_num, value, header_format)\n",
"\n",
" writer.save()\n", " writer.save()\n",
"\n", "\n",
"\n", "\n",
"fields_to_analyze = [\n",
" index.priority_communities_field\n",
" for index in census_block_group_indices + census_tract_indices\n",
"]\n",
"\n",
"state_fips_codes = get_state_information(DATA_DIR)\n",
"\n",
"merged_with_state_information_df = merged_df.merge(\n",
" right=state_fips_codes, left_on=GEOID_STATE_FIELD_NAME, right_on=\"fips\"\n",
")\n",
"\n",
"state_distribution_df = get_state_distributions(\n", "state_distribution_df = get_state_distributions(\n",
" df=merged_df,\n", " df=merged_with_state_information_df,\n",
" priority_communities_fields=cejst_priority_communities_fields\n", " priority_communities_fields=fields_to_analyze,\n",
" + comparison_priority_communities_fields,\n",
")\n", ")\n",
"\n", "\n",
"state_distribution_df.to_csv(\n", "state_distribution_df.to_csv(\n",
@ -390,11 +537,159 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "d46667cf", "id": "f9b9a329",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"# This cell defines a couple of comparison functions. It does not run them.\n", "def write_markdown_and_docx_content(\n",
" markdown_content: str, file_dir: pathlib.PosixPath, file_name_without_extension: str\n",
") -> pathlib.PosixPath:\n",
" \"\"\"Write Markdown content to both .md and .docx files.\"\"\"\n",
" # Set the file paths for both files.\n",
" markdown_file_path = file_dir / f\"{file_name_without_extension}.md\"\n",
" docx_file_path = file_dir / f\"{file_name_without_extension}.docx\"\n",
"\n",
" # Write the markdown content to file.\n",
" with open(markdown_file_path, \"w\") as text_file:\n",
" text_file.write(markdown_content)\n",
"\n",
" # Convert markdown file to Word doc.\n",
" pypandoc.convert_file(\n",
" source_file=str(markdown_file_path),\n",
" to=\"docx\",\n",
" outputfile=str(docx_file_path),\n",
" extra_args=[],\n",
" )\n",
"\n",
" return docx_file_path\n",
"\n",
"\n",
"def get_markdown_comparing_census_block_group_indices(\n",
" census_block_group_indices=typing.List[Index],\n",
" df=pd.DataFrame,\n",
" state_field=GEOID_STATE_FIELD_NAME,\n",
") -> str:\n",
" \"\"\"Generate a Markdown string of analysis of multiple CBG indices.\"\"\"\n",
" count_field_name = \"Count of CBGs\"\n",
"\n",
" # List of all states/territories in their FIPS codes:\n",
" state_ids = sorted(df[state_field].unique())\n",
" state_names = \", \".join([us.states.lookup(state_id).name for state_id in state_ids])\n",
"\n",
" # Create markdown content for comparisons.\n",
" markdown_content = f\"\"\"\n",
"# Comparing multiple indices at the census block group level\n",
" \n",
"(This report was calculated on {datetime.today().strftime('%Y-%m-%d')}.)\n",
"\n",
"This report compares the following indices: {\", \".join([index.method_name for index in census_block_group_indices])}.\n",
"\n",
"This report analyzes the following US states and territories: {state_names}.\n",
"\n",
"\"\"\"\n",
"\n",
" for (index1, index2) in itertools.combinations(census_block_group_indices, 2):\n",
" # Group all data by their different values on Priority Communities Field for Index1 vs Priority Communities Field for Index2.\n",
" count_df = (\n",
" df.groupby(\n",
" [index1.priority_communities_field, index2.priority_communities_field]\n",
" )[GEOID_FIELD_NAME]\n",
" .count()\n",
" .reset_index(name=count_field_name)\n",
" )\n",
"\n",
" total_cbgs = count_df[count_field_name].sum()\n",
"\n",
" # Returns a series\n",
" true_true_cbgs_series = count_df.loc[\n",
" count_df[index1.priority_communities_field]\n",
" & count_df[index2.priority_communities_field],\n",
" count_field_name,\n",
" ]\n",
" true_false_cbgs_series = count_df.loc[\n",
" count_df[index1.priority_communities_field]\n",
" & ~count_df[index2.priority_communities_field],\n",
" count_field_name,\n",
" ]\n",
" false_true_cbgs_series = count_df.loc[\n",
" ~count_df[index1.priority_communities_field]\n",
" & count_df[index2.priority_communities_field],\n",
" count_field_name,\n",
" ]\n",
" false_false_cbgs_series = count_df.loc[\n",
" ~count_df[index1.priority_communities_field]\n",
" & ~count_df[index2.priority_communities_field],\n",
" count_field_name,\n",
" ]\n",
"\n",
" # Convert from series to a scalar value, including accounting for if no data exists for that pairing.\n",
" true_true_cbgs = (\n",
" true_true_cbgs_series.iloc[0] if len(true_true_cbgs_series) > 0 else 0\n",
" )\n",
" true_false_cbgs = (\n",
" true_false_cbgs_series.iloc[0] if len(true_false_cbgs_series) > 0 else 0\n",
" )\n",
" false_true_cbgs = (\n",
" false_true_cbgs_series.iloc[0] if len(false_true_cbgs_series) > 0 else 0\n",
" )\n",
" false_false_cbgs = (\n",
" false_false_cbgs_series.iloc[0] if len(false_false_cbgs_series) > 0 else 0\n",
" )\n",
"\n",
" markdown_content += (\n",
" \"*** \\n\\n\"\n",
" \"There are \"\n",
" f\"{true_true_cbgs} ({true_true_cbgs / total_cbgs:.0%}) \"\n",
" f\"census block groups that are both {index1.method_name} priority communities and {index2.method_name} priority communities.\\n\\n\"\n",
" \"There are \"\n",
" f\"{true_false_cbgs} ({true_false_cbgs / total_cbgs:.0%}) \"\n",
" f\"census block groups that are {index1.method_name} priority communities but not {index2.method_name} priority communities.\\n\\n\"\n",
" \"There are \"\n",
" f\"{false_true_cbgs} ({false_true_cbgs / total_cbgs:.0%}) \"\n",
" f\"census block groups that are not {index1.method_name} priority communities but are {index2.method_name} priority communities.\\n\\n\"\n",
" \"There are \"\n",
" f\"{false_false_cbgs} ({false_false_cbgs / total_cbgs:.0%}) \"\n",
" f\"census block groups that are neither {index1.method_name} priority communities nor {index2.method_name} priority communities.\\n\\n\"\n",
" \"\\n\\n\"\n",
" )\n",
"\n",
" return markdown_content\n",
"\n",
"\n",
"def get_comparison_census_block_group_indices(\n",
" census_block_group_indices=typing.List[Index],\n",
" df=pd.DataFrame,\n",
" state_field=GEOID_STATE_FIELD_NAME,\n",
") -> pathlib.PosixPath:\n",
" markdown_content = get_markdown_comparing_census_block_group_indices(\n",
" census_block_group_indices=census_block_group_indices,\n",
" df=merged_with_state_information_df,\n",
" )\n",
"\n",
" comparison_docx_file_path = write_markdown_and_docx_content(\n",
" markdown_content=markdown_content,\n",
" file_dir=COMPARISON_OUTPUTS_DIR,\n",
" file_name_without_extension=f\"Comparison report - All CBG indices\",\n",
" )\n",
"\n",
" return comparison_docx_file_path\n",
"\n",
"\n",
"# Compare multiple scores at the CBG level\n",
"get_comparison_census_block_group_indices(\n",
" census_block_group_indices=census_block_group_indices,\n",
" df=merged_with_state_information_df,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "25a10027",
"metadata": {},
"outputs": [],
"source": [
"# This cell defines a variety of comparison functions. It does not run them.\n",
"\n", "\n",
"# Define a namedtuple for column names, which need to be shared between multiple parts of this comparison pipeline.\n", "# Define a namedtuple for column names, which need to be shared between multiple parts of this comparison pipeline.\n",
"# Named tuples are useful here because they provide guarantees that for each instance, all properties are defined and\n", "# Named tuples are useful here because they provide guarantees that for each instance, all properties are defined and\n",
@ -418,17 +713,6 @@
" ],\n", " ],\n",
")\n", ")\n",
"\n", "\n",
"# Define a namedtuple for indices.\n",
"Index = collections.namedtuple(\n",
" typename=\"Index\",\n",
" field_names=[\n",
" \"method_name\",\n",
" \"priority_communities_field\",\n",
" # Note: this field only used by indices defined at the census tract level.\n",
" \"other_census_tract_fields_to_keep\",\n",
" ],\n",
")\n",
"\n",
"\n", "\n",
"def get_comparison_field_names(\n", "def get_comparison_field_names(\n",
" method_a_name: str,\n", " method_a_name: str,\n",
@ -573,6 +857,16 @@
" else None\n", " else None\n",
" )\n", " )\n",
"\n", "\n",
" # For all remaining fields, calculate the average\n",
" # TODO: refactor to vectorize to make faster.\n",
" for field in [\n",
" \"Poverty (Less than 200% of federal poverty line)\",\n",
" \"Percent of households in linguistic isolation\",\n",
" \"Percent individuals age 25 or over with less than high school degree\",\n",
" \"Unemployed civilians (percent)\",\n",
" ]:\n",
" df[f\"{field} (average of CBGs)\"] = frame.loc[:, field].mean()\n",
"\n",
" return df\n", " return df\n",
"\n", "\n",
" # Group all data by the census tract.\n", " # Group all data by the census tract.\n",
@ -681,27 +975,35 @@
" return markdown_content\n", " return markdown_content\n",
"\n", "\n",
"\n", "\n",
"def write_markdown_and_docx_content(\n", "def get_secondary_comparison_df(\n",
" markdown_content: str, file_dir: pathlib.PosixPath, file_name_without_extension: str\n", " comparison_df: pd.DataFrame,\n",
") -> pathlib.PosixPath:\n", " comparison_field_names: ComparisonFieldNames,\n",
" \"\"\"Write Markdown content to both .md and .docx files.\"\"\"\n", " method_b_priority_census_tracts_field: str,\n",
" # Set the file paths for both files.\n", ") -> pd.DataFrame:\n",
" markdown_file_path = file_dir / f\"{file_name_without_extension}.md\"\n", " \"\"\"A secondary level of comparison.\n",
" docx_file_path = file_dir / f\"{file_name_without_extension}.docx\"\n",
"\n", "\n",
" # Write the markdown content to file.\n", " The first level of comparison identifies census tracts prioritized by Method A,\n",
" with open(markdown_file_path, \"w\") as text_file:\n", " compared to whether or not they're prioritized by Method B.\n",
" text_file.write(markdown_content)\n",
"\n", "\n",
" # Convert markdown file to Word doc.\n", " This comparison method analyzes characteristics of those census tracts, based on whether or not they are prioritized\n",
" pypandoc.convert_file(\n", " or not by Method A and/or Method B.\n",
" source_file=str(markdown_file_path),\n", "\n",
" to=\"docx\",\n", "\n",
" outputfile=str(docx_file_path),\n", " E.g., it might show that tracts prioritized by A but not B have a higher average income,\n",
" extra_args=[],\n", " or that tracts prioritized by B but not A have a lower percent of unemployed people.\"\"\"\n",
" grouped_df = comparison_df.groupby(\n",
" [\n",
" method_b_priority_census_tracts_field,\n",
" comparison_field_names.method_b_tract_has_at_least_one_method_a_cbg,\n",
" comparison_field_names.method_b_non_priority_tract_has_at_least_one_method_a_cbg,\n",
" ],\n",
" dropna=False,\n",
" )\n", " )\n",
"\n", "\n",
" return docx_file_path\n", " # Run the comparison function on the groups.\n",
" secondary_comparison_df = grouped_df.mean().reset_index()\n",
"\n",
" return secondary_comparison_df\n",
"\n", "\n",
"\n", "\n",
"def execute_comparison(\n", "def execute_comparison(\n",
@ -749,18 +1051,34 @@
" output_dir=output_dir,\n", " output_dir=output_dir,\n",
" )\n", " )\n",
"\n", "\n",
" # Choose output path\n", " # Write comparison to CSV.\n",
" file_path = (\n", " file_path = (\n",
" output_dir / f\"Comparison Output - {method_a_name} and {method_b_name}.csv\"\n", " output_dir / f\"Comparison Output - {method_a_name} and {method_b_name}.csv\"\n",
" )\n", " )\n",
"\n",
" # Write comparison to CSV.\n",
" comparison_df.to_csv(\n", " comparison_df.to_csv(\n",
" path_or_buf=file_path,\n", " path_or_buf=file_path,\n",
" na_rep=\"\",\n", " na_rep=\"\",\n",
" index=False,\n", " index=False,\n",
" )\n", " )\n",
"\n", "\n",
" # Secondary comparison DF\n",
" secondary_comparison_df = get_secondary_comparison_df(\n",
" comparison_df=comparison_df,\n",
" comparison_field_names=comparison_field_names,\n",
" method_b_priority_census_tracts_field=method_b_priority_census_tracts_field,\n",
" )\n",
"\n",
" # Write secondary comparison to CSV.\n",
" file_path = (\n",
" output_dir\n",
" / f\"Secondary Comparison Output - {method_a_name} and {method_b_name}.csv\"\n",
" )\n",
" secondary_comparison_df.to_csv(\n",
" path_or_buf=file_path,\n",
" na_rep=\"\",\n",
" index=False,\n",
" )\n",
"\n",
" markdown_content = get_comparison_markdown_content(\n", " markdown_content = get_comparison_markdown_content(\n",
" original_df=df_with_only_shared_states,\n", " original_df=df_with_only_shared_states,\n",
" comparison_df=comparison_df,\n", " comparison_df=comparison_df,\n",
@ -810,63 +1128,13 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "48d9bf6b", "id": "9b8b6d1e",
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
"outputs": [], "outputs": [],
"source": [ "source": [
"# Actually execute the functions\n", "# Actually execute the functions\n",
"\n",
"# # California only\n",
"# cal_df = merged_df[merged_df[GEOID_TRACT_FIELD_NAME].astype(str).str[0:2] == \"06\"]\n",
"# # cal_df = cal_df[0:1000]\n",
"# print(len(cal_df))\n",
"\n",
"census_block_group_indices = [\n",
" Index(\n",
" method_name=\"Score A\",\n",
" priority_communities_field=\"Score A (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" # Index(\n",
" # method_name=\"Score B\",\n",
" # priority_communities_field=\"Score B (top 25th percentile)\",\n",
" # other_census_tract_fields_to_keep=[],\n",
" # ),\n",
" Index(\n",
" method_name=\"Score C\",\n",
" priority_communities_field=\"Score C (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" Index(\n",
" method_name=\"Score D\",\n",
" priority_communities_field=\"Score D (top 25th percentile)\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
" # Index(\n",
" # method_name=\"Score E\",\n",
" # priority_communities_field=\"Score E (top 25th percentile)\",\n",
" # other_census_tract_fields_to_keep=[],\n",
" # ),\n",
"]\n",
"\n",
"census_tract_indices = [\n",
" Index(\n",
" method_name=\"CalEnviroScreen 4.0\",\n",
" priority_communities_field=\"calenviroscreen_priority_community\",\n",
" other_census_tract_fields_to_keep=[\n",
" CALENVIROSCREEN_SCORE_FIELD,\n",
" CALENVIROSCREEN_PERCENTILE_FIELD,\n",
" ],\n",
" ),\n",
" Index(\n",
" method_name=\"HUD RECAP\",\n",
" priority_communities_field=\"hud_recap_priority_community\",\n",
" other_census_tract_fields_to_keep=[],\n",
" ),\n",
"]\n",
"\n",
"file_paths = execute_comparisons(\n", "file_paths = execute_comparisons(\n",
" df=merged_df,\n", " df=merged_df,\n",
" census_block_group_indices=census_block_group_indices,\n", " census_block_group_indices=census_block_group_indices,\n",

View file

@ -133,7 +133,6 @@ def unzip_file_from_url(
# cleanup temporary file # cleanup temporary file
os.remove(zip_file_path) os.remove(zip_file_path)
def data_folder_cleanup() -> None: def data_folder_cleanup() -> None:
"""Remove all files and directories from the local data/dataset path""" """Remove all files and directories from the local data/dataset path"""
@ -161,7 +160,6 @@ def temp_folder_cleanup() -> None:
logger.info(f"Initializing all temp directoriees") logger.info(f"Initializing all temp directoriees")
remove_all_from_dir(data_path / "tmp") remove_all_from_dir(data_path / "tmp")
def get_excel_column_name(index: int) -> str: def get_excel_column_name(index: int) -> str:
"""Map a numeric index to the appropriate column in Excel. E.g., column #95 is "CR". """Map a numeric index to the appropriate column in Excel. E.g., column #95 is "CR".
Only works for the first 1000 columns. Only works for the first 1000 columns.