diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py index 2e49aa2a..b8509438 100644 --- a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py +++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py @@ -27,6 +27,8 @@ class PostScoreETL(ExtractTransformLoad): datasets. """ + STATE_CODE_COLUMN = "State Code" + def __init__(self, data_source: str = None): self.DATA_SOURCE = data_source self.input_counties_df: pd.DataFrame @@ -54,7 +56,9 @@ class PostScoreETL(ExtractTransformLoad): def _extract_states(self, state_path: Path) -> pd.DataFrame: logger.info("Reading States CSV") return pd.read_csv( - state_path, dtype={"fips": "string", "state_abbreviation": "string"} + state_path, + dtype={"fips": "string", "state_abbreviation": "string"}, + usecols=["fips", "state_name", "state_abbreviation"], ) def _extract_score(self, score_path: Path) -> pd.DataFrame: @@ -133,12 +137,11 @@ class PostScoreETL(ExtractTransformLoad): # remove unnecessary columns new_df = initial_states_df.rename( columns={ - "fips": "State Code", + "fips": self.STATE_CODE_COLUMN, "state_name": field_names.STATE_FIELD, "state_abbreviation": "State Abbreviation", } ) - new_df.drop(["region", "division"], axis=1, inplace=True) return new_df def _transform_score(self, initial_score_df: pd.DataFrame) -> pd.DataFrame: @@ -159,16 +162,31 @@ class PostScoreETL(ExtractTransformLoad): states_df: pd.DataFrame, score_df: pd.DataFrame, ) -> pd.DataFrame: - # merge state with counties - logger.info("Merging state with county info") - county_state_merged = counties_df.merge( - states_df, on="State Abbreviation", how="left" + + logger.info("Merging county info with score info") + score_county_merged = score_df.merge( + # We drop state abbreviation so we don't get it twice + counties_df[["GEOID", "County Name"]], + on="GEOID", # GEOID is the county ID + how="left", ) - # merge state + county with score - score_county_state_merged = score_df.merge( - county_state_merged, - on="GEOID", # GEOID is the county ID + logger.info("Merging state info with county-score info") + # Here, we need to join on a separate key, since there's no + # entry for the island areas in the counties df (there are no + # counties!) Thus, unless we join state separately from county, + # when we join on GEOID, we lose information about the islands + score_county_merged[self.STATE_CODE_COLUMN] = score_county_merged[ + self.GEOID_TRACT_FIELD_NAME + ].str[:2] + # TODO: For future reference, we could also refactor this code so that + # the FIPS / State or Territory / County info gets created as an ETL + # process and joined in etl_score, rather than added in post like this. + # That would be a bit more consistent and automatically parallelized + score_county_state_merged = score_county_merged.merge( + states_df, + left_on=self.STATE_CODE_COLUMN, + right_on=self.STATE_CODE_COLUMN, how="left", ) @@ -184,7 +202,7 @@ class PostScoreETL(ExtractTransformLoad): # recast population to integer score_county_state_merged["Total population"] = ( - merged_df["Total population"].fillna(0.0).astype(int) + merged_df["Total population"].fillna(0).astype(int) ) de_duplicated_df = merged_df.dropna( diff --git a/data/data-pipeline/data_pipeline/etl/score/tests/snapshots/score_data_expected.pkl b/data/data-pipeline/data_pipeline/etl/score/tests/snapshots/score_data_expected.pkl index a61b5aef..e0372dd9 100644 Binary files a/data/data-pipeline/data_pipeline/etl/score/tests/snapshots/score_data_expected.pkl and b/data/data-pipeline/data_pipeline/etl/score/tests/snapshots/score_data_expected.pkl differ