diff --git a/data/data-pipeline/data_pipeline/application.py b/data/data-pipeline/data_pipeline/application.py
index cde48ae1..88c2505b 100644
--- a/data/data-pipeline/data_pipeline/application.py
+++ b/data/data-pipeline/data_pipeline/application.py
@@ -33,6 +33,30 @@ dataset_cli_help = "Grab the data from either 'local' for local access or 'aws'

 LOG_LINE_WIDTH = 60

+use_cache_option = click.option(
+    "-u",
+    "--use-cache",
+    is_flag=True,
+    default=False,
+    help="Check if data source has been downloaded already, and if it has, use the cached version of the data source.",
+)
+
+dataset_option = click.option(
+    "-d",
+    "--dataset",
+    required=False,
+    type=str,
+    help=dataset_cli_help,
+)
+
+data_source_option = click.option(
+    "-s",
+    "--data-source",
+    default="local",
+    required=False,
+    type=str,
+    help=dataset_cli_help,
+)

 @click.group()
 def cli():
@@ -51,7 +75,6 @@ def census_cleanup():
     census_reset(data_path)

     log_goodbye()
-    sys.exit()


 @cli.command(help="Clean up all data folders")
@@ -70,7 +93,6 @@ def data_cleanup():
     geo_score_folder_cleanup()

     log_goodbye()
-    sys.exit()


 @cli.command(
@@ -82,13 +104,7 @@ def data_cleanup():
     is_flag=True,
     help="Upload to AWS S3 a zipped archive of the census data.",
 )
-@click.option(
-    "-u",
-    "--use-cache",
-    is_flag=True,
-    default=False,
-    help="Check if data source has been downloaded already, and if it has, use the cached version of the data source.",
-)
+@use_cache_option
 def census_data_download(zip_compress, use_cache):
     """CLI command to download all census shape files from the Census FTP and extract the geojson
     to generate national and by state Census Block Group CSVs"""
@@ -105,18 +121,10 @@ def census_data_download(zip_compress, use_cache):
         zip_census_data()

     log_goodbye()
-    sys.exit()


 @cli.command(help="Retrieve census data from source")
-@click.option(
-    "-s",
-    "--data-source",
-    default="local",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
+@data_source_option
 def pull_census_data(data_source: str):

     log_title("Pull Census Data")
@@ -126,26 +134,13 @@ def pull_census_data(data_source: str):
     check_census_data_source(data_path, data_source)

     log_goodbye()
-    sys.exit()


 @cli.command(
     help="Run all ETL processes or a specific one",
 )
-@click.option(
-    "-d",
-    "--dataset",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
-@click.option(
-    "-u",
-    "--use-cache",
-    is_flag=True,
-    default=False,
-    help="Check if data source has been downloaded already, and if it has, use the cached version of the data source.",
-)
+@dataset_option
+@use_cache_option
 def etl_run(dataset: str, use_cache: bool):
     """Run a specific or all ETL processes

@@ -161,7 +156,6 @@ def etl_run(dataset: str, use_cache: bool):
     etl_runner(dataset, use_cache)

     log_goodbye()
-    sys.exit()


 @cli.command(
@@ -178,19 +172,12 @@ def score_run():
     score_generate()

     log_goodbye()
-    sys.exit()


 @cli.command(
     help="Run ETL + Score Generation",
 )
-@click.option(
-    "-u",
-    "--use-cache",
-    is_flag=True,
-    default=False,
-    help="Check if data source has been downloaded already, and if it has, use the cached version of the data source.",
-)
+@use_cache_option
 def score_full_run(use_cache: bool):
     """CLI command to run ETL and generate the score in one command"""
     log_title("Score Full Run", "Run ETL and Generate Score (no tiles)")
@@ -207,20 +194,12 @@ def score_full_run(use_cache: bool):
     score_generate()

     log_goodbye()
-    sys.exit()


 @cli.command(
     help="Run etl_score_post to create score csv, tile csv, and downloadable zip"
 )
-@click.option(
-    "-s",
-    "--data-source",
-    default="local",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
+@data_source_option
 def generate_score_post(data_source: str):
     """CLI command to generate score, tile, and downloadable files

@@ -244,18 +223,10 @@ def generate_score_post(data_source: str):
     score_post(data_source)

     log_goodbye()
-    sys.exit()


 @cli.command(help="Generate GeoJSON files with scores baked in")
-@click.option(
-    "-s",
-    "--data-source",
-    default="local",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
+@data_source_option
 def geo_score(data_source: str):
     """CLI command to combine score with GeoJSON data and generate low and high files

@@ -280,7 +251,6 @@ def geo_score(data_source: str):
     score_geo(data_source=data_source)

     log_goodbye()
-    sys.exit()


 @cli.command(
@@ -304,7 +274,6 @@ def generate_map_tiles(generate_tribal_layer):
     generate_tiles(data_path, generate_tribal_layer)

     log_goodbye()
-    sys.exit()


 @cli.command(
@@ -316,21 +285,8 @@ def generate_map_tiles(generate_tribal_layer):
     is_flag=True,
     help="Check if data run has been run before, and don't run it if so.",
 )
-@click.option(
-    "-s",
-    "--data-source",
-    default="local",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
-@click.option(
-    "-u",
-    "--use-cache",
-    is_flag=True,
-    default=False,
-    help="Check if data source has been downloaded already, and if it has, use the cached version of the data source.",
-)
+@data_source_option
+@use_cache_option
 def data_full_run(check: bool, data_source: str, use_cache: bool):
     """CLI command to run ETL, score, JSON combine and generate tiles in one command

@@ -388,19 +344,12 @@ def data_full_run(check: bool, data_source: str, use_cache: bool):
     call(cmd, shell=True)

     log_goodbye()
-    sys.exit()


 @cli.command(
     help="Print data sources for all ETL processes (or a specific one)",
 )
-@click.option(
-    "-d",
-    "--dataset",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
+@dataset_option
 def print_data_sources(dataset: str):
     """Print data sources for all ETL processes (or a specific one)

@@ -421,26 +370,13 @@ def print_data_sources(dataset: str):
         log_info(s)

     log_goodbye()
-    sys.exit()


 @cli.command(
     help="Fetch data sources for all ETL processes (or a specific one)",
 )
-@click.option(
-    "-d",
-    "--dataset",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
-@click.option(
-    "-u",
-    "--use-cache",
-    is_flag=True,
-    default=False,
-    help="Check if data source has been downloaded already, and if it has, use the cached version of the data source.",
-)
+@dataset_option
+@use_cache_option
 def extract_data_sources(dataset: str, use_cache: bool):
     """Extract and cache data source(s) for all ETL processes (or a specific one)

@@ -457,19 +393,12 @@ def extract_data_sources(dataset: str, use_cache: bool):
     extract_ds(dataset, use_cache)

     log_goodbye()
-    sys.exit()


 @cli.command(
     help="Clear data source cache for all ETL processes (or a specific one)",
 )
-@click.option(
-    "-d",
-    "--dataset",
-    required=False,
-    type=str,
-    help=dataset_cli_help,
-)
+@dataset_option
 def clear_data_source_cache(dataset: str):
     """Clear data source(s) cache for all ETL processes (or a specific one)

@@ -485,8 +414,32 @@ def clear_data_source_cache(dataset: str):
     clear_ds_cache(dataset)

     log_goodbye()
-    sys.exit()
+
+
+@cli.command(
+    help="Generate scoring and tiles",
+)
+@click.pass_context
+def full_post_etl(ctx):
+    """Generate scoring and tiles"""
+    ctx.invoke(score_run)
+    ctx.invoke(generate_score_post, data_source=None)
+    ctx.invoke(geo_score, data_source=None)
+    ctx.invoke(generate_map_tiles, generate_tribal_layer=False)
+
+
+@cli.command(
+    help="Run all downloads, extracts, and generate scores and tiles",
+)
+@use_cache_option
+@click.pass_context
+def full_run(ctx, use_cache):
+    """Run all downloads, ETLs, and generate scores and tiles"""
+    if not use_cache:
+        ctx.invoke(data_cleanup)
+    ctx.invoke(census_data_download, zip_compress=False, use_cache=use_cache)
+    ctx.invoke(extract_data_sources, dataset=None, use_cache=use_cache)
+    ctx.invoke(etl_run, dataset=None, use_cache=use_cache)
+    ctx.invoke(full_post_etl)


 def log_title(title: str, subtitle: str = None):
     """Logs a title in our fancy title format"""
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
index 0314512b..887d6189 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
@@ -472,6 +472,7 @@ class ScoreETL(ExtractTransformLoad):
             field_names.EXPECTED_POPULATION_LOSS_RATE_FIELD,
             field_names.CENSUS_DECENNIAL_HIGH_SCHOOL_ED_FIELD_2009,
             field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_100_FPL_FIELD_2009,
+            field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2009,
             field_names.CENSUS_DECENNIAL_UNEMPLOYMENT_FIELD_2009,
             field_names.CENSUS_UNEMPLOYMENT_FIELD_2010,
             field_names.CENSUS_POVERTY_LESS_THAN_100_FPL_FIELD_2010,
diff --git a/data/data-pipeline/data_pipeline/score/field_names.py b/data/data-pipeline/data_pipeline/score/field_names.py
index aed44c48..773a88aa 100644
--- a/data/data-pipeline/data_pipeline/score/field_names.py
+++ b/data/data-pipeline/data_pipeline/score/field_names.py
@@ -187,6 +187,9 @@ CENSUS_DECENNIAL_MEDIAN_INCOME_2009 = "Median household income in 2009 ($)"
 CENSUS_DECENNIAL_POVERTY_LESS_THAN_100_FPL_FIELD_2009 = (
     "Percentage households below 100% of federal poverty line in 2009"
 )
+CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2009 = (
+    "Percentage households below 200% of federal poverty line in 2009"
+)
 CENSUS_DECENNIAL_HIGH_SCHOOL_ED_FIELD_2009 = "Percent individuals age 25 or over with less than high school degree in 2009"
 CENSUS_DECENNIAL_UNEMPLOYMENT_FIELD_2009 = "Unemployment (percent) in 2009"
 CENSUS_DECENNIAL_TOTAL_POPULATION_FIELD_2009 = "Total population in 2009"
@@ -226,6 +229,10 @@ COMBINED_POVERTY_LESS_THAN_100_FPL_FIELD_2010 = (
     "Percentage households below 100% of federal poverty line in 2009 (island areas) "
     "and 2010 (states and PR)"
 )
+COMBINED_POVERTY_LESS_THAN_200_FPL_FIELD_2010 = (
+    "Percentage households below 200% of federal poverty line in 2009 (island areas) "
+    "and 2010 (states and PR)"
+)

 # Urban Rural Map
 URBAN_HEURISTIC_FIELD = "Urban Heuristic Flag"
diff --git a/data/data-pipeline/data_pipeline/score/score_narwhal.py b/data/data-pipeline/data_pipeline/score/score_narwhal.py
index 88473ca5..cfdab639 100644
--- a/data/data-pipeline/data_pipeline/score/score_narwhal.py
+++ b/data/data-pipeline/data_pipeline/score/score_narwhal.py
@@ -1013,6 +1013,47 @@ class ScoreNarwhal(Score):
             self.df[field_names.SCORE_N_COMMUNITIES],
         )

+    def _mark_territory_dacs(self) -> None:
+        """Territory tracts that are flagged as low income are Score N communities.
+        """
+        self.df[field_names.SCORE_N_COMMUNITIES] = np.where(
+            self.df[field_names.GEOID_TRACT_FIELD]
+            .str.startswith(tuple(constants.TILES_ISLAND_AREA_FIPS_CODES)) &
+            self.df[field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED],
+            True,
+            self.df[field_names.SCORE_N_COMMUNITIES],
+        )
+
+    def _mark_poverty_flag(self) -> None:
+        """Combine poverty less than 200% for territories and update the income flag."""
+        # First we set the low income flag for non-territories by themselves, this
+        # way we don't change the original outcome if we include territories.
+        self.df[field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED] = (
+            self.df[
+                # UPDATE: Pull the imputed poverty statistic
+                field_names.POVERTY_LESS_THAN_200_FPL_IMPUTED_FIELD
+                + field_names.PERCENTILE_FIELD_SUFFIX
+            ]
+            >= self.LOW_INCOME_THRESHOLD
+        )
+
+        # Now we set the low income flag only for territories, but we need to rank them
+        # with all other tracts.
+        (
+            self.df,
+            island_areas_poverty_200_criteria_field_name,
+        ) = self._combine_island_areas_with_states_and_set_thresholds(
+            df=self.df,
+            column_from_island_areas=field_names.CENSUS_DECENNIAL_POVERTY_LESS_THAN_200_FPL_FIELD_2009,
+            column_from_decennial_census=field_names.POVERTY_LESS_THAN_200_FPL_IMPUTED_FIELD,
+            combined_column_name=field_names.COMBINED_POVERTY_LESS_THAN_200_FPL_FIELD_2010,
+            threshold_cutoff_for_island_areas=self.LOW_INCOME_THRESHOLD,
+        )
+        self.df.loc[self.df[field_names.GEOID_TRACT_FIELD].str.startswith(tuple(constants.TILES_ISLAND_AREA_FIPS_CODES)),
+            field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED] = (
+            self.df[island_areas_poverty_200_criteria_field_name] >= self.LOW_INCOME_THRESHOLD
+        )
+
     def _get_percent_of_tract_that_is_dac(self) -> float:
         """Per the October 7th compromise (#1988),
         tracts can be partially DACs if some portion of the tract is tribal land.
@@ -1034,14 +1075,7 @@ class ScoreNarwhal(Score):
         logger.debug("Adding Score Narhwal")
         self.df[field_names.THRESHOLD_COUNT] = 0

-        self.df[field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED] = (
-            self.df[
-                # UPDATE: Pull the imputed poverty statistic
-                field_names.POVERTY_LESS_THAN_200_FPL_IMPUTED_FIELD
-                + field_names.PERCENTILE_FIELD_SUFFIX
-            ]
-            >= self.LOW_INCOME_THRESHOLD
-        )
+        self._mark_poverty_flag()

         self.df[field_names.N_CLIMATE] = self._climate_factor()
         self.df[field_names.N_ENERGY] = self._energy_factor()
@@ -1065,6 +1099,7 @@ class ScoreNarwhal(Score):
         self.df[field_names.CATEGORY_COUNT] = self.df[factors].sum(axis=1)
         self.df[field_names.SCORE_N_COMMUNITIES] = self.df[factors].any(axis=1)
         self._mark_tribal_dacs()
+        self._mark_territory_dacs()
         self.df[
             field_names.SCORE_N_COMMUNITIES + field_names.PERCENTILE_FIELD_SUFFIX
diff --git a/data/data-pipeline/data_pipeline/tests/score/test_score_narwhal_methods.py b/data/data-pipeline/data_pipeline/tests/score/test_score_narwhal_methods.py
index 3132c51a..a559e46c 100644
--- a/data/data-pipeline/data_pipeline/tests/score/test_score_narwhal_methods.py
+++ b/data/data-pipeline/data_pipeline/tests/score/test_score_narwhal_methods.py
@@ -2,22 +2,20 @@ import pandas as pd
 import pytest

 from data_pipeline.config import settings
+from data_pipeline.etl.score import constants
 from data_pipeline.etl.score.etl_score import ScoreETL
 from data_pipeline.score import field_names
+from data_pipeline.score.score_narwhal import ScoreNarwhal
 from data_pipeline.utils import get_module_logger

+
 logger = get_module_logger(__name__)

+TEST_DATA_FOLDER = settings.APP_ROOT / "tests" / "score" / "test_utils" / "data"

 @pytest.fixture
 def toy_score_df(scope="module"):
-    return pd.read_csv(
-        settings.APP_ROOT
-        / "tests"
-        / "score"
-        / "test_utils"
-        / "data"
-        / "test_drop_tracts_from_percentile.csv",
+    return pd.read_csv(TEST_DATA_FOLDER / "test_drop_tracts_from_percentile.csv",
         dtype={field_names.GEOID_TRACT_FIELD: str},
     )

@@ -83,3 +81,44 @@ def test_drop_all_tracts(toy_score_df):
         toy_score_df,
         drop_tracts=toy_score_df[field_names.GEOID_TRACT_FIELD].to_list(),
     ), "Percentile in score fails when we drop all tracts"
+
+
+def test_mark_territory_dacs():
+    test_data = pd.read_csv(TEST_DATA_FOLDER / "test_mark_territory_dacs.csv",
+        dtype={field_names.GEOID_TRACT_FIELD: str},
+    )
+    # Sanity check on the input data
+    assert not test_data[field_names.SCORE_N_COMMUNITIES].all()
+
+    scorer = ScoreNarwhal(test_data)
+    scorer._mark_territory_dacs()
+    territory_filter = test_data[field_names.GEOID_TRACT_FIELD].str.startswith(tuple(constants.TILES_ISLAND_AREA_FIPS_CODES))
+    # Check territories are set to true
+    expected_new_dacs_filter = (
+        test_data[field_names.GEOID_TRACT_FIELD].isin(['60050951100', '66010951100', '69110001101', '78010990000'])
+    )
+    assert test_data.loc[expected_new_dacs_filter, field_names.SCORE_N_COMMUNITIES].all()
+    # Non-territories are still false
+    assert not test_data.loc[~expected_new_dacs_filter, field_names.SCORE_N_COMMUNITIES].any()
+
+
+def test_mark_poverty_flag():
+    test_data = pd.read_csv(TEST_DATA_FOLDER / "test_mark_poverty_flag.csv",
+        dtype={field_names.GEOID_TRACT_FIELD: str},
+    )
+    # Sanity check on the input data
+    assert not test_data[field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED].all()
+
+    scorer = ScoreNarwhal(test_data)
+    scorer._mark_poverty_flag()
+    expected_low_income_filter = (
+        test_data[field_names.GEOID_TRACT_FIELD].isin(['36087011302', '66010951100', '78010990000'])
+    )
+    # Three tracts are set to true
+    assert (
+        test_data[expected_low_income_filter][field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED].all()
+    )
+    # Everything else is false
+    assert (
+        not test_data[~expected_low_income_filter][field_names.FPL_200_SERIES_IMPUTED_AND_ADJUSTED].any()
+    )
diff --git a/data/data-pipeline/data_pipeline/tests/score/test_utils/data/test_mark_poverty_flag.csv b/data/data-pipeline/data_pipeline/tests/score/test_utils/data/test_mark_poverty_flag.csv
new file mode 100644
index 00000000..00ca70fd
--- /dev/null
+++ b/data/data-pipeline/data_pipeline/tests/score/test_utils/data/test_mark_poverty_flag.csv
@@ -0,0 +1,8 @@
+GEOID10_TRACT,Percentage households below 200% of federal poverty line in 2009,"Percent of individuals below 200% Federal Poverty Line, imputed and adjusted","Percent of individuals below 200% Federal Poverty Line, imputed and adjusted (percentile)",Is low income (imputed and adjusted)?
+01071950300,,0.1,0.1,False
+36087011302,,0.7,0.7,False
+72119130701,,0.5,0.5,False
+60050951100,0.1,,,False
+66010951100,0.7,,,False
+69110001100,0.5,,,False
+78010990000,0.9,,,False
\ No newline at end of file
diff --git a/data/data-pipeline/data_pipeline/tests/score/test_utils/data/test_mark_territory_dacs.csv b/data/data-pipeline/data_pipeline/tests/score/test_utils/data/test_mark_territory_dacs.csv
new file mode 100644
index 00000000..84992e2a
--- /dev/null
+++ b/data/data-pipeline/data_pipeline/tests/score/test_utils/data/test_mark_territory_dacs.csv
@@ -0,0 +1,9 @@
+GEOID10_TRACT,Is low income (imputed and adjusted)?,Definition N (communities)
+01071950300,True,False
+36087011302,False,False
+72119130701,True,False
+60050951100,True,False
+66010951100,True,False
+69110001100,False,False
+69110001101,True,False
+78010990000,True,False
\ No newline at end of file
diff --git a/data/data-pipeline/pyproject.toml b/data/data-pipeline/pyproject.toml
index bbb9adc1..45403257 100644
--- a/data/data-pipeline/pyproject.toml
+++ b/data/data-pipeline/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "justice40-data-pipeline"
-version = "0.1.0"
+version = "1.0.1"
 description = "ETL, Score and Map Generation of Justice 40 Tool"
 authors = ["Justice40 Engineering <j40-engineering@lists.usds.gov>"]
 keywords = ["justice40", "environmental_justice", "python", "etl"]
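
A note on the `application.py` refactor, since it is the bulk of the patch: `click.option(...)` returns an ordinary decorator, so it can be bound to a module-level name once and stacked onto any number of commands; that is all `use_cache_option`, `dataset_option`, and `data_source_option` are doing. A minimal self-contained sketch of the pattern (the command names here are toy examples, not part of this change):

```python
import click

# Defined once at module level; click.option() returns a plain decorator,
# so the same object can be stacked onto any number of commands.
use_cache_option = click.option(
    "-u",
    "--use-cache",
    is_flag=True,
    default=False,
    help="Reuse a previously downloaded copy of the data source.",
)


@click.group()
def cli():
    """Toy CLI group."""


@cli.command()
@use_cache_option
def download(use_cache):
    """First consumer of the shared flag."""
    click.echo(f"download: use_cache={use_cache}")


@cli.command()
@use_cache_option
@click.pass_context
def run_all(ctx, use_cache):
    """Second consumer; also shows ctx.invoke(), which the new
    full_post_etl and full_run commands use to chain existing
    commands instead of duplicating their bodies."""
    ctx.invoke(download, use_cache=use_cache)


if __name__ == "__main__":
    cli()
```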
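On the scoring side, `_mark_territory_dacs` flags a tract as a Score N community when its GEOID starts with an island-area FIPS prefix and its low-income flag is already set, and it leaves every other tract's value untouched. A toy reproduction of that masking logic, with the FIPS prefixes hard-coded and column names shortened for readability (the real code takes both from `constants` and `field_names`):

```python
import numpy as np
import pandas as pd

# Island-area FIPS prefixes (American Samoa, Guam, Northern Mariana
# Islands, U.S. Virgin Islands); the pipeline reads these from
# constants.TILES_ISLAND_AREA_FIPS_CODES.
ISLAND_FIPS = ("60", "66", "69", "78")

df = pd.DataFrame(
    {
        "GEOID10_TRACT": ["01071950300", "60050951100", "69110001100"],
        "low_income": [True, True, False],
        "is_dac": [False, False, False],
    }
)

# Same shape as _mark_territory_dacs: flip the flag where both conditions
# hold, otherwise keep whatever value is already there.
df["is_dac"] = np.where(
    df["GEOID10_TRACT"].str.startswith(ISLAND_FIPS) & df["low_income"],
    True,
    df["is_dac"],
)

print(df)
# Only 60050951100 becomes True: it is in a territory (prefix 60) and low
# income. 01071950300 is low income but stateside, and 69110001100 is in
# a territory but not low income, so both stay False.
```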
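`_mark_poverty_flag` is the companion change: the low-income flag is first computed from the nationwide imputed-poverty percentile (so stateside results are unchanged), and territory rows are then overwritten using a percentile that ranks the 2009 decennial island-area figures alongside all other tracts, via the existing `_combine_island_areas_with_states_and_set_thresholds` helper. Reduced to its two-step skeleton, with toy column names and percentile values (the 0.65 cutoff is `ScoreNarwhal.LOW_INCOME_THRESHOLD`):

```python
import pandas as pd

LOW_INCOME_THRESHOLD = 0.65  # ScoreNarwhal.LOW_INCOME_THRESHOLD
ISLAND_FIPS = ("60", "66", "69", "78")

df = pd.DataFrame(
    {
        "GEOID10_TRACT": ["36087011302", "72119130701", "66010951100", "60050951100"],
        # Nationwide ACS percentile (missing for island areas).
        "acs_pctile": [0.7, 0.5, None, None],
        # Percentile after combining the 2009 decennial island-area data
        # with everyone else, as the helper does.
        "combined_pctile": [0.7, 0.5, 0.7, 0.1],
    }
)

# Step 1: flag everyone from the nationwide percentile, so stateside
# results are unaffected by the territory logic (NaN compares as False).
df["low_income"] = df["acs_pctile"] >= LOW_INCOME_THRESHOLD

# Step 2: overwrite only territory rows from the combined ranking.
territory = df["GEOID10_TRACT"].str.startswith(ISLAND_FIPS)
df.loc[territory, "low_income"] = (
    df["combined_pctile"] >= LOW_INCOME_THRESHOLD
)

print(df)
# 36087011302 and 66010951100 end up flagged while 72119130701 and
# 60050951100 do not, mirroring the expectations in test_mark_poverty_flag.
```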