From 67b39475f704e46942e062e2f1cfec83ea8807c5 Mon Sep 17 00:00:00 2001 From: Lucas Merrill Brown Date: Mon, 26 Jul 2021 08:02:25 -0700 Subject: [PATCH] Analysis by region (#385) * Adding regional comparisons * Small ETL fixes --- data/data-pipeline/etl/score/etl_score.py | 14 +- .../etl/sources/calenviroscreen/etl.py | 2 +- .../etl/sources/census/etl_utils.py | 16 + .../sources/housing_and_transportation/etl.py | 2 - .../etl/sources/hud_recap/etl.py | 2 +- .../ipython/scoring_comparison.ipynb | 560 +++++++++++++----- data/data-pipeline/utils.py | 2 - 7 files changed, 440 insertions(+), 158 deletions(-) diff --git a/data/data-pipeline/etl/score/etl_score.py b/data/data-pipeline/etl/score/etl_score.py index a4a48d7b..d7e2b36c 100644 --- a/data/data-pipeline/etl/score/etl_score.py +++ b/data/data-pipeline/etl/score/etl_score.py @@ -394,13 +394,15 @@ class ScoreETL(ExtractTransformLoad): "Score C", "Score D", "Score E", + "Poverty (Less than 200% of federal poverty line)", ]: - self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = self.df[ - score_field - ].rank(pct=True) - self.df[f"{score_field} (top 25th percentile)"] = ( - self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] >= 0.75 - ) + self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] = self.df[score_field].rank(pct=True) + + for threshold in [0.25, 0.3, 0.35, 0.4]: + fraction_converted_to_percent = int(100 * threshold) + self.df[f"{score_field} (top {fraction_converted_to_percent}th percentile)"] = ( + self.df[f"{score_field}{self.PERCENTILE_FIELD_SUFFIX}"] >= 1 - threshold + ) def load(self) -> None: logger.info(f"Saving Score CSV") diff --git a/data/data-pipeline/etl/sources/calenviroscreen/etl.py b/data/data-pipeline/etl/sources/calenviroscreen/etl.py index 6349deb6..ad56b26a 100644 --- a/data/data-pipeline/etl/sources/calenviroscreen/etl.py +++ b/data/data-pipeline/etl/sources/calenviroscreen/etl.py @@ -8,7 +8,7 @@ logger = get_module_logger(__name__) class CalEnviroScreenETL(ExtractTransformLoad): def __init__(self): - self.CALENVIROSCREEN_FTP_URL = "https://justice40-data.s3.amazonaws.com/CalEnviroScreen/CalEnviroScreen_4.0_2021.zip" + self.CALENVIROSCREEN_FTP_URL = "https://justice40-data.s3.amazonaws.com/data-sources/CalEnviroScreen_4.0_2021.zip" self.CALENVIROSCREEN_CSV = self.TMP_PATH / "CalEnviroScreen_4.0_2021.csv" self.CSV_PATH = self.DATA_PATH / "dataset" / "calenviroscreen4" diff --git a/data/data-pipeline/etl/sources/census/etl_utils.py b/data/data-pipeline/etl/sources/census/etl_utils.py index dba41945..e11e5f6b 100644 --- a/data/data-pipeline/etl/sources/census/etl_utils.py +++ b/data/data-pipeline/etl/sources/census/etl_utils.py @@ -1,5 +1,6 @@ from pathlib import Path import csv +import pandas as pd import os from config import settings @@ -53,3 +54,18 @@ def get_state_fips_codes(data_path: Path) -> list: fips = row[0].strip() fips_state_list.append(fips) return fips_state_list + + +def get_state_information(data_path: Path) -> pd.DataFrame: + """Load the full state file as a dataframe. + + Useful because of the state regional information. 
+ """ + fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv" + + df = pd.read_csv(fips_csv_path) + + # Left pad the FIPS codes with 0s + df["fips"] = df["fips"].astype(str).apply(lambda x: x.zfill(2)) + + return df diff --git a/data/data-pipeline/etl/sources/housing_and_transportation/etl.py b/data/data-pipeline/etl/sources/housing_and_transportation/etl.py index 348de679..bdbd3b7d 100644 --- a/data/data-pipeline/etl/sources/housing_and_transportation/etl.py +++ b/data/data-pipeline/etl/sources/housing_and_transportation/etl.py @@ -44,8 +44,6 @@ class HousingTransportationETL(ExtractTransformLoad): self.df = pd.concat(dfs) - self.df.head() - def transform(self) -> None: logger.info(f"Transforming Housing and Transportation Data") diff --git a/data/data-pipeline/etl/sources/hud_recap/etl.py b/data/data-pipeline/etl/sources/hud_recap/etl.py index 9fcafc23..be7a6315 100644 --- a/data/data-pipeline/etl/sources/hud_recap/etl.py +++ b/data/data-pipeline/etl/sources/hud_recap/etl.py @@ -33,7 +33,7 @@ class HudRecapETL(ExtractTransformLoad): logger.info(f"Transforming HUD Recap Data") # Load comparison index (CalEnviroScreen 4) - self.df = pd.read_csv(self.HUD_RECAP_CSV, dtype={"Census Tract": "string"}) + self.df = pd.read_csv(self.HUD_RECAP_CSV, dtype={"GEOID": "string"}) self.df.rename( columns={ diff --git a/data/data-pipeline/ipython/scoring_comparison.ipynb b/data/data-pipeline/ipython/scoring_comparison.ipynb index 64733e10..125cbb83 100644 --- a/data/data-pipeline/ipython/scoring_comparison.ipynb +++ b/data/data-pipeline/ipython/scoring_comparison.ipynb @@ -3,7 +3,7 @@ { "cell_type": "code", "execution_count": null, - "id": "54615cef", + "id": "93c7b73b", "metadata": { "scrolled": true }, @@ -17,6 +17,7 @@ "import collections\n", "import functools\n", "import IPython\n", + "import itertools\n", "import numpy as np\n", "import os\n", "import pandas as pd\n", @@ -37,6 +38,8 @@ " sys.path.append(module_path)\n", "\n", "from utils import remove_all_from_dir, get_excel_column_name\n", + "from etl.sources.census.etl_utils import get_state_information\n", + "\n", "\n", "# Turn on TQDM for pandas so that we can have progress bars when running `apply`.\n", "tqdm_notebook.pandas()" @@ -45,7 +48,7 @@ { "cell_type": "code", "execution_count": null, - "id": "49a63129", + "id": "881424fd", "metadata": { "scrolled": true }, @@ -56,8 +59,8 @@ "\n", "# Set some global parameters\n", "DATA_DIR = pathlib.Path.cwd().parent / \"data\"\n", - "TEMP_DATA_DIR = pathlib.Path.cwd().parent / \"data\" / \"tmp\"\n", - "COMPARISON_OUTPUTS_DIR = TEMP_DATA_DIR / \"comparison_outputs\"\n", + "TEMP_DATA_DIR = DATA_DIR / \"tmp\"\n", + "COMPARISON_OUTPUTS_DIR = DATA_DIR / \"comparison_outputs\"\n", "\n", "# Make the dirs if they don't exist\n", "TEMP_DATA_DIR.mkdir(parents=True, exist_ok=True)\n", @@ -71,6 +74,7 @@ "GEOID_FIELD_NAME = \"GEOID10\"\n", "GEOID_TRACT_FIELD_NAME = \"GEOID10_TRACT\"\n", "GEOID_STATE_FIELD_NAME = \"GEOID10_STATE\"\n", + "COUNTRY_FIELD_NAME = \"Country\"\n", "CENSUS_BLOCK_GROUP_POPULATION_FIELD = \"Total population\"\n", "\n", "CEJST_SCORE_FIELD = \"cejst_score\"\n", @@ -84,29 +88,16 @@ { "cell_type": "code", "execution_count": null, - "id": "2b26dccf", + "id": "c5f3eaa5", "metadata": { - "scrolled": true + "scrolled": false }, "outputs": [], "source": [ "# Load CEJST score data\n", - "cejst_data_path = DATA_DIR / \"score\" / \"csv\" / \"usa.csv\"\n", + "cejst_data_path = DATA_DIR / \"score\" / \"csv\" / \"full\" / \"usa.csv\"\n", "cejst_df = pd.read_csv(cejst_data_path, 
dtype={GEOID_FIELD_NAME: \"string\"})\n", "\n", - "# score_used = \"Score A\"\n", - "\n", - "# # Rename unclear name \"id\" to \"census_block_group_id\", as well as other renamings.\n", - "# cejst_df.rename(\n", - "# columns={\n", - "# \"Total population\": CENSUS_BLOCK_GROUP_POPULATION_FIELD,\n", - "# score_used: CEJST_SCORE_FIELD,\n", - "# f\"{score_used} (percentile)\": CEJST_PERCENTILE_FIELD,\n", - "# },\n", - "# inplace=True,\n", - "# errors=\"raise\",\n", - "# )\n", - "\n", "# Create the CBG's Census Tract ID by dropping the last number from the FIPS CODE of the CBG.\n", "# The CBG ID is the last one character.\n", "# For more information, see https://www.census.gov/programs-surveys/geography/guidance/geo-identifiers.html.\n", @@ -124,7 +115,7 @@ { "cell_type": "code", "execution_count": null, - "id": "08962382", + "id": "a2448dcd", "metadata": { "scrolled": false }, @@ -151,7 +142,7 @@ { "cell_type": "code", "execution_count": null, - "id": "42bd28d4", + "id": "f612a86a", "metadata": { "scrolled": true }, @@ -169,7 +160,7 @@ { "cell_type": "code", "execution_count": null, - "id": "d77cd872", + "id": "4ee6e6ee", "metadata": { "scrolled": true }, @@ -197,7 +188,7 @@ { "cell_type": "code", "execution_count": null, - "id": "813e5656", + "id": "70d76fbc", "metadata": { "scrolled": false }, @@ -227,30 +218,87 @@ { "cell_type": "code", "execution_count": null, - "id": "8a801121", - "metadata": { - "scrolled": true - }, + "id": "558a2cc1", + "metadata": {}, "outputs": [], "source": [ - "cejst_priority_communities_fields = [\n", - " \"Score A (top 25th percentile)\",\n", - " \"Score B (top 25th percentile)\",\n", - " \"Score C (top 25th percentile)\",\n", - " \"Score D (top 25th percentile)\",\n", - " \"Score E (top 25th percentile)\",\n", + "# Define a namedtuple for indices.\n", + "Index = collections.namedtuple(\n", + " typename=\"Index\",\n", + " field_names=[\n", + " \"method_name\",\n", + " \"priority_communities_field\",\n", + " # Note: this field only used by indices defined at the census tract level.\n", + " \"other_census_tract_fields_to_keep\",\n", + " ],\n", + ")\n", + "\n", + "# Define the indices used for CEJST scoring (`census_block_group_indices`) as well as comparison\n", + "# (`census_tract_indices`).\n", + "census_block_group_indices = [\n", + " Index(\n", + " method_name=\"Score A\",\n", + " priority_communities_field=\"Score A (top 25th percentile)\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", + " Index(\n", + " method_name=\"Score B\",\n", + " priority_communities_field=\"Score B (top 25th percentile)\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", + " Index(\n", + " method_name=\"Score C\",\n", + " priority_communities_field=\"Score C (top 25th percentile)\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", + " Index(\n", + " method_name=\"Score D (25th percentile)\",\n", + " priority_communities_field=\"Score D (top 25th percentile)\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", + " Index(\n", + " method_name=\"Score D (30th percentile)\",\n", + " priority_communities_field=\"Score D (top 30th percentile)\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", + " Index(\n", + " method_name=\"Score D (35th percentile)\",\n", + " priority_communities_field=\"Score D (top 35th percentile)\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", + " Index(\n", + " method_name=\"Score D (40th percentile)\",\n", + " priority_communities_field=\"Score D (top 40th percentile)\",\n", + " 
other_census_tract_fields_to_keep=[],\n", + " ),\n", + " Index(\n", + " method_name=\"Poverty\",\n", + " priority_communities_field=\"Poverty (Less than 200% of federal poverty line) (top 25th percentile)\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", "]\n", "\n", - "comparison_priority_communities_fields = [\n", - " \"calenviroscreen_priority_community\",\n", - " \"hud_recap_priority_community\",\n", + "census_tract_indices = [\n", + " Index(\n", + " method_name=\"CalEnviroScreen 4.0\",\n", + " priority_communities_field=\"calenviroscreen_priority_community\",\n", + " other_census_tract_fields_to_keep=[\n", + " CALENVIROSCREEN_SCORE_FIELD,\n", + " CALENVIROSCREEN_PERCENTILE_FIELD,\n", + " ],\n", + " ),\n", + " Index(\n", + " method_name=\"HUD RECAP\",\n", + " priority_communities_field=\"hud_recap_priority_community\",\n", + " other_census_tract_fields_to_keep=[],\n", + " ),\n", "]" ] }, { "cell_type": "code", "execution_count": null, - "id": "9fef0da9", + "id": "5b71b2ab", "metadata": { "scrolled": true }, @@ -271,17 +319,41 @@ " df[priority_communities_field] * df[CENSUS_BLOCK_GROUP_POPULATION_FIELD]\n", " )\n", "\n", - " def calculate_state_comparison(frame: pd.DataFrame) -> pd.DataFrame:\n", + " def calculate_state_comparison(\n", + " frame: pd.DataFrame, geography_field: str\n", + " ) -> pd.DataFrame:\n", " \"\"\"\n", " This method will be applied to a `group_by` object. Inherits some parameters from outer scope.\n", - " \"\"\"\n", - " state_id = frame[GEOID_STATE_FIELD_NAME].unique()[0]\n", "\n", + " \"\"\"\n", " summary_dict = {}\n", - " summary_dict[GEOID_STATE_FIELD_NAME] = state_id\n", - " summary_dict[\"State name\"] = us.states.lookup(state_id).name\n", - " summary_dict[\"Total CBGs in state\"] = len(frame)\n", - " summary_dict[\"Total population in state\"] = frame[\n", + " summary_dict[COUNTRY_FIELD_NAME] = frame[COUNTRY_FIELD_NAME].unique()[0]\n", + "\n", + " if geography_field == COUNTRY_FIELD_NAME:\n", + " summary_dict[GEOID_STATE_FIELD_NAME] = \"00\"\n", + " summary_dict[\"Geography name\"] = \"(Entire USA)\"\n", + "\n", + " if geography_field == GEOID_STATE_FIELD_NAME:\n", + " state_id = frame[GEOID_STATE_FIELD_NAME].unique()[0]\n", + " summary_dict[GEOID_STATE_FIELD_NAME] = state_id\n", + " summary_dict[\"Geography name\"] = us.states.lookup(state_id).name\n", + "\n", + " # Also add region information\n", + " region_id = frame[\"region\"].unique()[0]\n", + " summary_dict[\"region\"] = region_id\n", + "\n", + " if geography_field == \"region\":\n", + " region_id = frame[\"region\"].unique()[0]\n", + " summary_dict[\"region\"] = region_id\n", + " summary_dict[\"Geography name\"] = region_id\n", + "\n", + " if geography_field == \"division\":\n", + " division_id = frame[\"division\"].unique()[0]\n", + " summary_dict[\"division\"] = division_id\n", + " summary_dict[\"Geography name\"] = division_id\n", + "\n", + " summary_dict[\"Total CBGs in geography\"] = len(frame)\n", + " summary_dict[\"Total population in geography\"] = frame[\n", " CENSUS_BLOCK_GROUP_POPULATION_FIELD\n", " ].sum()\n", "\n", @@ -297,24 +369,68 @@ " # Calculate some combinations of other variables.\n", " summary_dict[f\"{priority_communities_field} (percent CBGs)\"] = (\n", " summary_dict[f\"{priority_communities_field} (total CBGs)\"]\n", - " / summary_dict[\"Total CBGs in state\"]\n", + " / summary_dict[\"Total CBGs in geography\"]\n", " )\n", "\n", " summary_dict[f\"{priority_communities_field} (percent population)\"] = (\n", " 
summary_dict[f\"{priority_communities_field}{POPULATION_SUFFIX}\"]\n", - " / summary_dict[\"Total population in state\"]\n", + " / summary_dict[\"Total population in geography\"]\n", " )\n", "\n", " df = pd.DataFrame(summary_dict, index=[0])\n", "\n", " return df\n", "\n", - " grouped_df = df.groupby(GEOID_STATE_FIELD_NAME)\n", + " # Add a field for country so we can do aggregations across the entire country.\n", + " df[COUNTRY_FIELD_NAME] = \"USA\"\n", + "\n", + " # First, run the comparison by the whole country\n", + " usa_grouped_df = df.groupby(COUNTRY_FIELD_NAME)\n", "\n", " # Run the comparison function on the groups.\n", - " state_distribution_df = grouped_df.progress_apply(calculate_state_comparison)\n", + " usa_distribution_df = usa_grouped_df.progress_apply(\n", + " lambda frame: calculate_state_comparison(\n", + " frame, geography_field=COUNTRY_FIELD_NAME\n", + " )\n", + " )\n", "\n", - " return state_distribution_df\n", + " # Next, run the comparison by state\n", + " state_grouped_df = df.groupby(GEOID_STATE_FIELD_NAME)\n", + "\n", + " # Run the comparison function on the groups.\n", + " state_distribution_df = state_grouped_df.progress_apply(\n", + " lambda frame: calculate_state_comparison(\n", + " frame, geography_field=GEOID_STATE_FIELD_NAME\n", + " )\n", + " )\n", + "\n", + " # Next, run the comparison by region\n", + " region_grouped_df = df.groupby(\"region\")\n", + "\n", + " # Run the comparison function on the groups.\n", + " region_distribution_df = region_grouped_df.progress_apply(\n", + " lambda frame: calculate_state_comparison(frame, geography_field=\"region\")\n", + " )\n", + "\n", + " # Next, run the comparison by division\n", + " division_grouped_df = df.groupby(\"division\")\n", + "\n", + " # Run the comparison function on the groups.\n", + " division_distribution_df = division_grouped_df.progress_apply(\n", + " lambda frame: calculate_state_comparison(frame, geography_field=\"division\")\n", + " )\n", + "\n", + " # Combine the three\n", + " combined_df = pd.concat(\n", + " [\n", + " usa_distribution_df,\n", + " state_distribution_df,\n", + " region_distribution_df,\n", + " division_distribution_df,\n", + " ]\n", + " )\n", + "\n", + " return combined_df\n", "\n", "\n", "def write_state_distribution_excel(\n", @@ -335,12 +451,31 @@ " 0, 0, state_distribution_df.shape[0], state_distribution_df.shape[1]\n", " )\n", "\n", + " # Set a width parameter for all columns\n", + " # Note: this is parameterized because every call to `set_column` requires setting the width.\n", + " column_width = 15\n", + "\n", " for column in state_distribution_df.columns:\n", + " # Turn the column index into excel ranges (e.g., column #95 is \"CR\" and the range may be \"CR2:CR53\").\n", + " column_index = state_distribution_df.columns.get_loc(column)\n", + " column_character = get_excel_column_name(column_index)\n", + "\n", + " # Set all columns to larger width\n", + " worksheet.set_column(f\"{column_character}:{column_character}\", column_width)\n", + "\n", + " # Special formatting for all percent columns\n", + " # Note: we can't just search for `percent`, because that's included in the word `percentile`.\n", + " if \"percent \" in column or \"(percent)\" in column:\n", + " # Make these columns percentages.\n", + " percentage_format = workbook.add_format({\"num_format\": \"0%\"})\n", + " worksheet.set_column(\n", + " f\"{column_character}:{column_character}\",\n", + " column_width,\n", + " percentage_format,\n", + " )\n", + "\n", " # Special formatting for columns that capture the 
percent of population considered priority.\n", " if \"(percent population)\" in column:\n", - " # Turn the column index into excel ranges (e.g., column #95 is \"CR\" and the range may be \"CR2:CR53\").\n", - " column_index = state_distribution_df.columns.get_loc(column)\n", - " column_character = get_excel_column_name(column_index)\n", " column_ranges = (\n", " f\"{column_character}2:{column_character}{len(state_distribution_df)+1}\"\n", " )\n", @@ -356,21 +491,33 @@ " },\n", " )\n", "\n", - " # TODO: text wrapping not working, fix.\n", - " text_wrap = workbook.add_format({\"text_wrap\": True})\n", + " header_format = workbook.add_format(\n", + " {\"bold\": True, \"text_wrap\": True, \"valign\": \"bottom\"}\n", + " )\n", "\n", - " # Make these columns wide enough that you can read them.\n", - " worksheet.set_column(\n", - " f\"{column_character}:{column_character}\", 40, text_wrap\n", - " )\n", + " # Overwrite both the value and the format of each header cell\n", + " # This is because xlsxwriter / pandas has a known bug where it can't wrap text for a dataframe.\n", + " # See https://stackoverflow.com/questions/42562977/xlsxwriter-text-wrap-not-working.\n", + " for col_num, value in enumerate(state_distribution_df.columns.values):\n", + " worksheet.write(0, col_num, value, header_format)\n", "\n", " writer.save()\n", "\n", "\n", + "fields_to_analyze = [\n", + " index.priority_communities_field\n", + " for index in census_block_group_indices + census_tract_indices\n", + "]\n", + "\n", + "state_fips_codes = get_state_information(DATA_DIR)\n", + "\n", + "merged_with_state_information_df = merged_df.merge(\n", + " right=state_fips_codes, left_on=GEOID_STATE_FIELD_NAME, right_on=\"fips\"\n", + ")\n", + "\n", "state_distribution_df = get_state_distributions(\n", - " df=merged_df,\n", - " priority_communities_fields=cejst_priority_communities_fields\n", - " + comparison_priority_communities_fields,\n", + " df=merged_with_state_information_df,\n", + " priority_communities_fields=fields_to_analyze,\n", ")\n", "\n", "state_distribution_df.to_csv(\n", @@ -390,11 +537,159 @@ { "cell_type": "code", "execution_count": null, - "id": "d46667cf", + "id": "f9b9a329", "metadata": {}, "outputs": [], "source": [ - "# This cell defines a couple of comparison functions. 
It does not run them.\n", + "def write_markdown_and_docx_content(\n", + " markdown_content: str, file_dir: pathlib.PosixPath, file_name_without_extension: str\n", + ") -> pathlib.PosixPath:\n", + " \"\"\"Write Markdown content to both .md and .docx files.\"\"\"\n", + " # Set the file paths for both files.\n", + " markdown_file_path = file_dir / f\"{file_name_without_extension}.md\"\n", + " docx_file_path = file_dir / f\"{file_name_without_extension}.docx\"\n", + "\n", + " # Write the markdown content to file.\n", + " with open(markdown_file_path, \"w\") as text_file:\n", + " text_file.write(markdown_content)\n", + "\n", + " # Convert markdown file to Word doc.\n", + " pypandoc.convert_file(\n", + " source_file=str(markdown_file_path),\n", + " to=\"docx\",\n", + " outputfile=str(docx_file_path),\n", + " extra_args=[],\n", + " )\n", + "\n", + " return docx_file_path\n", + "\n", + "\n", + "def get_markdown_comparing_census_block_group_indices(\n", + " census_block_group_indices=typing.List[Index],\n", + " df=pd.DataFrame,\n", + " state_field=GEOID_STATE_FIELD_NAME,\n", + ") -> str:\n", + " \"\"\"Generate a Markdown string of analysis of multiple CBG indices.\"\"\"\n", + " count_field_name = \"Count of CBGs\"\n", + "\n", + " # List of all states/territories in their FIPS codes:\n", + " state_ids = sorted(df[state_field].unique())\n", + " state_names = \", \".join([us.states.lookup(state_id).name for state_id in state_ids])\n", + "\n", + " # Create markdown content for comparisons.\n", + " markdown_content = f\"\"\"\n", + "# Comparing multiple indices at the census block group level\n", + " \n", + "(This report was calculated on {datetime.today().strftime('%Y-%m-%d')}.)\n", + "\n", + "This report compares the following indices: {\", \".join([index.method_name for index in census_block_group_indices])}.\n", + "\n", + "This report analyzes the following US states and territories: {state_names}.\n", + "\n", + "\"\"\"\n", + "\n", + " for (index1, index2) in itertools.combinations(census_block_group_indices, 2):\n", + " # Group all data by their different values on Priority Communities Field for Index1 vs Priority Communities Field for Index2.\n", + " count_df = (\n", + " df.groupby(\n", + " [index1.priority_communities_field, index2.priority_communities_field]\n", + " )[GEOID_FIELD_NAME]\n", + " .count()\n", + " .reset_index(name=count_field_name)\n", + " )\n", + "\n", + " total_cbgs = count_df[count_field_name].sum()\n", + "\n", + " # Returns a series\n", + " true_true_cbgs_series = count_df.loc[\n", + " count_df[index1.priority_communities_field]\n", + " & count_df[index2.priority_communities_field],\n", + " count_field_name,\n", + " ]\n", + " true_false_cbgs_series = count_df.loc[\n", + " count_df[index1.priority_communities_field]\n", + " & ~count_df[index2.priority_communities_field],\n", + " count_field_name,\n", + " ]\n", + " false_true_cbgs_series = count_df.loc[\n", + " ~count_df[index1.priority_communities_field]\n", + " & count_df[index2.priority_communities_field],\n", + " count_field_name,\n", + " ]\n", + " false_false_cbgs_series = count_df.loc[\n", + " ~count_df[index1.priority_communities_field]\n", + " & ~count_df[index2.priority_communities_field],\n", + " count_field_name,\n", + " ]\n", + "\n", + " # Convert from series to a scalar value, including accounting for if no data exists for that pairing.\n", + " true_true_cbgs = (\n", + " true_true_cbgs_series.iloc[0] if len(true_true_cbgs_series) > 0 else 0\n", + " )\n", + " true_false_cbgs = (\n", + " true_false_cbgs_series.iloc[0] 
if len(true_false_cbgs_series) > 0 else 0\n", + " )\n", + " false_true_cbgs = (\n", + " false_true_cbgs_series.iloc[0] if len(false_true_cbgs_series) > 0 else 0\n", + " )\n", + " false_false_cbgs = (\n", + " false_false_cbgs_series.iloc[0] if len(false_false_cbgs_series) > 0 else 0\n", + " )\n", + "\n", + " markdown_content += (\n", + " \"*** \\n\\n\"\n", + " \"There are \"\n", + " f\"{true_true_cbgs} ({true_true_cbgs / total_cbgs:.0%}) \"\n", + " f\"census block groups that are both {index1.method_name} priority communities and {index2.method_name} priority communities.\\n\\n\"\n", + " \"There are \"\n", + " f\"{true_false_cbgs} ({true_false_cbgs / total_cbgs:.0%}) \"\n", + " f\"census block groups that are {index1.method_name} priority communities but not {index2.method_name} priority communities.\\n\\n\"\n", + " \"There are \"\n", + " f\"{false_true_cbgs} ({false_true_cbgs / total_cbgs:.0%}) \"\n", + " f\"census block groups that are not {index1.method_name} priority communities but are {index2.method_name} priority communities.\\n\\n\"\n", + " \"There are \"\n", + " f\"{false_false_cbgs} ({false_false_cbgs / total_cbgs:.0%}) \"\n", + " f\"census block groups that are neither {index1.method_name} priority communities nor {index2.method_name} priority communities.\\n\\n\"\n", + " \"\\n\\n\"\n", + " )\n", + "\n", + " return markdown_content\n", + "\n", + "\n", + "def get_comparison_census_block_group_indices(\n", + " census_block_group_indices=typing.List[Index],\n", + " df=pd.DataFrame,\n", + " state_field=GEOID_STATE_FIELD_NAME,\n", + ") -> pathlib.PosixPath:\n", + " markdown_content = get_markdown_comparing_census_block_group_indices(\n", + " census_block_group_indices=census_block_group_indices,\n", + " df=merged_with_state_information_df,\n", + " )\n", + "\n", + " comparison_docx_file_path = write_markdown_and_docx_content(\n", + " markdown_content=markdown_content,\n", + " file_dir=COMPARISON_OUTPUTS_DIR,\n", + " file_name_without_extension=f\"Comparison report - All CBG indices\",\n", + " )\n", + "\n", + " return comparison_docx_file_path\n", + "\n", + "\n", + "# Compare multiple scores at the CBG level\n", + "get_comparison_census_block_group_indices(\n", + " census_block_group_indices=census_block_group_indices,\n", + " df=merged_with_state_information_df,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25a10027", + "metadata": {}, + "outputs": [], + "source": [ + "# This cell defines a variety of comparison functions. 
It does not run them.\n", "\n", "# Define a namedtuple for column names, which need to be shared between multiple parts of this comparison pipeline.\n", "# Named tuples are useful here because they provide guarantees that for each instance, all properties are defined and\n", @@ -418,17 +713,6 @@ " ],\n", ")\n", "\n", - "# Define a namedtuple for indices.\n", - "Index = collections.namedtuple(\n", - " typename=\"Index\",\n", - " field_names=[\n", - " \"method_name\",\n", - " \"priority_communities_field\",\n", - " # Note: this field only used by indices defined at the census tract level.\n", - " \"other_census_tract_fields_to_keep\",\n", - " ],\n", - ")\n", - "\n", "\n", "def get_comparison_field_names(\n", " method_a_name: str,\n", @@ -573,6 +857,16 @@ " else None\n", " )\n", "\n", + " # For all remaining fields, calculate the average\n", + " # TODO: refactor to vectorize to make faster.\n", + " for field in [\n", + " \"Poverty (Less than 200% of federal poverty line)\",\n", + " \"Percent of households in linguistic isolation\",\n", + " \"Percent individuals age 25 or over with less than high school degree\",\n", + " \"Unemployed civilians (percent)\",\n", + " ]:\n", + " df[f\"{field} (average of CBGs)\"] = frame.loc[:, field].mean()\n", + "\n", " return df\n", "\n", " # Group all data by the census tract.\n", @@ -681,27 +975,35 @@ " return markdown_content\n", "\n", "\n", - "def write_markdown_and_docx_content(\n", - " markdown_content: str, file_dir: pathlib.PosixPath, file_name_without_extension: str\n", - ") -> pathlib.PosixPath:\n", - " \"\"\"Write Markdown content to both .md and .docx files.\"\"\"\n", - " # Set the file paths for both files.\n", - " markdown_file_path = file_dir / f\"{file_name_without_extension}.md\"\n", - " docx_file_path = file_dir / f\"{file_name_without_extension}.docx\"\n", + "def get_secondary_comparison_df(\n", + " comparison_df: pd.DataFrame,\n", + " comparison_field_names: ComparisonFieldNames,\n", + " method_b_priority_census_tracts_field: str,\n", + ") -> pd.DataFrame:\n", + " \"\"\"A secondary level of comparison.\n", "\n", - " # Write the markdown content to file.\n", - " with open(markdown_file_path, \"w\") as text_file:\n", - " text_file.write(markdown_content)\n", + " The first level of comparison identifies census tracts prioritized by Method A,\n", + " compared to whether or not they're prioritized by Method B.\n", "\n", - " # Convert markdown file to Word doc.\n", - " pypandoc.convert_file(\n", - " source_file=str(markdown_file_path),\n", - " to=\"docx\",\n", - " outputfile=str(docx_file_path),\n", - " extra_args=[],\n", + " This comparison method analyzes characteristics of those census tracts, based on whether or not they are prioritized\n", + " or not by Method A and/or Method B.\n", + "\n", + "\n", + " E.g., it might show that tracts prioritized by A but not B have a higher average income,\n", + " or that tracts prioritized by B but not A have a lower percent of unemployed people.\"\"\"\n", + " grouped_df = comparison_df.groupby(\n", + " [\n", + " method_b_priority_census_tracts_field,\n", + " comparison_field_names.method_b_tract_has_at_least_one_method_a_cbg,\n", + " comparison_field_names.method_b_non_priority_tract_has_at_least_one_method_a_cbg,\n", + " ],\n", + " dropna=False,\n", " )\n", "\n", - " return docx_file_path\n", + " # Run the comparison function on the groups.\n", + " secondary_comparison_df = grouped_df.mean().reset_index()\n", + "\n", + " return secondary_comparison_df\n", "\n", "\n", "def execute_comparison(\n", @@ -749,18 
+1051,34 @@ " output_dir=output_dir,\n", " )\n", "\n", - " # Choose output path\n", + " # Write comparison to CSV.\n", " file_path = (\n", " output_dir / f\"Comparison Output - {method_a_name} and {method_b_name}.csv\"\n", " )\n", - "\n", - " # Write comparison to CSV.\n", " comparison_df.to_csv(\n", " path_or_buf=file_path,\n", " na_rep=\"\",\n", " index=False,\n", " )\n", "\n", + " # Secondary comparison DF\n", + " secondary_comparison_df = get_secondary_comparison_df(\n", + " comparison_df=comparison_df,\n", + " comparison_field_names=comparison_field_names,\n", + " method_b_priority_census_tracts_field=method_b_priority_census_tracts_field,\n", + " )\n", + "\n", + " # Write secondary comparison to CSV.\n", + " file_path = (\n", + " output_dir\n", + " / f\"Secondary Comparison Output - {method_a_name} and {method_b_name}.csv\"\n", + " )\n", + " secondary_comparison_df.to_csv(\n", + " path_or_buf=file_path,\n", + " na_rep=\"\",\n", + " index=False,\n", + " )\n", + "\n", " markdown_content = get_comparison_markdown_content(\n", " original_df=df_with_only_shared_states,\n", " comparison_df=comparison_df,\n", @@ -810,63 +1128,13 @@ { "cell_type": "code", "execution_count": null, - "id": "48d9bf6b", + "id": "9b8b6d1e", "metadata": { "scrolled": true }, "outputs": [], "source": [ "# Actually execute the functions\n", - "\n", - "# # California only\n", - "# cal_df = merged_df[merged_df[GEOID_TRACT_FIELD_NAME].astype(str).str[0:2] == \"06\"]\n", - "# # cal_df = cal_df[0:1000]\n", - "# print(len(cal_df))\n", - "\n", - "census_block_group_indices = [\n", - " Index(\n", - " method_name=\"Score A\",\n", - " priority_communities_field=\"Score A (top 25th percentile)\",\n", - " other_census_tract_fields_to_keep=[],\n", - " ),\n", - " # Index(\n", - " # method_name=\"Score B\",\n", - " # priority_communities_field=\"Score B (top 25th percentile)\",\n", - " # other_census_tract_fields_to_keep=[],\n", - " # ),\n", - " Index(\n", - " method_name=\"Score C\",\n", - " priority_communities_field=\"Score C (top 25th percentile)\",\n", - " other_census_tract_fields_to_keep=[],\n", - " ),\n", - " Index(\n", - " method_name=\"Score D\",\n", - " priority_communities_field=\"Score D (top 25th percentile)\",\n", - " other_census_tract_fields_to_keep=[],\n", - " ),\n", - " # Index(\n", - " # method_name=\"Score E\",\n", - " # priority_communities_field=\"Score E (top 25th percentile)\",\n", - " # other_census_tract_fields_to_keep=[],\n", - " # ),\n", - "]\n", - "\n", - "census_tract_indices = [\n", - " Index(\n", - " method_name=\"CalEnviroScreen 4.0\",\n", - " priority_communities_field=\"calenviroscreen_priority_community\",\n", - " other_census_tract_fields_to_keep=[\n", - " CALENVIROSCREEN_SCORE_FIELD,\n", - " CALENVIROSCREEN_PERCENTILE_FIELD,\n", - " ],\n", - " ),\n", - " Index(\n", - " method_name=\"HUD RECAP\",\n", - " priority_communities_field=\"hud_recap_priority_community\",\n", - " other_census_tract_fields_to_keep=[],\n", - " ),\n", - "]\n", - "\n", "file_paths = execute_comparisons(\n", " df=merged_df,\n", " census_block_group_indices=census_block_group_indices,\n", diff --git a/data/data-pipeline/utils.py b/data/data-pipeline/utils.py index 4ad2d985..160da421 100644 --- a/data/data-pipeline/utils.py +++ b/data/data-pipeline/utils.py @@ -133,7 +133,6 @@ def unzip_file_from_url( # cleanup temporary file os.remove(zip_file_path) - def data_folder_cleanup() -> None: """Remove all files and directories from the local data/dataset path""" @@ -161,7 +160,6 @@ def temp_folder_cleanup() -> None: 
logger.info(f"Initializing all temp directories")
     remove_all_from_dir(data_path / "tmp")
 
-
 def get_excel_column_name(index: int) -> str:
     """Map a numeric index to the appropriate column in Excel.
     E.g., column #95 is "CR". Only works for the first 1000 columns.
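
A few standalone sketches of the techniques this patch relies on follow; the snippets use made-up data and, where noted, hypothetical names.

The threshold loop added to etl_score.py turns pandas percentile ranks into boolean "top N percent" flags. A minimal sketch of the same pattern, with a made-up score column and a literal " (percentile)" suffix standing in for PERCENTILE_FIELD_SUFFIX:

import pandas as pd

df = pd.DataFrame({"Score A": [1.0, 5.0, 3.0, 9.0, 7.0, 2.0, 8.0, 4.0]})

# rank(pct=True) maps each value to its percentile rank in (0, 1].
df["Score A (percentile)"] = df["Score A"].rank(pct=True)

# A row is flagged when its rank falls in the top `threshold` share,
# i.e. at or above 1 - threshold.
for threshold in [0.25, 0.30, 0.35, 0.40]:
    percent = int(100 * threshold)
    df[f"Score A (top {percent}th percentile)"] = (
        df["Score A (percentile)"] >= 1 - threshold
    )

print(df)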
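
get_state_information left-pads the FIPS column because pandas otherwise parses codes like "06" as the integer 6 and the leading zero is lost. A small sketch of the failure mode and the fix, reading an inline CSV instead of fips_states_2010.csv:

import io

import pandas as pd

csv_text = "fips,state_name\n06,California\n36,New York\n"

# Without an explicit dtype, pandas infers the column as integers.
df = pd.read_csv(io.StringIO(csv_text))
print(df["fips"].tolist())  # [6, 36] -- the leading zero is gone

# Cast back to string and left-pad to two characters, as the ETL does.
df["fips"] = df["fips"].astype(str).apply(lambda x: x.zfill(2))
print(df["fips"].tolist())  # ['06', '36']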
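
The hud_recap dtype fix and the notebook's tract-derivation cell both depend on GEOIDs being strings: a 12-character census block group GEOID is the 11-character tract GEOID plus one block-group digit, and its first two characters are the state FIPS code. A sketch with made-up GEOIDs:

import pandas as pd

cbg_df = pd.DataFrame({"GEOID10": ["060371011101", "360470001001"]})

# dtype "string" preserves leading zeros; as an integer,
# "060371011101" would silently become 60371011101.
cbg_df["GEOID10"] = cbg_df["GEOID10"].astype("string")

# Drop the trailing block-group character to get the tract ID, and take
# the first two characters to get the state ID.
cbg_df["GEOID10_TRACT"] = cbg_df["GEOID10"].str[:-1]
cbg_df["GEOID10_STATE"] = cbg_df["GEOID10"].str[0:2]
print(cbg_df)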
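
get_state_distributions now produces one summary table covering four geographic levels by grouping the same frame on different key columns (a constant "Country" column, then state, region, and division) and concatenating the per-level results. A stripped-down sketch of that shape, with hypothetical column names and only three levels:

import pandas as pd

df = pd.DataFrame(
    {
        "state": ["06", "06", "36"],
        "region": ["West", "West", "Northeast"],
        "population": [1200, 800, 2000],
        "priority": [True, False, True],
    }
)

# A constant column makes the whole USA a single group.
df["country"] = "USA"

def summarize(frame: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame(
        {
            "Total population in geography": [frame["population"].sum()],
            "Priority population": [
                frame.loc[frame["priority"], "population"].sum()
            ],
        },
        index=[0],
    )

# One groupby pass per level, stacked into one table -- the same structure
# as the usa/state/region/division concat in the notebook.
combined = pd.concat(
    [df.groupby(level).apply(summarize) for level in ["country", "region", "state"]]
)
print(combined)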
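
write_state_distribution_excel converts a zero-based column index into an Excel column letter with get_excel_column_name from utils.py (its body is cut off above). One standard way to write that conversion -- a sketch, not necessarily the repo's implementation:

def excel_column_name(index: int) -> str:
    """Map a zero-based column index to Excel letters: 0 -> "A", 95 -> "CR"."""
    name = ""
    number = index + 1  # Excel letters form a one-based, bijective base-26 system
    while number > 0:
        number, remainder = divmod(number - 1, 26)
        name = chr(ord("A") + remainder) + name
    return name

assert excel_column_name(0) == "A"
assert excel_column_name(25) == "Z"
assert excel_column_name(26) == "AA"
assert excel_column_name(95) == "CR"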
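
The spreadsheet changes combine three xlsxwriter features: a "0%" number format on percent columns, a 3_color_scale conditional format on the "(percent population)" columns, and rewriting the header row cell by cell, because a text_wrap format never reaches the header cells that pandas writes itself. A condensed sketch under those assumptions:

import pandas as pd

df = pd.DataFrame(
    {
        "Geography name": ["Alpha", "Beta"],
        "Priority (percent population)": [0.12, 0.47],
    }
)

writer = pd.ExcelWriter("demo.xlsx", engine="xlsxwriter")
df.to_excel(writer, sheet_name="Sheet1", index=False)
workbook = writer.book
worksheet = writer.sheets["Sheet1"]

# Render the percent column as percentages with a fixed width.
percentage_format = workbook.add_format({"num_format": "0%"})
worksheet.set_column("B:B", 15, percentage_format)

# Shade the data cells of that column from low to high values.
worksheet.conditional_format(
    f"B2:B{len(df) + 1}",
    {"type": "3_color_scale"},
)

# Overwrite the header cells so the wrap format actually applies.
header_format = workbook.add_format(
    {"bold": True, "text_wrap": True, "valign": "bottom"}
)
for col_num, value in enumerate(df.columns.values):
    worksheet.write(0, col_num, value, header_format)

writer.close()  # newer pandas spelling; the notebook's writer.save() is the older one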
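
write_markdown_and_docx_content emits each report twice: the Markdown string is written to disk, then pypandoc (which shells out to a pandoc binary, so pandoc must be installed) converts that file to .docx. A minimal sketch:

import pathlib

import pypandoc

report_dir = pathlib.Path("comparison_outputs")
report_dir.mkdir(parents=True, exist_ok=True)

markdown_path = report_dir / "report.md"
markdown_path.write_text("# Comparison report\n\nPlaceholder content.\n")

# pandoc infers the input format from the .md extension.
pypandoc.convert_file(
    source_file=str(markdown_path),
    to="docx",
    outputfile=str(report_dir / "report.docx"),
)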
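
get_markdown_comparing_census_block_group_indices walks every unordered pair of indices with itertools.combinations and tallies a 2x2 contingency table of the two priority flags. The notebook derives the four counts by filtering a grouped count frame; pd.crosstab reaches the same numbers more directly, as in this sketch (flag columns made up):

import itertools

import pandas as pd

df = pd.DataFrame(
    {
        "Score A": [True, True, False, False, True],
        "Score C": [True, False, False, True, True],
        "Poverty": [False, False, True, True, True],
    }
)

for field1, field2 in itertools.combinations(df.columns, 2):
    # Rows are field1's True/False values; columns are field2's.
    table = pd.crosstab(df[field1], df[field2])
    # Guard against a flag that is never True in this sample.
    true_true = (
        table.loc[True, True]
        if True in table.index and True in table.columns
        else 0
    )
    total = int(table.to_numpy().sum())
    print(f"{field1} & {field2}: {true_true} of {total} CBGs prioritized by both")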
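
get_secondary_comparison_df groups census tracts by their prioritization pattern and averages every remaining numeric column, so one can see, for example, whether tracts prioritized by one method but not the other differ in average poverty. A sketch with hypothetical flag and metric names:

import pandas as pd

tracts = pd.DataFrame(
    {
        "method_b_priority": [True, True, False, False],
        "has_method_a_cbg": [True, False, True, False],
        "Poverty (average of CBGs)": [0.45, 0.30, 0.35, 0.15],
    }
)

# dropna=False keeps groups whose keys are NaN, matching the notebook.
secondary = (
    tracts.groupby(["method_b_priority", "has_method_a_cbg"], dropna=False)
    .mean()
    .reset_index()
)
print(secondary)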