diff --git a/data/data-pipeline/data_pipeline/etl/base.py b/data/data-pipeline/data_pipeline/etl/base.py
index a0128119..8350e28a 100644
--- a/data/data-pipeline/data_pipeline/etl/base.py
+++ b/data/data-pipeline/data_pipeline/etl/base.py
@@ -21,6 +21,8 @@ class ExtractTransformLoad:
     TMP_PATH: Path = DATA_PATH / "tmp"
     GEOID_FIELD_NAME: str = "GEOID10"
     GEOID_TRACT_FIELD_NAME: str = "GEOID10_TRACT"
+    # TODO: investigate. Census says there are only 217,740 CBGs in the US.
+    EXPECTED_MAX_CENSUS_BLOCK_GROUPS: int = 220405
 
     def get_yaml_config(self) -> None:
         """Reads the YAML configuration file for the dataset and stores
diff --git a/data/data-pipeline/data_pipeline/etl/constants.py b/data/data-pipeline/data_pipeline/etl/constants.py
index c417ea7b..1dee51ee 100644
--- a/data/data-pipeline/data_pipeline/etl/constants.py
+++ b/data/data-pipeline/data_pipeline/etl/constants.py
@@ -44,6 +44,11 @@ DATASET_LIST = [
         "module_dir": "national_risk_index",
         "class_name": "NationalRiskIndexETL",
     },
+    {
+        "name": "census_acs_median_income",
+        "module_dir": "census_acs_median_income",
+        "class_name": "CensusACSMedianIncomeETL",
+    },
 ]
 CENSUS_INFO = {
     "name": "census",
diff --git a/data/data-pipeline/data_pipeline/etl/score/constants.py b/data/data-pipeline/data_pipeline/etl/score/constants.py
index f63e16b4..054ef025 100644
--- a/data/data-pipeline/data_pipeline/etl/score/constants.py
+++ b/data/data-pipeline/data_pipeline/etl/score/constants.py
@@ -39,7 +39,9 @@ DATA_SCORE_TILES_FILE_PATH = DATA_SCORE_TILES_DIR / "usa.csv"
 SCORE_DOWNLOADABLE_DIR = DATA_SCORE_DIR / "downloadable"
 SCORE_DOWNLOADABLE_CSV_FILE_PATH = SCORE_DOWNLOADABLE_DIR / "usa.csv"
 SCORE_DOWNLOADABLE_EXCEL_FILE_PATH = SCORE_DOWNLOADABLE_DIR / "usa.xlsx"
-SCORE_DOWNLOADABLE_ZIP_FILE_PATH = SCORE_DOWNLOADABLE_DIR / "Screening_Tool_Data.zip"
+SCORE_DOWNLOADABLE_ZIP_FILE_PATH = (
+    SCORE_DOWNLOADABLE_DIR / "Screening_Tool_Data.zip"
+)
 
 # Column subsets
 CENSUS_COUNTIES_COLUMNS = ["USPS", "GEOID", "NAME"]
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
index c15f045b..1ede5a5a 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
@@ -31,10 +31,28 @@ class ScoreETL(ExtractTransformLoad):
             "Poverty (Less than 200% of federal poverty line)"
         )
         self.HIGH_SCHOOL_FIELD_NAME = "Percent individuals age 25 or over with less than high school degree"
+        self.STATE_MEDIAN_INCOME_FIELD_NAME: str = (
+            "Median household income (State; 2019 inflation-adjusted dollars)"
+        )
+        self.MEDIAN_INCOME_FIELD_NAME = (
+            "Median household income in the past 12 months"
+        )
         self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME = (
             "Median household income (% of state median household income)"
         )
+        # Note: these variable names are slightly different (missing the word `PERCENT`)
+        # than those in the source ETL, to avoid pylint's duplicate-code error. - LMB
+        self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME = (
+            "Percent of individuals < 100% Federal Poverty Line"
+        )
+        self.POVERTY_LESS_THAN_150_FPL_FIELD_NAME = (
+            "Percent of individuals < 150% Federal Poverty Line"
+        )
+        self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME = (
+            "Percent of individuals < 200% Federal Poverty Line"
+        )
+        # There's another aggregation level (a second level of "buckets").
         self.AGGREGATION_POLLUTION = "Pollution Burden"
         self.AGGREGATION_POPULATION = "Population Characteristics"
@@ -51,6 +69,7 @@ class ScoreETL(ExtractTransformLoad):
         self.housing_and_transportation_df: pd.DataFrame
         self.hud_housing_df: pd.DataFrame
         self.cdc_places_df: pd.DataFrame
+        self.census_acs_median_incomes_df: pd.DataFrame
 
     def data_sets(self) -> list:
         # Define a named tuple that will be used for each data set input.
@@ -112,6 +131,21 @@ class ScoreETL(ExtractTransformLoad):
                 renamed_field="Physical health not good for >=14 days among adults aged >=18 years",
                 bucket=None,
             ),
+            DataSet(
+                input_field=self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME,
+                renamed_field=self.POVERTY_LESS_THAN_100_FPL_FIELD_NAME,
+                bucket=None,
+            ),
+            DataSet(
+                input_field=self.POVERTY_LESS_THAN_150_FPL_FIELD_NAME,
+                renamed_field=self.POVERTY_LESS_THAN_150_FPL_FIELD_NAME,
+                bucket=None,
+            ),
+            DataSet(
+                input_field=self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME,
+                renamed_field=self.POVERTY_LESS_THAN_200_FPL_FIELD_NAME,
+                bucket=None,
+            ),
             # The following data sets have buckets, because they're used in Score C
             DataSet(
                 input_field="CANCER",
@@ -211,6 +245,7 @@ class ScoreETL(ExtractTransformLoad):
         ]
 
     def extract(self) -> None:
+        logger.info("Loading data sets from disk.")
         # EJSCreen csv Load
         ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
         self.ejscreen_df = pd.read_csv(
@@ -257,6 +292,19 @@ class ScoreETL(ExtractTransformLoad):
             low_memory=False,
         )
 
+        # Load census AMI data
+        census_acs_median_incomes_csv = (
+            self.DATA_PATH
+            / "dataset"
+            / "census_acs_median_income_2019"
+            / "usa.csv"
+        )
+        self.census_acs_median_incomes_df = pd.read_csv(
+            census_acs_median_incomes_csv,
+            dtype={self.GEOID_FIELD_NAME: "string"},
+            low_memory=False,
+        )
+
     def _join_cbg_dfs(self, census_block_group_dfs: list) -> pd.DataFrame:
         logger.info("Joining Census Block Group dataframes")
         census_block_group_df = functools.reduce(
@@ -275,7 +323,7 @@ class ScoreETL(ExtractTransformLoad):
                 f"One of the input CSVs uses {self.GEOID_FIELD_NAME} with a different length."
             )
         return census_block_group_df
-
+
     def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
         logger.info("Joining Census Tract dataframes")
         census_tract_df = functools.reduce(
@@ -350,11 +398,10 @@ class ScoreETL(ExtractTransformLoad):
         # Multiply the "Pollution Burden" score and the "Population Characteristics"
         # together to produce the cumulative impact score.
df["Score C"] = ( - df[self.AGGREGATION_POLLUTION] - * df[self.AGGREGATION_POPULATION] + df[self.AGGREGATION_POLLUTION] * df[self.AGGREGATION_POPULATION] ) return df - + def _add_scores_d_and_e(self, df: pd.DataFrame) -> pd.DataFrame: logger.info("Adding Scores D and E") fields_to_use_in_score = [ @@ -434,9 +481,7 @@ class ScoreETL(ExtractTransformLoad): ) | (df["Proximity to RMP sites (percentile)"] > 0.9) | ( - df[ - "Current asthma among adults aged >=18 years (percentile)" - ] + df["Current asthma among adults aged >=18 years (percentile)"] > 0.9 ) | ( @@ -476,15 +521,21 @@ class ScoreETL(ExtractTransformLoad): ) return df + def _add_score_g(self, df: pd.DataFrame) -> pd.DataFrame: + logger.info("Adding Score G") + # TODO: add scoring + return df + # TODO Move a lot of this to the ETL part of the pipeline def _prepare_initial_df(self, data_sets: list) -> pd.DataFrame: logger.info("Preparing initial dataframe") # Join all the data sources that use census block groups census_block_group_dfs = [ - self.ejscreen_df, - self.census_df, + self.ejscreen_df, + self.census_df, self.housing_and_transportation_df, + self.census_acs_median_incomes_df, ] census_block_group_df = self._join_cbg_dfs(census_block_group_dfs) @@ -504,11 +555,23 @@ class ScoreETL(ExtractTransformLoad): census_tract_df, on=self.GEOID_TRACT_FIELD_NAME ) - # If GEOID10s are read as numbers instead of strings, the initial 0 is dropped, + # If GEOID10s are read as numbers instead of strings, the initial 0 is dropped, # and then we get too many CBG rows (one for 012345 and one for 12345). - if len(census_block_group_df) > 220333: - raise ValueError("Too many rows in the join.") - + if len(census_block_group_df) > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS: + raise ValueError( + f"Too many rows in the join: {len(census_block_group_df)}" + ) + + # Calculate median income variables. + # First, calculate the income of the block group as a fraction of the state income. + # TODO: handle null values for CBG median income, which are `-666666666`. + df[self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME] = ( + df[self.MEDIAN_INCOME_FIELD_NAME] + / df[self.STATE_MEDIAN_INCOME_FIELD_NAME] + ) + + # TODO: Calculate the income of the block group as a fraction of the AMI (either state or metropolitan, depending on reference). + # TODO Refactor to no longer use the data_sets list and do all renaming in ETL step # Rename columns: renaming_dict = { @@ -537,9 +600,9 @@ class ScoreETL(ExtractTransformLoad): # calculate percentiles for data_set in data_sets: - df[ - f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}" - ] = df[data_set.renamed_field].rank(pct=True) + df[f"{data_set.renamed_field}{self.PERCENTILE_FIELD_SUFFIX}"] = df[ + data_set.renamed_field + ].rank(pct=True) # Do some math: # ( @@ -567,14 +630,14 @@ class ScoreETL(ExtractTransformLoad): df[f"{data_set.renamed_field}{self.MIN_MAX_FIELD_SUFFIX}"] = ( df[data_set.renamed_field] - min_value ) / (max_value - min_value) - + return df def transform(self) -> None: ## IMPORTANT: THIS METHOD IS CLOSE TO THE LIMIT OF STATEMENTS logger.info("Transforming Score Data") - + # get data sets list data_sets = self.data_sets() @@ -600,9 +663,10 @@ class ScoreETL(ExtractTransformLoad): # Calculate "Score F", which uses "either/or" thresholds. self.df = self._add_score_f(self.df) + # Calculate "Score G", which uses AMI and poverty. 
     def load(self) -> None:
         logger.info("Saving Score CSV")
-
-        # write nationwide csv
         self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
         self.df.to_csv(self.SCORE_CSV_PATH / "usa.csv", index=False)
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py
index e5e77f0b..c5f9af5a 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py
@@ -57,11 +57,15 @@ class PostScoreETL(ExtractTransformLoad):
 
     def _extract_score(self, score_path: Path) -> pd.DataFrame:
         logger.info("Reading Score CSV")
-        return pd.read_csv(
-            score_path,
-            dtype={"GEOID10": "string", "Total population": "int64"},
+        df = pd.read_csv(score_path, dtype={"GEOID10": "string"})
+
+        # Convert total population to an int:
+        df["Total population"] = df["Total population"].astype(
+            int, errors="ignore"
         )
 
+        return df
+
     def _extract_national_cbg(self, national_cbg_path: Path) -> pd.DataFrame:
         logger.info("Reading national CBG")
         return pd.read_csv(
@@ -91,18 +95,23 @@ class PostScoreETL(ExtractTransformLoad):
             constants.DATA_CENSUS_CSV_FILE_PATH
         )
 
-    def _transform_counties(self, initial_counties_df: pd.DataFrame) -> pd.DataFrame:
+    def _transform_counties(
+        self, initial_counties_df: pd.DataFrame
+    ) -> pd.DataFrame:
         """
         Necessary modifications to the counties dataframe
         """
         # Rename some of the columns to prepare for merge
         new_df = initial_counties_df[constants.CENSUS_COUNTIES_COLUMNS]
         new_df.rename(
-            columns={"USPS": "State Abbreviation", "NAME": "County Name"}, inplace=True
+            columns={"USPS": "State Abbreviation", "NAME": "County Name"},
+            inplace=True,
         )
         return new_df
 
-    def _transform_states(self, initial_states_df: pd.DataFrame) -> pd.DataFrame:
+    def _transform_states(
+        self, initial_states_df: pd.DataFrame
+    ) -> pd.DataFrame:
         """
         Necessary modifications to the states dataframe
         """
@@ -174,7 +183,9 @@ class PostScoreETL(ExtractTransformLoad):
     def _create_tile_data(
         self, score_county_state_merged_df: pd.DataFrame
     ) -> pd.DataFrame:
-        score_tiles = score_county_state_merged_df[constants.TILES_SCORE_COLUMNS]
+        score_tiles = score_county_state_merged_df[
+            constants.TILES_SCORE_COLUMNS
+        ]
         decimals = pd.Series(
             [constants.TILES_ROUND_NUM_DECIMALS]
             * len(constants.TILES_SCORE_FLOAT_COLUMNS),
@@ -185,7 +196,9 @@ class PostScoreETL(ExtractTransformLoad):
     def _create_downloadable_data(
         self, score_county_state_merged_df: pd.DataFrame
     ) -> pd.DataFrame:
-        return score_county_state_merged_df[constants.DOWNLOADABLE_SCORE_COLUMNS]
+        return score_county_state_merged_df[
+            constants.DOWNLOADABLE_SCORE_COLUMNS
+        ]
 
     def transform(self) -> None:
         logger.info("Transforming data sources for Score + County CSV")
@@ -206,7 +219,9 @@ class PostScoreETL(ExtractTransformLoad):
         self.output_downloadable_df = self._create_downloadable_data(
             output_score_county_state_merged_df
         )
-        self.output_score_county_state_merged_df = output_score_county_state_merged_df
+        self.output_score_county_state_merged_df = (
+            output_score_county_state_merged_df
+        )
 
     def _load_score_csv(
         self, score_county_state_merged: pd.DataFrame, score_csv_path: Path
diff --git a/data/data-pipeline/data_pipeline/etl/score/tests/conftest.py b/data/data-pipeline/data_pipeline/etl/score/tests/conftest.py
index b48bbfd7..c1a1c4af 100644
--- a/data/data-pipeline/data_pipeline/etl/score/tests/conftest.py
+++ b/data/data-pipeline/data_pipeline/etl/score/tests/conftest.py
@@ -83,7 +83,9 @@ def states_transformed_expected():
     return pd.DataFrame.from_dict(
         data={
             "State Code": pd.Series(["01", "02", "04"], dtype="string"),
-            "State Name": pd.Series(["Alabama", "Alaska", "Arizona"], dtype="object"),
+            "State Name": pd.Series(
+                ["Alabama", "Alaska", "Arizona"], dtype="object"
+            ),
             "State Abbreviation": pd.Series(["AL", "AK", "AZ"], dtype="string"),
         },
     )
@@ -91,14 +93,18 @@ def states_transformed_expected():
 
 @pytest.fixture()
 def score_transformed_expected():
-    return pd.read_pickle(pytest.SNAPSHOT_DIR / "score_transformed_expected.pkl")
+    return pd.read_pickle(
+        pytest.SNAPSHOT_DIR / "score_transformed_expected.pkl"
+    )
 
 
 @pytest.fixture()
 def national_cbg_df():
     return pd.DataFrame.from_dict(
         data={
-            "GEOID10": pd.Series(["010010201001", "010010201002"], dtype="string"),
+            "GEOID10": pd.Series(
+                ["010010201001", "010010201002"], dtype="string"
+            ),
         },
     )
@@ -115,4 +121,6 @@ def tile_data_expected():
 
 @pytest.fixture()
 def downloadable_data_expected():
-    return pd.read_pickle(pytest.SNAPSHOT_DIR / "downloadable_data_expected.pkl")
+    return pd.read_pickle(
+        pytest.SNAPSHOT_DIR / "downloadable_data_expected.pkl"
+    )
diff --git a/data/data-pipeline/data_pipeline/etl/score/tests/test_score_post.py b/data/data-pipeline/data_pipeline/etl/score/tests/test_score_post.py
index 89b358bd..22f8696a 100644
--- a/data/data-pipeline/data_pipeline/etl/score/tests/test_score_post.py
+++ b/data/data-pipeline/data_pipeline/etl/score/tests/test_score_post.py
@@ -33,16 +33,22 @@ def test_extract_score(etl, score_data_initial):
 
 
 # Transform Tests
-def test_transform_counties(etl, county_data_initial, counties_transformed_expected):
+def test_transform_counties(
+    etl, county_data_initial, counties_transformed_expected
+):
     extracted_counties = etl._extract_counties(county_data_initial)
     counties_transformed_actual = etl._transform_counties(extracted_counties)
-    pdt.assert_frame_equal(counties_transformed_actual, counties_transformed_expected)
+    pdt.assert_frame_equal(
+        counties_transformed_actual, counties_transformed_expected
+    )
 
 
 def test_transform_states(etl, state_data_initial, states_transformed_expected):
     extracted_states = etl._extract_states(state_data_initial)
     states_transformed_actual = etl._transform_states(extracted_states)
-    pdt.assert_frame_equal(states_transformed_actual, states_transformed_expected)
+    pdt.assert_frame_equal(
+        states_transformed_actual, states_transformed_expected
+    )
 
 
 def test_transform_score(etl, score_data_initial, score_transformed_expected):
@@ -82,8 +88,12 @@ def test_create_tile_data(etl, score_data_expected, tile_data_expected):
     )
 
 
-def test_create_downloadable_data(etl, score_data_expected, downloadable_data_expected):
-    output_downloadable_df_actual = etl._create_downloadable_data(score_data_expected)
+def test_create_downloadable_data(
+    etl, score_data_expected, downloadable_data_expected
+):
+    output_downloadable_df_actual = etl._create_downloadable_data(
+        score_data_expected
+    )
     pdt.assert_frame_equal(
         output_downloadable_df_actual,
         downloadable_data_expected,
@@ -101,7 +111,9 @@ def test_load_score_csv(etl, score_data_expected):
 
 def test_load_tile_csv(etl, tile_data_expected):
     reload(constants)
-    etl._load_score_csv(tile_data_expected, constants.DATA_SCORE_TILES_FILE_PATH)
+    etl._load_score_csv(
+        tile_data_expected, constants.DATA_SCORE_TILES_FILE_PATH
+    )
 
     assert constants.DATA_SCORE_TILES_FILE_PATH.is_file()
diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py b/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py
index 26b6f834..052a6f36 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl.py
@@ -4,7 +4,6 @@ import censusdata
 from data_pipeline.etl.base import ExtractTransformLoad
 from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
 from data_pipeline.utils import get_module_logger
-from data_pipeline.config import settings
 
 logger = get_module_logger(__name__)
 
@@ -21,31 +20,38 @@ class CensusACSETL(ExtractTransformLoad):
             "Linguistic isolation (total)"
         )
         self.LINGUISTIC_ISOLATION_FIELDS = [
-            "C16002_001E", # Estimate!!Total
-            "C16002_004E", # Estimate!!Total!!Spanish!!Limited English speaking household
-            "C16002_007E", # Estimate!!Total!!Other Indo-European languages!!Limited English speaking household
-            "C16002_010E", # Estimate!!Total!!Asian and Pacific Island languages!!Limited English speaking household
-            "C16002_013E", # Estimate!!Total!!Other languages!!Limited English speaking household
+            "C16002_001E",  # Estimate!!Total
+            "C16002_004E",  # Estimate!!Total!!Spanish!!Limited English speaking household
+            "C16002_007E",  # Estimate!!Total!!Other Indo-European languages!!Limited English speaking household
+            "C16002_010E",  # Estimate!!Total!!Asian and Pacific Island languages!!Limited English speaking household
+            "C16002_013E",  # Estimate!!Total!!Other languages!!Limited English speaking household
         ]
         self.MEDIAN_INCOME_FIELD = "B19013_001E"
         self.MEDIAN_INCOME_FIELD_NAME = (
             "Median household income in the past 12 months"
         )
-        self.MEDIAN_INCOME_STATE_FIELD_NAME = "Median household income (State)"
-        self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME = (
-            "Median household income (% of state median household income)"
+        self.POVERTY_FIELDS = [
+            "C17002_001E",  # Estimate!!Total,
+            "C17002_002E",  # Estimate!!Total!!Under .50
+            "C17002_003E",  # Estimate!!Total!!.50 to .99
+            "C17002_004E",  # Estimate!!Total!!1.00 to 1.24
+            "C17002_005E",  # Estimate!!Total!!1.25 to 1.49
+            "C17002_006E",  # Estimate!!Total!!1.50 to 1.84
+            "C17002_007E",  # Estimate!!Total!!1.85 to 1.99
+        ]
+
+        self.POVERTY_LESS_THAN_100_PERCENT_FPL_FIELD_NAME = (
+            "Percent of individuals < 100% Federal Poverty Line"
         )
+        self.POVERTY_LESS_THAN_150_PERCENT_FPL_FIELD_NAME = (
+            "Percent of individuals < 150% Federal Poverty Line"
+        )
+        self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME = (
+            "Percent of individuals < 200% Federal Poverty Line"
+        )
+
         self.STATE_GEOID_FIELD_NAME = "GEOID2"
 
         self.df: pd.DataFrame
-        self.state_median_income_df: pd.DataFrame
-
-        self.STATE_MEDIAN_INCOME_FTP_URL = (
-            settings.AWS_JUSTICE40_DATASOURCES_URL
-            + "/2015_to_2019_state_median_income.zip"
-        )
-        self.STATE_MEDIAN_INCOME_FILE_PATH = (
-            self.TMP_PATH / "2015_to_2019_state_median_income.csv"
-        )
 
     def _fips_from_censusdata_censusgeo(
         self, censusgeo: censusdata.censusgeo
@@ -55,11 +61,6 @@ class CensusACSETL(ExtractTransformLoad):
         return fips
 
     def extract(self) -> None:
-        # Extract state median income
-        super().extract(
-            self.STATE_MEDIAN_INCOME_FTP_URL,
-            self.TMP_PATH,
-        )
         dfs = []
         for fips in get_state_fips_codes(self.DATA_PATH):
             logger.info(
@@ -79,7 +80,8 @@ class CensusACSETL(ExtractTransformLoad):
                     "B23025_003E",
                     self.MEDIAN_INCOME_FIELD,
                 ]
-                + self.LINGUISTIC_ISOLATION_FIELDS,
+                + self.LINGUISTIC_ISOLATION_FIELDS
+                + self.POVERTY_FIELDS,
             )
         )
 
@@ -89,12 +91,6 @@ class CensusACSETL(ExtractTransformLoad):
             func=self._fips_from_censusdata_censusgeo
         )
 
-        self.state_median_income_df = pd.read_csv(
-            # TODO: Replace with reading from S3.
-            filepath_or_buffer=self.STATE_MEDIAN_INCOME_FILE_PATH,
-            dtype={self.STATE_GEOID_FIELD_NAME: "string"},
-        )
-
     def transform(self) -> None:
         logger.info("Starting Census ACS Transform")
 
@@ -103,24 +99,6 @@ class CensusACSETL(ExtractTransformLoad):
             self.MEDIAN_INCOME_FIELD
         ]
 
-        # TODO: handle null values for CBG median income, which are `-666666666`.
-
-        # Join state data on CBG data:
-        self.df[self.STATE_GEOID_FIELD_NAME] = (
-            self.df[self.GEOID_FIELD_NAME].astype(str).str[0:2]
-        )
-        self.df = self.df.merge(
-            self.state_median_income_df,
-            how="left",
-            on=self.STATE_GEOID_FIELD_NAME,
-        )
-
-        # Calculate the income of the block group as a fraction of the state income:
-        self.df[self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME] = (
-            self.df[self.MEDIAN_INCOME_FIELD_NAME]
-            / self.df[self.MEDIAN_INCOME_STATE_FIELD_NAME]
-        )
-
         # Calculate percent unemployment.
         # TODO: remove small-sample data that should be `None` instead of a high-variance fraction.
         self.df[self.UNEMPLOYED_FIELD_NAME] = (
@@ -145,6 +123,27 @@ class CensusACSETL(ExtractTransformLoad):
 
         self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME].describe()
 
+        # Calculate percent at different poverty thresholds
+        self.df[self.POVERTY_LESS_THAN_100_PERCENT_FPL_FIELD_NAME] = (
+            self.df["C17002_002E"] + self.df["C17002_003E"]
+        ) / self.df["C17002_001E"]
+
+        self.df[self.POVERTY_LESS_THAN_150_PERCENT_FPL_FIELD_NAME] = (
+            self.df["C17002_002E"]
+            + self.df["C17002_003E"]
+            + self.df["C17002_004E"]
+            + self.df["C17002_005E"]
+        ) / self.df["C17002_001E"]
+
+        self.df[self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME] = (
+            self.df["C17002_002E"]
+            + self.df["C17002_003E"]
+            + self.df["C17002_004E"]
+            + self.df["C17002_005E"]
+            + self.df["C17002_006E"]
+            + self.df["C17002_007E"]
+        ) / self.df["C17002_001E"]
+
     def load(self) -> None:
         logger.info("Saving Census ACS Data")
 
@@ -156,8 +155,9 @@ class CensusACSETL(ExtractTransformLoad):
             self.UNEMPLOYED_FIELD_NAME,
             self.LINGUISTIC_ISOLATION_FIELD_NAME,
             self.MEDIAN_INCOME_FIELD_NAME,
-            self.MEDIAN_INCOME_STATE_FIELD_NAME,
-            self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME,
+            self.POVERTY_LESS_THAN_100_PERCENT_FPL_FIELD_NAME,
+            self.POVERTY_LESS_THAN_150_PERCENT_FPL_FIELD_NAME,
+            self.POVERTY_LESS_THAN_200_PERCENT_FPL_FIELD_NAME,
         ]
 
         self.df[columns_to_include].to_csv(
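For context on the three new ratios above: the ACS C17002 table reports counts by income-to-poverty-ratio band, so each share is a sum of bands divided by the universe count (C17002_001E). A standalone sketch of the same arithmetic with hypothetical helper and output names, not part of the patch:

    import pandas as pd

    # Bands of the ACS C17002 income-to-poverty ratio table, grouped by threshold.
    POVERTY_BANDS = {
        "under_100_pct_fpl": ["C17002_002E", "C17002_003E"],
        "under_150_pct_fpl": ["C17002_002E", "C17002_003E", "C17002_004E", "C17002_005E"],
        "under_200_pct_fpl": [
            "C17002_002E", "C17002_003E", "C17002_004E",
            "C17002_005E", "C17002_006E", "C17002_007E",
        ],
    }


    def poverty_shares(df: pd.DataFrame) -> pd.DataFrame:
        # Each share = (sum of the selected bands) / (total individuals in the universe).
        out = pd.DataFrame(index=df.index)
        for name, bands in POVERTY_BANDS.items():
            out[name] = df[bands].sum(axis=1) / df["C17002_001E"]
        return out
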
"dataset" + / f"census_acs_median_income_{self.ACS_YEAR}" + ) + + # Set constants for Geocorr MSAs data. + self.PLACE_FIELD_NAME: str = "Census Place Name" + self.COUNTY_FIELD_NAME: str = "County Name" + self.STATE_ABBREVIATION_FIELD_NAME: str = "State Abbreviation" + self.MSA_FIELD_NAME: str = ( + "Metropolitan/Micropolitan Statistical Area Name" + ) + self.MSA_ID_FIELD_NAME: str = "MSA ID" + self.MSA_TYPE_FIELD_NAME: str = "MSA Type" + + # Set constants for MSA median incomes + self.MSA_MEDIAN_INCOME_URL: str = ( + f"https://api.census.gov/data/{self.ACS_YEAR}/acs/acs5?get=B19013_001E" + + "&for=metropolitan%20statistical%20area/micropolitan%20statistical%20area" + ) + self.MSA_INCOME_FIELD_NAME: str = f"Median household income in the past 12 months (MSA; {self.ACS_YEAR} inflation-adjusted dollars)" + + # Set constants for state median incomes + self.STATE_MEDIAN_INCOME_URL: str = f"https://api.census.gov/data/{self.ACS_YEAR}/acs/acs5?get=B19013_001E&for=state" + self.STATE_GEOID_FIELD_NAME: str = "GEOID2" + self.STATE_MEDIAN_INCOME_FIELD_NAME: str = f"Median household income (State; {self.ACS_YEAR} inflation-adjusted dollars)" + + # Constants for output + self.AMI_REFERENCE_FIELD_NAME: str = "AMI Reference" + self.AMI_FIELD_NAME: str = "Area Median Income (State or metropolitan)" + self.COLUMNS_TO_KEEP = [ + self.GEOID_FIELD_NAME, + self.PLACE_FIELD_NAME, + self.COUNTY_FIELD_NAME, + self.STATE_ABBREVIATION_FIELD_NAME, + self.MSA_FIELD_NAME, + self.MSA_ID_FIELD_NAME, + self.MSA_TYPE_FIELD_NAME, + self.MSA_INCOME_FIELD_NAME, + self.STATE_GEOID_FIELD_NAME, + self.STATE_MEDIAN_INCOME_FIELD_NAME, + self.AMI_REFERENCE_FIELD_NAME, + self.AMI_FIELD_NAME, + ] + + # Remaining definitions + self.output_df: pd.DataFrame + self.raw_geocorr_df: pd.DataFrame + self.msa_median_incomes: dict + self.state_median_incomes: dict + + def _transform_geocorr(self) -> pd.DataFrame: + # Transform the geocorr data + geocorr_df = self.raw_geocorr_df + + # Strip the unnecessary period from the tract ID: + geocorr_df["tract"] = geocorr_df["tract"].str.replace( + ".", "", regex=False + ) + + # Create the full GEOID out of the component parts. + geocorr_df[self.GEOID_FIELD_NAME] = ( + geocorr_df["county"] + geocorr_df["tract"] + geocorr_df["bg"] + ) + + # QA the combined field: + tract_values = geocorr_df[self.GEOID_FIELD_NAME].str.len().unique() + if any(tract_values != [12]): + print(tract_values) + raise ValueError("Some of the census BG data has the wrong length.") + + # Rename some fields + geocorr_df.rename( + columns={ + "placenm": self.PLACE_FIELD_NAME, + "cbsaname10": self.MSA_FIELD_NAME, + "cntyname": self.COUNTY_FIELD_NAME, + "stab": self.STATE_ABBREVIATION_FIELD_NAME, + "cbsa10": self.MSA_ID_FIELD_NAME, + "cbsatype10": self.MSA_TYPE_FIELD_NAME, + }, + inplace=True, + errors="raise", + ) + + # Remove duplicated rows. + # Some rows appear twice: once for the population within a CBG that's also within a census place, + # and once for the population that's within a CBG that's *not* within a census place. + # Drop the row that's not within a census place. 
+    def _transform_msa_median_incomes(self) -> pd.DataFrame:
+        # Remove first list entry, which is the column names.
+        column_names = self.msa_median_incomes.pop(0)
+
+        msa_median_incomes_df = pd.DataFrame(
+            data=self.msa_median_incomes, columns=column_names
+        )
+        msa_median_incomes_df.rename(
+            columns={
+                "B19013_001E": self.MSA_INCOME_FIELD_NAME,
+                "metropolitan statistical area/micropolitan statistical area": self.MSA_ID_FIELD_NAME,
+            },
+            inplace=True,
+            errors="raise",
+        )
+
+        # Convert MSA ID to str
+        msa_median_incomes_df[self.MSA_ID_FIELD_NAME] = msa_median_incomes_df[
+            self.MSA_ID_FIELD_NAME
+        ].astype(str)
+
+        return msa_median_incomes_df
+
+    def _transform_state_median_incomes(self) -> pd.DataFrame:
+        # Remove first list entry, which is the column names.
+        column_names = self.state_median_incomes.pop(0)
+        state_median_incomes_df = pd.DataFrame(
+            data=self.state_median_incomes, columns=column_names
+        )
+
+        state_median_incomes_df.rename(
+            columns={
+                "B19013_001E": self.STATE_MEDIAN_INCOME_FIELD_NAME,
+                "state": self.STATE_GEOID_FIELD_NAME,
+            },
+            inplace=True,
+            errors="raise",
+        )
+
+        return state_median_incomes_df
+
+    def extract(self) -> None:
+        logger.info("Starting three separate downloads.")
+        # Load and clean GEOCORR data
+        # Note: this data is generated by https://mcdc.missouri.edu/applications/geocorr2014.html, at the advice of the Census.
+        # The specific query used is the following, which takes a couple of minutes to run:
+        # https://mcdc.missouri.edu/cgi-bin/broker?_PROGRAM=apps.geocorr2014.sas&_SERVICE=MCDC_long&_debug=0&state=Mo29&state=Al01&state=Ak02&state=Az04&state=Ar05&state=Ca06&state=Co08&state=Ct09&state=De10&state=Dc11&state=Fl12&state=Ga13&state=Hi15&state=Id16&state=Il17&state=In18&state=Ia19&state=Ks20&state=Ky21&state=La22&state=Me23&state=Md24&state=Ma25&state=Mi26&state=Mn27&state=Ms28&state=Mt30&state=Ne31&state=Nv32&state=Nh33&state=Nj34&state=Nm35&state=Ny36&state=Nc37&state=Nd38&state=Oh39&state=Ok40&state=Or41&state=Pa42&state=Ri44&state=Sc45&state=Sd46&state=Tn47&state=Tx48&state=Ut49&state=Vt50&state=Va51&state=Wa53&state=Wv54&state=Wi55&state=Wy56&g1_=state&g1_=county&g1_=placefp&g1_=tract&g1_=bg&g2_=cbsa10&g2_=cbsatype10&wtvar=pop10&nozerob=1&title=&csvout=1&namoptf=b&listout=1&lstfmt=html&namoptr=b&oropt=&counties=&metros=&places=&latitude=&longitude=&locname=&distance=&kiloms=0&nrings=&r1=&r2=&r3=&r4=&r5=&r6=&r7=&r8=&r9=&r10=&lathi=&latlo=&longhi=&longlo=
+        logger.info("Starting download of Geocorr information.")
+
+        unzip_file_from_url(
+            file_url=settings.AWS_JUSTICE40_DATASOURCES_URL
+            + "/geocorr2014_all_states.csv.zip",
+            download_path=self.TMP_PATH,
+            unzipped_file_path=self.TMP_PATH / "geocorr",
+        )
+
+        self.raw_geocorr_df = pd.read_csv(
+            filepath_or_buffer=self.TMP_PATH
+            / "geocorr"
+            / "geocorr2014_all_states.csv",
+            # Skip second row, which has descriptions.
+            skiprows=[1],
+            # The following need to remain as strings for all of their digits, not get converted to numbers.
+            dtype={
+                "tract": "string",
+                "county": "string",
+                "state": "string",
+                "bg": "string",
+                "cbsa10": "string",
+            },
+            low_memory=False,
+        )
+
+        # Download MSA median incomes
+        logger.info("Starting download of MSA median incomes.")
+        download = requests.get(self.MSA_MEDIAN_INCOME_URL, verify=None)
+        self.msa_median_incomes = json.loads(download.content)
+
+        # Download state median incomes
+        logger.info("Starting download of state median incomes.")
+        download_state = requests.get(self.STATE_MEDIAN_INCOME_URL, verify=None)
+        self.state_median_incomes = json.loads(download_state.content)
+
+    def transform(self) -> None:
+        logger.info("Starting transforms.")
+
+        # Run transforms:
+        geocorr_df = self._transform_geocorr()
+        msa_median_incomes_df = self._transform_msa_median_incomes()
+        state_median_incomes_df = self._transform_state_median_incomes()
+
+        # Join CBGs on MSA incomes
+        merged_df = geocorr_df.merge(
+            msa_median_incomes_df, on=self.MSA_ID_FIELD_NAME, how="left"
+        )
+
+        # Merge state income with CBGs
+        merged_df[self.STATE_GEOID_FIELD_NAME] = (
+            merged_df[self.GEOID_FIELD_NAME].astype(str).str[0:2]
+        )
+
+        merged_with_state_income_df = merged_df.merge(
+            state_median_incomes_df,
+            how="left",
+            on=self.STATE_GEOID_FIELD_NAME,
+        )
+
+        if (
+            len(merged_with_state_income_df)
+            > self.EXPECTED_MAX_CENSUS_BLOCK_GROUPS
+        ):
+            raise ValueError("Too many CBGs in join.")
+
+        # Choose reference income: MSA if MSA type is Metro, otherwise use State.
+        merged_with_state_income_df[self.AMI_REFERENCE_FIELD_NAME] = [
+            "MSA" if msa_type == "Metro" else "State"
+            for msa_type in merged_with_state_income_df[
+                self.MSA_TYPE_FIELD_NAME
+            ]
+        ]
+
+        # Populate reference income: MSA income if reference income is MSA, state income if reference income is state.
+        merged_with_state_income_df[
+            self.AMI_FIELD_NAME
+        ] = merged_with_state_income_df.apply(
+            lambda x: x[self.MSA_INCOME_FIELD_NAME]
+            if x[self.AMI_REFERENCE_FIELD_NAME] == "MSA"
+            else x[self.STATE_MEDIAN_INCOME_FIELD_NAME],
+            axis=1,
+        )
+
+        self.output_df = merged_with_state_income_df
+
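The row-wise `apply` above is the selection step: where the reference is "MSA", take the MSA income, otherwise fall back to the state median. A vectorized equivalent using `Series.where` is sketched below with hypothetical parameter names; it is shown only as an alternative form for reference, not as part of the patch:

    import pandas as pd


    def choose_ami(
        df: pd.DataFrame,
        msa_income_col: str,
        state_income_col: str,
        reference_col: str = "AMI Reference",
    ) -> pd.Series:
        # Keep the MSA income wherever the reference is "MSA"; elsewhere use
        # the state median income.
        return df[msa_income_col].where(
            df[reference_col] == "MSA", other=df[state_income_col]
        )
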
+    def validate(self) -> None:
+        logger.info("Validating Census ACS Median Income Data")
+
+        pass
+
+    def load(self) -> None:
+        logger.info("Saving Census ACS Median Income CSV")
+
+        self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
+        self.output_df[self.COLUMNS_TO_KEEP].to_csv(
+            path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
+        )
diff --git a/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py b/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
index 4411261c..2974a547 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/hud_housing/etl.py
@@ -272,7 +272,6 @@ class HudHousingETL(ExtractTransformLoad):
             - self.df[RENTER_OCCUPIED_NOT_COMPUTED_FIELDS].sum(axis=1)
         )
 
-
         self.df["DENOM INCL NOT COMPUTED"] = (
             self.df[OWNER_OCCUPIED_POPULATION_FIELD]
             + self.df[RENTER_OCCUPIED_POPULATION_FIELD]
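Taken together, the new dataset can also be exercised by hand; a minimal sketch following the extract/transform/load methods the class exposes (in normal use the pipeline presumably drives it through the `census_acs_median_income` entry registered in `DATASET_LIST`):

    from data_pipeline.etl.sources.census_acs_median_income.etl import (
        CensusACSMedianIncomeETL,
    )

    etl = CensusACSMedianIncomeETL()
    etl.extract()    # downloads Geocorr data plus MSA and state median incomes
    etl.transform()  # joins CBGs to MSA/state incomes and picks the AMI reference
    etl.validate()   # currently a logging-only placeholder
    etl.load()       # writes dataset/census_acs_median_income_2019/usa.csv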