In [None]:
# Before running this script as it currently stands, you'll need to run these notebooks (in any order):
# * score_calc.ipynb
# * calenviroscreen_etl.ipynb
# * hud_recap_etl.ipynb

import collections
import functools
import IPython
import itertools
import numpy as np
import os
import pandas as pd
import pathlib
import pypandoc
import requests
import string
import sys
import typing
import us
import zipfile

from datetime import datetime
from tqdm.notebook import tqdm_notebook

module_path = os.path.abspath(os.path.join(".."))
if module_path not in sys.path:
    sys.path.append(module_path)

from utils import remove_all_from_dir, get_excel_column_name
from etl.sources.census.etl_utils import get_state_information


# Turn on TQDM for pandas so that we can have progress bars when running `apply`.
tqdm_notebook.pandas()

In [None]:
# Suppress scientific notation in pandas (this shows up for census tract IDs)
pd.options.display.float_format = "{:.2f}".format

# Set some global parameters
DATA_DIR = pathlib.Path.cwd().parent / "data"
TEMP_DATA_DIR = DATA_DIR / "tmp"
COMPARISON_OUTPUTS_DIR = DATA_DIR / "comparison_outputs"

# Make the dirs if they don't exist
TEMP_DATA_DIR.mkdir(parents=True, exist_ok=True)
COMPARISON_OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

CEJST_PRIORITY_COMMUNITY_THRESHOLD = 0.75

# Name fields using variables. (This makes it easy to reference the same fields frequently without using strings
# and introducing the risk of misspelling the field name.)

GEOID_FIELD_NAME = "GEOID10"
GEOID_TRACT_FIELD_NAME = "GEOID10_TRACT"
GEOID_STATE_FIELD_NAME = "GEOID10_STATE"
COUNTRY_FIELD_NAME = "Country"
CENSUS_BLOCK_GROUP_POPULATION_FIELD = "Total population"

CEJST_SCORE_FIELD = "cejst_score"
CEJST_PERCENTILE_FIELD = "cejst_percentile"
CEJST_PRIORITY_COMMUNITY_FIELD = "cejst_priority_community"

# Define some suffixes
POPULATION_SUFFIX = " (priority population)"

In [None]:
# Load CEJST score data
cejst_data_path = DATA_DIR / "score" / "csv" / "full" / "usa.csv"
cejst_df = pd.read_csv(cejst_data_path, dtype={GEOID_FIELD_NAME: "string"})

# Create the CBG's Census Tract ID by dropping the last number from the FIPS CODE of the CBG.
# The CBG ID is the last one character.
# For more information, see https://www.census.gov/programs-surveys/geography/guidance/geo-identifiers.html.
cejst_df.loc[:, GEOID_TRACT_FIELD_NAME] = (
    cejst_df.loc[:, GEOID_FIELD_NAME].astype(str).str[:-1]
)

cejst_df.loc[:, GEOID_STATE_FIELD_NAME] = (
    cejst_df.loc[:, GEOID_FIELD_NAME].astype(str).str[0:2]
)

cejst_df.head()

In [None]:
# Load CalEnviroScreen 4.0
CALENVIROSCREEN_SCORE_FIELD = "calenviroscreen_score"
CALENVIROSCREEN_PERCENTILE_FIELD = "calenviroscreen_percentile"
CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD = "calenviroscreen_priority_community"

calenviroscreen_data_path = DATA_DIR / "dataset" / "calenviroscreen4" / "data06.csv"
calenviroscreen_df = pd.read_csv(
    calenviroscreen_data_path, dtype={GEOID_TRACT_FIELD_NAME: "string"}
)

# Convert priority community field to a bool.
calenviroscreen_df[CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD] = calenviroscreen_df[
    CALENVIROSCREEN_PRIORITY_COMMUNITY_FIELD
].astype(bool)

calenviroscreen_df.head()

In [None]:
# Load HUD data
hud_recap_data_path = DATA_DIR / "dataset" / "hud_recap" / "usa.csv"
hud_recap_df = pd.read_csv(
    hud_recap_data_path, dtype={GEOID_TRACT_FIELD_NAME: "string"}
)

hud_recap_df.head()

In [None]:
# Join all dataframes that use tracts
census_tract_dfs = [calenviroscreen_df, hud_recap_df]

census_tract_df = functools.reduce(
    lambda left, right: pd.merge(
        left=left, right=right, on=GEOID_TRACT_FIELD_NAME, how="outer"
    ),
    census_tract_dfs,
)

if census_tract_df[GEOID_TRACT_FIELD_NAME].str.len().unique() != [11]:
    raise ValueError("Some of the census tract data has the wrong length.")

if len(census_tract_df) > 74134:
    raise ValueError("Too many rows in the join.")

census_tract_df.head()

In [None]:
# Join tract indices and CEJST data.
# Note: we're joining on the census *tract*, so there will be multiple CBG entries joined to the same census tract row from CES,
# creating multiple rows of the same CES data.
merged_df = cejst_df.merge(
    census_tract_df,
    how="left",
    on=GEOID_TRACT_FIELD_NAME,
)


if len(merged_df) > 220333:
    raise ValueError("Too many rows in the join.")

merged_df.head()


# merged_df.to_csv(
#     path_or_buf=COMPARISON_OUTPUTS_DIR / "merged.csv", na_rep="", index=False
# )

In [None]:
# Define a namedtuple for indices.
Index = collections.namedtuple(
    typename="Index",
    field_names=[
        "method_name",
        "priority_communities_field",
        # Note: this field only used by indices defined at the census tract level.
        "other_census_tract_fields_to_keep",
    ],
)

# Define the indices used for CEJST scoring (`census_block_group_indices`) as well as comparison
# (`census_tract_indices`).
census_block_group_indices = [
    Index(
        method_name="Score A",
        priority_communities_field="Score A (top 25th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
    Index(
        method_name="Score B",
        priority_communities_field="Score B (top 25th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
    Index(
        method_name="Score C",
        priority_communities_field="Score C (top 25th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
    Index(
        method_name="Score D (25th percentile)",
        priority_communities_field="Score D (top 25th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
    Index(
        method_name="Score D (30th percentile)",
        priority_communities_field="Score D (top 30th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
    Index(
        method_name="Score D (35th percentile)",
        priority_communities_field="Score D (top 35th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
    Index(
        method_name="Score D (40th percentile)",
        priority_communities_field="Score D (top 40th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
    Index(
        method_name="Poverty",
        priority_communities_field="Poverty (Less than 200% of federal poverty line) (top 25th percentile)",
        other_census_tract_fields_to_keep=[],
    ),
]

census_tract_indices = [
    Index(
        method_name="CalEnviroScreen 4.0",
        priority_communities_field="calenviroscreen_priority_community",
        other_census_tract_fields_to_keep=[
            CALENVIROSCREEN_SCORE_FIELD,
            CALENVIROSCREEN_PERCENTILE_FIELD,
        ],
    ),
    Index(
        method_name="HUD RECAP",
        priority_communities_field="hud_recap_priority_community",
        other_census_tract_fields_to_keep=[],
    ),
]

In [None]:
def get_state_distributions(
    df: pd.DataFrame, priority_communities_fields: typing.List[str]
) -> pd.DataFrame:
    """For each boolean field of priority communities, calculate distribution across states and territories."""

    # Ensure each field is boolean.
    for priority_communities_field in priority_communities_fields:
        if df[priority_communities_field].dtype != bool:
            print(f"Converting {priority_communities_field} to boolean.")

        # Calculate the population included as priority communities per CBG. Will either be 0 or the population.
        df[f"{priority_communities_field}{POPULATION_SUFFIX}"] = (
            df[priority_communities_field] * df[CENSUS_BLOCK_GROUP_POPULATION_FIELD]
        )

    def calculate_state_comparison(
        frame: pd.DataFrame, geography_field: str
    ) -> pd.DataFrame:
        """
        This method will be applied to a `group_by` object. Inherits some parameters from outer scope.

        """
        summary_dict = {}
        summary_dict[COUNTRY_FIELD_NAME] = frame[COUNTRY_FIELD_NAME].unique()[0]

        if geography_field == COUNTRY_FIELD_NAME:
            summary_dict[GEOID_STATE_FIELD_NAME] = "00"
            summary_dict["Geography name"] = "(Entire USA)"

        if geography_field == GEOID_STATE_FIELD_NAME:
            state_id = frame[GEOID_STATE_FIELD_NAME].unique()[0]
            summary_dict[GEOID_STATE_FIELD_NAME] = state_id
            summary_dict["Geography name"] = us.states.lookup(state_id).name

            # Also add region information
            region_id = frame["region"].unique()[0]
            summary_dict["region"] = region_id

        if geography_field == "region":
            region_id = frame["region"].unique()[0]
            summary_dict["region"] = region_id
            summary_dict["Geography name"] = region_id

        if geography_field == "division":
            division_id = frame["division"].unique()[0]
            summary_dict["division"] = division_id
            summary_dict["Geography name"] = division_id

        summary_dict["Total CBGs in geography"] = len(frame)
        summary_dict["Total population in geography"] = frame[
            CENSUS_BLOCK_GROUP_POPULATION_FIELD
        ].sum()

        for priority_communities_field in priority_communities_fields:
            summary_dict[f"{priority_communities_field}{POPULATION_SUFFIX}"] = frame[
                f"{priority_communities_field}{POPULATION_SUFFIX}"
            ].sum()

            summary_dict[f"{priority_communities_field} (total CBGs)"] = frame[
                f"{priority_communities_field}"
            ].sum()

            # Calculate some combinations of other variables.
            summary_dict[f"{priority_communities_field} (percent CBGs)"] = (
                summary_dict[f"{priority_communities_field} (total CBGs)"]
                / summary_dict["Total CBGs in geography"]
            )

            summary_dict[f"{priority_communities_field} (percent population)"] = (
                summary_dict[f"{priority_communities_field}{POPULATION_SUFFIX}"]
                / summary_dict["Total population in geography"]
            )

        df = pd.DataFrame(summary_dict, index=[0])

        return df

    # Add a field for country so we can do aggregations across the entire country.
    df[COUNTRY_FIELD_NAME] = "USA"

    # First, run the comparison by the whole country
    usa_grouped_df = df.groupby(COUNTRY_FIELD_NAME)

    # Run the comparison function on the groups.
    usa_distribution_df = usa_grouped_df.progress_apply(
        lambda frame: calculate_state_comparison(
            frame, geography_field=COUNTRY_FIELD_NAME
        )
    )

    # Next, run the comparison by state
    state_grouped_df = df.groupby(GEOID_STATE_FIELD_NAME)

    # Run the comparison function on the groups.
    state_distribution_df = state_grouped_df.progress_apply(
        lambda frame: calculate_state_comparison(
            frame, geography_field=GEOID_STATE_FIELD_NAME
        )
    )

    # Next, run the comparison by region
    region_grouped_df = df.groupby("region")

    # Run the comparison function on the groups.
    region_distribution_df = region_grouped_df.progress_apply(
        lambda frame: calculate_state_comparison(frame, geography_field="region")
    )

    # Next, run the comparison by division
    division_grouped_df = df.groupby("division")

    # Run the comparison function on the groups.
    division_distribution_df = division_grouped_df.progress_apply(
        lambda frame: calculate_state_comparison(frame, geography_field="division")
    )

    # Combine the three
    combined_df = pd.concat(
        [
            usa_distribution_df,
            state_distribution_df,
            region_distribution_df,
            division_distribution_df,
        ]
    )

    return combined_df


def write_state_distribution_excel(
    state_distribution_df: pd.DataFrame, file_path: pathlib.PosixPath
) -> None:
    """Write the dataframe to excel with special formatting."""
    # Create a Pandas Excel writer using XlsxWriter as the engine.
    writer = pd.ExcelWriter(file_path, engine="xlsxwriter")

    # Convert the dataframe to an XlsxWriter Excel object. We also turn off the
    # index column at the left of the output dataframe.
    state_distribution_df.to_excel(writer, sheet_name="Sheet1", index=False)

    # Get the xlsxwriter workbook and worksheet objects.
    workbook = writer.book
    worksheet = writer.sheets["Sheet1"]
    worksheet.autofilter(
        0, 0, state_distribution_df.shape[0], state_distribution_df.shape[1]
    )

    # Set a width parameter for all columns
    # Note: this is parameterized because every call to `set_column` requires setting the width.
    column_width = 15

    for column in state_distribution_df.columns:
        # Turn the column index into excel ranges (e.g., column #95 is "CR" and the range may be "CR2:CR53").
        column_index = state_distribution_df.columns.get_loc(column)
        column_character = get_excel_column_name(column_index)

        # Set all columns to larger width
        worksheet.set_column(f"{column_character}:{column_character}", column_width)

        # Special formatting for all percent columns
        # Note: we can't just search for `percent`, because that's included in the word `percentile`.
        if "percent " in column or "(percent)" in column:
            # Make these columns percentages.
            percentage_format = workbook.add_format({"num_format": "0%"})
            worksheet.set_column(
                f"{column_character}:{column_character}",
                column_width,
                percentage_format,
            )

        # Special formatting for columns that capture the percent of population considered priority.
        if "(percent population)" in column:
            column_ranges = (
                f"{column_character}2:{column_character}{len(state_distribution_df)+1}"
            )

            # Add green to red conditional formatting.
            worksheet.conditional_format(
                column_ranges,
                # Min: green, max: red.
                {
                    "type": "2_color_scale",
                    "min_color": "#00FF7F",
                    "max_color": "#C82538",
                },
            )

    header_format = workbook.add_format(
        {"bold": True, "text_wrap": True, "valign": "bottom"}
    )

    # Overwrite both the value and the format of each header cell
    # This is because xlsxwriter / pandas has a known bug where it can't wrap text for a dataframe.
    # See https://stackoverflow.com/questions/42562977/xlsxwriter-text-wrap-not-working.
    for col_num, value in enumerate(state_distribution_df.columns.values):
        worksheet.write(0, col_num, value, header_format)

    writer.save()


fields_to_analyze = [
    index.priority_communities_field
    for index in census_block_group_indices + census_tract_indices
]

state_fips_codes = get_state_information(DATA_DIR)

merged_with_state_information_df = merged_df.merge(
    right=state_fips_codes, left_on=GEOID_STATE_FIELD_NAME, right_on="fips"
)

state_distribution_df = get_state_distributions(
    df=merged_with_state_information_df,
    priority_communities_fields=fields_to_analyze,
)

state_distribution_df.to_csv(
    path_or_buf=COMPARISON_OUTPUTS_DIR / "Priority CBGs by state.csv",
    na_rep="",
    index=False,
)

write_state_distribution_excel(
    state_distribution_df=state_distribution_df,
    file_path=COMPARISON_OUTPUTS_DIR / "Priority CBGs by state.xlsx",
)

state_distribution_df.head()

In [None]:
def write_markdown_and_docx_content(
    markdown_content: str, file_dir: pathlib.PosixPath, file_name_without_extension: str
) -> pathlib.PosixPath:
    """Write Markdown content to both .md and .docx files."""
    # Set the file paths for both files.
    markdown_file_path = file_dir / f"{file_name_without_extension}.md"
    docx_file_path = file_dir / f"{file_name_without_extension}.docx"

    # Write the markdown content to file.
    with open(markdown_file_path, "w") as text_file:
        text_file.write(markdown_content)

    # Convert markdown file to Word doc.
    pypandoc.convert_file(
        source_file=str(markdown_file_path),
        to="docx",
        outputfile=str(docx_file_path),
        extra_args=[],
    )

    return docx_file_path


def get_markdown_comparing_census_block_group_indices(
    census_block_group_indices=typing.List[Index],
    df=pd.DataFrame,
    state_field=GEOID_STATE_FIELD_NAME,
) -> str:
    """Generate a Markdown string of analysis of multiple CBG indices."""
    count_field_name = "Count of CBGs"

    # List of all states/territories in their FIPS codes:
    state_ids = sorted(df[state_field].unique())
    state_names = ", ".join([us.states.lookup(state_id).name for state_id in state_ids])

    # Create markdown content for comparisons.
    markdown_content = f"""
# Comparing multiple indices at the census block group level
    
(This report was calculated on {datetime.today().strftime('%Y-%m-%d')}.)

This report compares the following indices: {", ".join([index.method_name for index in census_block_group_indices])}.

This report analyzes the following US states and territories: {state_names}.

"""

    for (index1, index2) in itertools.combinations(census_block_group_indices, 2):
        # Group all data by their different values on Priority Communities Field for Index1 vs Priority Communities Field for Index2.
        count_df = (
            df.groupby(
                [index1.priority_communities_field, index2.priority_communities_field]
            )[GEOID_FIELD_NAME]
            .count()
            .reset_index(name=count_field_name)
        )

        total_cbgs = count_df[count_field_name].sum()

        # Returns a series
        true_true_cbgs_series = count_df.loc[
            count_df[index1.priority_communities_field]
            & count_df[index2.priority_communities_field],
            count_field_name,
        ]
        true_false_cbgs_series = count_df.loc[
            count_df[index1.priority_communities_field]
            & ~count_df[index2.priority_communities_field],
            count_field_name,
        ]
        false_true_cbgs_series = count_df.loc[
            ~count_df[index1.priority_communities_field]
            & count_df[index2.priority_communities_field],
            count_field_name,
        ]
        false_false_cbgs_series = count_df.loc[
            ~count_df[index1.priority_communities_field]
            & ~count_df[index2.priority_communities_field],
            count_field_name,
        ]

        # Convert from series to a scalar value, including accounting for if no data exists for that pairing.
        true_true_cbgs = (
            true_true_cbgs_series.iloc[0] if len(true_true_cbgs_series) > 0 else 0
        )
        true_false_cbgs = (
            true_false_cbgs_series.iloc[0] if len(true_false_cbgs_series) > 0 else 0
        )
        false_true_cbgs = (
            false_true_cbgs_series.iloc[0] if len(false_true_cbgs_series) > 0 else 0
        )
        false_false_cbgs = (
            false_false_cbgs_series.iloc[0] if len(false_false_cbgs_series) > 0 else 0
        )

        markdown_content += (
            "*** \n\n"
            "There are "
            f"{true_true_cbgs} ({true_true_cbgs / total_cbgs:.0%}) "
            f"census block groups that are both {index1.method_name} priority communities and {index2.method_name} priority communities.\n\n"
            "There are "
            f"{true_false_cbgs} ({true_false_cbgs / total_cbgs:.0%}) "
            f"census block groups that are {index1.method_name} priority communities but not {index2.method_name} priority communities.\n\n"
            "There are "
            f"{false_true_cbgs} ({false_true_cbgs / total_cbgs:.0%}) "
            f"census block groups that are not {index1.method_name} priority communities but are {index2.method_name} priority communities.\n\n"
            "There are "
            f"{false_false_cbgs} ({false_false_cbgs / total_cbgs:.0%}) "
            f"census block groups that are neither {index1.method_name} priority communities nor {index2.method_name} priority communities.\n\n"
            "\n\n"
        )

    return markdown_content


def get_comparison_census_block_group_indices(
    census_block_group_indices=typing.List[Index],
    df=pd.DataFrame,
    state_field=GEOID_STATE_FIELD_NAME,
) -> pathlib.PosixPath:
    markdown_content = get_markdown_comparing_census_block_group_indices(
        census_block_group_indices=census_block_group_indices,
        df=merged_with_state_information_df,
    )

    comparison_docx_file_path = write_markdown_and_docx_content(
        markdown_content=markdown_content,
        file_dir=COMPARISON_OUTPUTS_DIR,
        file_name_without_extension=f"Comparison report - All CBG indices",
    )

    return comparison_docx_file_path


# Compare multiple scores at the CBG level
get_comparison_census_block_group_indices(
    census_block_group_indices=census_block_group_indices,
    df=merged_with_state_information_df,
)

In [None]:
# This cell defines a variety of comparison functions. It does not run them.

# Define a namedtuple for column names, which need to be shared between multiple parts of this comparison pipeline.
# Named tuples are useful here because they provide guarantees that for each instance, all properties are defined and
# can be accessed as properties (rather than as strings).

# Note: if you'd like to add a field used throughout the comparison process, add it in three places.
# For an example `new_field`,
# 1. in this namedtuple, add the field as a string in `field_names` (e.g., `field_names=[..., "new_field"])`)
# 2. in the function `get_comparison_field_names`, define how the field name should be created from input data
#     (e.g., `...new_field=f"New field compares {method_a_name} to {method_b_name}")
# 3. In the function `get_comparison_markdown_content`, add some reporting on the new field to the markdown content.
#     (e.g., `The statistics indicate that {calculation_based_on_new_field} percent of census tracts are different between scores.`)
ComparisonFieldNames = collections.namedtuple(
    typename="ComparisonFieldNames",
    field_names=[
        "any_tract_has_at_least_one_method_a_cbg",
        "method_b_tract_has_at_least_one_method_a_cbg",
        "method_b_tract_has_100_percent_method_a_cbg",
        "method_b_non_priority_tract_has_at_least_one_method_a_cbg",
        "method_b_non_priority_tract_has_100_percent_method_a_cbg",
    ],
)


def get_comparison_field_names(
    method_a_name: str,
    method_b_name: str,
) -> ComparisonFieldNames:
    comparison_field_names = ComparisonFieldNames(
        any_tract_has_at_least_one_method_a_cbg=(
            f"Any tract has at least one {method_a_name} Priority CBG?"
        ),
        method_b_tract_has_at_least_one_method_a_cbg=(
            f"{method_b_name} priority tract has at least one {method_a_name} CBG?"
        ),
        method_b_tract_has_100_percent_method_a_cbg=(
            f"{method_b_name} tract has 100% {method_a_name} priority CBGs?"
        ),
        method_b_non_priority_tract_has_at_least_one_method_a_cbg=(
            f"Non-priority {method_b_name} tract has at least one {method_a_name} priority CBG?"
        ),
        method_b_non_priority_tract_has_100_percent_method_a_cbg=(
            f"Non-priority {method_b_name} tract has 100% {method_a_name} priority CBGs?"
        ),
    )
    return comparison_field_names


def get_df_with_only_shared_states(
    df: pd.DataFrame,
    field_a: str,
    field_b: str,
    state_field=GEOID_STATE_FIELD_NAME,
) -> pd.DataFrame:
    """
    Useful for looking at shared geographies across two fields.

    For a data frame and two fields, return a data frame only for states where there are non-null
    values for both fields in that state (or territory).

    This is useful, for example, when running a comparison of CalEnviroScreen (only in California) against
    a draft score that's national, and returning only the data for California for the entire data frame.
    """
    field_a_states = df.loc[df[field_a].notnull(), state_field].unique()
    field_b_states = df.loc[df[field_b].notnull(), state_field].unique()

    shared_states = list(set(field_a_states) & set(field_b_states))

    df = df.loc[df[state_field].isin(shared_states), :]

    return df


def get_comparison_df(
    df: pd.DataFrame,
    method_a_priority_census_block_groups_field: str,
    method_b_priority_census_tracts_field: str,
    other_census_tract_fields_to_keep: typing.Optional[typing.List[str]],
    comparison_field_names: ComparisonFieldNames,
    output_dir: pathlib.PosixPath,
) -> None:
    """Produces a comparison report for any two given boolean columns representing priority fields.

    Args:
      df: a pandas dataframe including the data for this comparison.
      method_a_priority_census_block_groups_field: the name of a boolean column in `df`, such as the CEJST priority
        community field that defines communities at the level of census block groups (CBGs).
      method_b_priority_census_tracts_field: the name of a boolean column in `df`, such as the CalEnviroScreen priority
        community field that defines communities at the level of census tracts.
      other_census_tract_fields_to_keep (optional): a list of field names to preserve at the census tract level

    Returns:
      df: a pandas dataframe with one row with the results of this comparison
    """

    def calculate_comparison(frame: pd.DataFrame) -> pd.DataFrame:
        """
        This method will be applied to a `group_by` object.

        Note: It inherits from outer scope `method_a_priority_census_block_groups_field`, `method_b_priority_census_tracts_field`,
        and `other_census_tract_fields_to_keep`.
        """
        # Keep all the tract values at the Census Tract Level
        for field in other_census_tract_fields_to_keep:
            if len(frame[field].unique()) != 1:
                raise ValueError(
                    f"There are different values per CBG for field {field}."
                    "`other_census_tract_fields_to_keep` can only be used for fields at the census tract level."
                )

        df = frame.loc[
            frame.index[0],
            [
                GEOID_TRACT_FIELD_NAME,
                method_b_priority_census_tracts_field,
            ]
            + other_census_tract_fields_to_keep,
        ]

        # Convenience constant for whether the tract is or is not a method B priority community.
        is_a_method_b_priority_tract = frame.loc[
            frame.index[0], [method_b_priority_census_tracts_field]
        ][0]

        # Recall that NaN values are not falsy, so we need to check if `is_a_method_b_priority_tract` is True.
        is_a_method_b_priority_tract = is_a_method_b_priority_tract is True

        # Calculate whether the tract (whether or not it is a comparison priority tract) includes CBGs that are priority
        # according to the current CBG score.
        df[comparison_field_names.any_tract_has_at_least_one_method_a_cbg] = (
            frame.loc[:, method_a_priority_census_block_groups_field].sum() > 0
        )

        # Calculate comparison
        # A comparison priority tract has at least one CBG that is a priority CBG.
        df[comparison_field_names.method_b_tract_has_at_least_one_method_a_cbg] = (
            frame.loc[:, method_a_priority_census_block_groups_field].sum() > 0
            if is_a_method_b_priority_tract
            else None
        )

        # A comparison priority tract has all of its contained CBGs as CBG priority CBGs.
        df[comparison_field_names.method_b_tract_has_100_percent_method_a_cbg] = (
            frame.loc[:, method_a_priority_census_block_groups_field].mean() == 1
            if is_a_method_b_priority_tract
            else None
        )

        # Calculate the inverse
        # A tract that is _not_ a comparison priority has at least one CBG priority CBG.
        df[
            comparison_field_names.method_b_non_priority_tract_has_at_least_one_method_a_cbg
        ] = (
            frame.loc[:, method_a_priority_census_block_groups_field].sum() > 0
            if not is_a_method_b_priority_tract
            else None
        )

        # A tract that is _not_ a comparison priority has all of its contained CBGs as CBG priority CBGs.
        df[
            comparison_field_names.method_b_non_priority_tract_has_100_percent_method_a_cbg
        ] = (
            frame.loc[:, method_a_priority_census_block_groups_field].mean() == 1
            if not is_a_method_b_priority_tract
            else None
        )

        # For all remaining fields, calculate the average
        # TODO: refactor to vectorize to make faster.
        for field in [
            "Poverty (Less than 200% of federal poverty line)",
            "Percent of households in linguistic isolation",
            "Percent individuals age 25 or over with less than high school degree",
            "Unemployed civilians (percent)",
        ]:
            df[f"{field} (average of CBGs)"] = frame.loc[:, field].mean()

        return df

    # Group all data by the census tract.
    grouped_df = df.groupby(GEOID_TRACT_FIELD_NAME)

    # Run the comparison function on the groups.
    comparison_df = grouped_df.progress_apply(calculate_comparison)

    return comparison_df


def get_comparison_markdown_content(
    original_df: pd.DataFrame,
    comparison_df: pd.DataFrame,
    comparison_field_names: ComparisonFieldNames,
    method_a_name: str,
    method_b_name: str,
    method_a_priority_census_block_groups_field: str,
    method_b_priority_census_tracts_field: str,
    state_field: str = GEOID_STATE_FIELD_NAME,
) -> str:
    # Prepare some constants for use in the following Markdown content.
    total_cbgs = len(original_df)

    # List of all states/territories in their FIPS codes:
    state_ids = sorted(original_df[state_field].unique())
    state_names = ", ".join([us.states.lookup(state_id).name for state_id in state_ids])

    # Note: using squeeze throughout do reduce result of `sum()` to a scalar.
    # TODO: investigate why sums are sometimes series and sometimes scalar.
    method_a_priority_cbgs = (
        original_df.loc[:, method_a_priority_census_block_groups_field].sum().squeeze()
    )
    method_a_priority_cbgs_percent = f"{method_a_priority_cbgs / total_cbgs:.0%}"

    total_tracts_count = len(comparison_df)

    method_b_priority_tracts_count = comparison_df.loc[
        :, method_b_priority_census_tracts_field
    ].sum()

    method_b_priority_tracts_count_percent = (
        f"{method_b_priority_tracts_count / total_tracts_count:.0%}"
    )
    method_b_non_priority_tracts_count = (
        total_tracts_count - method_b_priority_tracts_count
    )

    method_a_tracts_count = (
        comparison_df.loc[
            :, comparison_field_names.any_tract_has_at_least_one_method_a_cbg
        ]
        .sum()
        .squeeze()
    )
    method_a_tracts_count_percent = f"{method_a_tracts_count / total_tracts_count:.0%}"

    # Method A priority community stats
    method_b_tracts_with_at_least_one_method_a_cbg = comparison_df.loc[
        :, comparison_field_names.method_b_tract_has_at_least_one_method_a_cbg
    ].sum()
    method_b_tracts_with_at_least_one_method_a_cbg_percent = f"{method_b_tracts_with_at_least_one_method_a_cbg / method_b_priority_tracts_count:.0%}"

    method_b_tracts_with_at_100_percent_method_a_cbg = comparison_df.loc[
        :, comparison_field_names.method_b_tract_has_100_percent_method_a_cbg
    ].sum()
    method_b_tracts_with_at_100_percent_method_a_cbg_percent = f"{method_b_tracts_with_at_100_percent_method_a_cbg / method_b_priority_tracts_count:.0%}"

    # Method A non-priority community stats
    method_b_non_priority_tracts_with_at_least_one_method_a_cbg = comparison_df.loc[
        :,
        comparison_field_names.method_b_non_priority_tract_has_at_least_one_method_a_cbg,
    ].sum()

    method_b_non_priority_tracts_with_at_least_one_method_a_cbg_percent = f"{method_b_non_priority_tracts_with_at_least_one_method_a_cbg / method_b_non_priority_tracts_count:.0%}"

    method_b_non_priority_tracts_with_100_percent_method_a_cbg = comparison_df.loc[
        :,
        comparison_field_names.method_b_non_priority_tract_has_100_percent_method_a_cbg,
    ].sum()
    method_b_non_priority_tracts_with_100_percent_method_a_cbg_percent = f"{method_b_non_priority_tracts_with_100_percent_method_a_cbg / method_b_non_priority_tracts_count:.0%}"

    # Create markdown content for comparisons.
    markdown_content = f"""
# {method_a_name} compared to {method_b_name}

(This report was calculated on {datetime.today().strftime('%Y-%m-%d')}.)

This report analyzes the following US states and territories: {state_names}.

Recall that census tracts contain one or more census block groups, with up to nine census block groups per tract.

Within the geographic area analyzed, there are {method_b_priority_tracts_count} census tracts designated as priority communities by {method_b_name}, out of {total_tracts_count} total tracts ({method_b_priority_tracts_count_percent}). 

Within the geographic region analyzed, there are {method_a_priority_cbgs} census block groups considered as priority communities by {method_a_name}, out of {total_cbgs} CBGs ({method_a_priority_cbgs_percent}). They occupy {method_a_tracts_count} census tracts ({method_a_tracts_count_percent}) of the geographic area analyzed.

Out of every {method_b_name} priority census tract, {method_b_tracts_with_at_least_one_method_a_cbg} ({method_b_tracts_with_at_least_one_method_a_cbg_percent}) of these census tracts have at least one census block group within them that is considered a priority community by {method_a_name}.

Out of every {method_b_name} priority census tract, {method_b_tracts_with_at_100_percent_method_a_cbg} ({method_b_tracts_with_at_100_percent_method_a_cbg_percent}) of these census tracts have 100% of the included census block groups within them considered priority communities by {method_a_name}.

Out of every census tract that is __not__ marked as a priority community by {method_b_name}, {method_b_non_priority_tracts_with_at_least_one_method_a_cbg} ({method_b_non_priority_tracts_with_at_least_one_method_a_cbg_percent}) of these census tracts have at least one census block group within them that is considered a priority community by the current version of the CEJST score.

Out of every census tract that is __not__ marked as a priority community by {method_b_name}, {method_b_non_priority_tracts_with_100_percent_method_a_cbg} ({method_b_non_priority_tracts_with_100_percent_method_a_cbg_percent}) of these census tracts have 100% of the included census block groups within them considered priority communities by the current version of the CEJST score.
"""

    return markdown_content


def get_secondary_comparison_df(
    comparison_df: pd.DataFrame,
    comparison_field_names: ComparisonFieldNames,
    method_b_priority_census_tracts_field: str,
) -> pd.DataFrame:
    """A secondary level of comparison.

    The first level of comparison identifies census tracts prioritized by Method A,
    compared to whether or not they're prioritized by Method B.

    This comparison method analyzes characteristics of those census tracts, based on whether or not they are prioritized
    or not by Method A and/or Method B.


    E.g., it might show that tracts prioritized by A but not B have a higher average income,
    or that tracts prioritized by B but not A have a lower percent of unemployed people."""
    grouped_df = comparison_df.groupby(
        [
            method_b_priority_census_tracts_field,
            comparison_field_names.method_b_tract_has_at_least_one_method_a_cbg,
            comparison_field_names.method_b_non_priority_tract_has_at_least_one_method_a_cbg,
        ],
        dropna=False,
    )

    # Run the comparison function on the groups.
    secondary_comparison_df = grouped_df.mean().reset_index()

    return secondary_comparison_df


def execute_comparison(
    df: pd.DataFrame,
    method_a_name: str,
    method_b_name: str,
    method_a_priority_census_block_groups_field: str,
    method_b_priority_census_tracts_field: str,
    other_census_tract_fields_to_keep: typing.Optional[typing.List[str]],
) -> pathlib.PosixPath:
    """Execute an individual comparison by creating the data frame and writing the report.

    Args:
      df: a pandas dataframe including the data for this comparison.
      method_a_priority_census_block_groups_field: the name of a boolean column in `df`, such as the CEJST priority
        community field that defines communities at the level of census block groups (CBGs).
      method_b_priority_census_tracts_field: the name of a boolean column in `df`, such as the CalEnviroScreen priority
        community field that defines communities at the level of census tracts.
      other_census_tract_fields_to_keep (optional): a list of field names to preserve at the census tract level

    Returns:
      df: a pandas dataframe with one row with the results of this comparison

    """
    comparison_field_names = get_comparison_field_names(
        method_a_name=method_a_name, method_b_name=method_b_name
    )

    # Create or use a directory for outputs grouped by Method A.
    output_dir = COMPARISON_OUTPUTS_DIR / method_a_name
    output_dir.mkdir(parents=True, exist_ok=True)

    df_with_only_shared_states = get_df_with_only_shared_states(
        df=df,
        field_a=method_a_priority_census_block_groups_field,
        field_b=method_b_priority_census_tracts_field,
    )

    comparison_df = get_comparison_df(
        df=df_with_only_shared_states,
        method_a_priority_census_block_groups_field=method_a_priority_census_block_groups_field,
        method_b_priority_census_tracts_field=method_b_priority_census_tracts_field,
        comparison_field_names=comparison_field_names,
        other_census_tract_fields_to_keep=other_census_tract_fields_to_keep,
        output_dir=output_dir,
    )

    # Write comparison to CSV.
    file_path = (
        output_dir / f"Comparison Output - {method_a_name} and {method_b_name}.csv"
    )
    comparison_df.to_csv(
        path_or_buf=file_path,
        na_rep="",
        index=False,
    )

    # Secondary comparison DF
    secondary_comparison_df = get_secondary_comparison_df(
        comparison_df=comparison_df,
        comparison_field_names=comparison_field_names,
        method_b_priority_census_tracts_field=method_b_priority_census_tracts_field,
    )

    # Write secondary comparison to CSV.
    file_path = (
        output_dir
        / f"Secondary Comparison Output - {method_a_name} and {method_b_name}.csv"
    )
    secondary_comparison_df.to_csv(
        path_or_buf=file_path,
        na_rep="",
        index=False,
    )

    markdown_content = get_comparison_markdown_content(
        original_df=df_with_only_shared_states,
        comparison_df=comparison_df,
        comparison_field_names=comparison_field_names,
        method_a_name=method_a_name,
        method_b_name=method_b_name,
        method_a_priority_census_block_groups_field=method_a_priority_census_block_groups_field,
        method_b_priority_census_tracts_field=method_b_priority_census_tracts_field,
    )

    comparison_docx_file_path = write_markdown_and_docx_content(
        markdown_content=markdown_content,
        file_dir=output_dir,
        file_name_without_extension=f"Comparison report - {method_a_name} and {method_b_name}",
    )

    return comparison_docx_file_path


def execute_comparisons(
    df: pd.DataFrame,
    census_block_group_indices: typing.List[Index],
    census_tract_indices: typing.List[Index],
):
    """Create multiple comparison reports."""
    comparison_docx_file_paths = []
    for cbg_index in census_block_group_indices:
        for census_tract_index in census_tract_indices:
            print(
                f"Running comparisons for {cbg_index.method_name} against {census_tract_index.method_name}..."
            )

            comparison_docx_file_path = execute_comparison(
                df=df,
                method_a_name=cbg_index.method_name,
                method_b_name=census_tract_index.method_name,
                method_a_priority_census_block_groups_field=cbg_index.priority_communities_field,
                method_b_priority_census_tracts_field=census_tract_index.priority_communities_field,
                other_census_tract_fields_to_keep=census_tract_index.other_census_tract_fields_to_keep,
            )

            comparison_docx_file_paths.append(comparison_docx_file_path)

    return comparison_docx_file_paths

In [None]:
# Actually execute the functions
file_paths = execute_comparisons(
    df=merged_df,
    census_block_group_indices=census_block_group_indices,
    census_tract_indices=census_tract_indices,
)

print(file_paths)