diff --git a/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl_imputations.py b/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl_imputations.py index 692ac243..00d3fe2a 100644 --- a/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl_imputations.py +++ b/data/data-pipeline/data_pipeline/etl/sources/census_acs/etl_imputations.py @@ -10,11 +10,11 @@ from data_pipeline.utils import get_module_logger logger = get_module_logger(__name__) -def _prepare_dataframe_for_imputation( +def _get_impute_tract_list( columns_to_impute: list, geo_df: gpd.GeoDataFrame, geoid_field: str = "GEOID10_TRACT", -): +) -> list: # generate a list of tracts for which at least one of the imputation # columns is null @@ -26,30 +26,35 @@ def _prepare_dataframe_for_imputation( logger.info(f"Imputing values for {len(tract_list)} unique tracts.") assert len(tract_list) > 0, "Error: No missing values to impute" - return tract_list, geo_df + return tract_list -def _get_state_and_county_fills(df, tract_list, impute_var_pair_list): +def _get_state_and_county_fills( + df: pd.DataFrame, tract_list: list, impute_var_pair_list: list +) -> pd.DataFrame: + counties = df[GEOID_TRACT_FIELD].str[:5] # county fips is 5 digits + states = df[GEOID_TRACT_FIELD].str[:2] # state fips is 2 digits # When there is no neighbor average, we take the county-level average or state-level averages for impute_var_pair in impute_var_pair_list: - # Fill missings with county means - df[impute_var_pair.imputed_field_name] = np.where( - (df[impute_var_pair.imputed_field_name].isna()) - & (df[GEOID_TRACT_FIELD].isin(tract_list)), - df.groupby(df[GEOID_TRACT_FIELD].str[:5])[ - impute_var_pair.raw_field_name - ].transform(np.mean), - df[impute_var_pair.imputed_field_name], + # Get a column of county means or state means when county means are not available + county_means = df.groupby(counties)[ + impute_var_pair.raw_field_name + ].transform(np.mean) + state_means = df.groupby(states)[ + impute_var_pair.raw_field_name + ].transform(np.mean) + fill_means = county_means.fillna(state_means) + + # Identify where these must be imputed + impute_tracts = (df[impute_var_pair.imputed_field_name].isna()) & ( + df[GEOID_TRACT_FIELD].isin(tract_list) ) - # Fill the remaining missings with state means + + # And then impute while preserving null character elsewhere df[impute_var_pair.imputed_field_name] = np.where( - (df[impute_var_pair.imputed_field_name].isna()) - & (df[GEOID_TRACT_FIELD].isin(tract_list)), - df.groupby(df[GEOID_TRACT_FIELD].str[:2])[ - impute_var_pair.raw_field_name - ].transform(np.mean), - df[impute_var_pair.imputed_field_name], + impute_tracts, fill_means, df[impute_var_pair.imputed_field_name] ) + return df @@ -82,7 +87,7 @@ def calculate_income_measures( rename_dict[impute_var.raw_field_name] = impute_var.imputed_field_name # Determine where to impute variables and fill a column with nulls - tract_list, geo_df = _prepare_dataframe_for_imputation( + tract_list = _get_impute_tract_list( columns_to_impute=raw_fields, geo_df=geo_df, geoid_field=geoid_field,