AWS Sync Public Read (#508)

* adding layer to mvts

* small fix for GHA

* AWS Sync Public Read

* removed temp file

* updated state median income ftp
Author: Jorge Escobar, 2021-08-12 14:17:25 -04:00 (committed by GitHub)
Commit: 773c035493
5 changed files with 38 additions and 70 deletions


@@ -4,6 +4,7 @@ import censusdata
 from data_pipeline.etl.base import ExtractTransformLoad
 from data_pipeline.etl.sources.census.etl_utils import get_state_fips_codes
 from data_pipeline.utils import get_module_logger
+from data_pipeline.config import settings

 logger = get_module_logger(__name__)
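Note: the new import makes the S3-hosted data source reachable through project configuration instead of a hard-coded local path; it is used further down in this diff to build the archive URL. A minimal sketch of that usage (the attribute name comes from the diff itself; its concrete value lives in data_pipeline/config and is not shown here):

from data_pipeline.config import settings

# The base URL for Justice40 data sources is read from project settings;
# the zip name is the one referenced later in this diff.
STATE_MEDIAN_INCOME_FTP_URL = (
    settings.AWS_JUSTICE40_DATASOURCES_URL + "/2014_to_2019_state_median_income.zip"
)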
@@ -11,10 +12,14 @@ logger = get_module_logger(__name__)
 class CensusACSETL(ExtractTransformLoad):
     def __init__(self):
         self.ACS_YEAR = 2019
-        self.OUTPUT_PATH = self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
+        self.OUTPUT_PATH = (
+            self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
+        )
         self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
         self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
-        self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME = "Linguistic isolation (total)"
+        self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME = (
+            "Linguistic isolation (total)"
+        )
         self.LINGUISTIC_ISOLATION_FIELDS = [
             "C16002_001E",
             "C16002_004E",
@@ -23,7 +28,9 @@ class CensusACSETL(ExtractTransformLoad):
             "C16002_013E",
         ]
         self.MEDIAN_INCOME_FIELD = "B19013_001E"
-        self.MEDIAN_INCOME_FIELD_NAME = "Median household income in the past 12 months"
+        self.MEDIAN_INCOME_FIELD_NAME = (
+            "Median household income in the past 12 months"
+        )
         self.MEDIAN_INCOME_STATE_FIELD_NAME = "Median household income (State)"
         self.MEDIAN_INCOME_AS_PERCENT_OF_STATE_FIELD_NAME = (
             "Median household income (% of state median household income)"
@@ -32,22 +39,32 @@ class CensusACSETL(ExtractTransformLoad):
         self.df: pd.DataFrame
         self.state_median_income_df: pd.DataFrame

-        # TODO: refactor this to put this file on s3 and download it from there
+        self.STATE_MEDIAN_INCOME_FTP_URL = (
+            settings.AWS_JUSTICE40_DATASOURCES_URL
+            + "/2014_to_2019_state_median_income.zip"
+        )
         self.STATE_MEDIAN_INCOME_FILE_PATH = (
-            self.DATA_PATH
-            / "needs_to_be_moved_to_s3"
-            / "2014_to_2019_state_median_income.csv"
+            self.TMP_PATH / "2014_to_2019_state_median_income.csv"
        )

-    def _fips_from_censusdata_censusgeo(self, censusgeo: censusdata.censusgeo) -> str:
+    def _fips_from_censusdata_censusgeo(
+        self, censusgeo: censusdata.censusgeo
+    ) -> str:
         """Create a FIPS code from the proprietary censusgeo index."""
         fips = "".join([value for (key, value) in censusgeo.params()])
         return fips
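Note: the helper above flattens a censusdata.censusgeo index into a plain FIPS string. A small illustrative example, assuming a block-group geography (the specific codes are made up for the example):

import censusdata

# Hypothetical block group: state 01, county 073, tract 001100, block group 2.
geo = censusdata.censusgeo(
    [("state", "01"), ("county", "073"), ("tract", "001100"), ("block group", "2")]
)
fips = "".join([value for (key, value) in geo.params()])
# fips == "010730011002"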

     def extract(self) -> None:
+        # Extract state median income
+        super().extract(
+            self.STATE_MEDIAN_INCOME_FTP_URL,
+            self.TMP_PATH,
+        )
         dfs = []
         for fips in get_state_fips_codes(self.DATA_PATH):
-            logger.info(f"Downloading data for state/territory with FIPS code {fips}")
+            logger.info(
+                f"Downloading data for state/territory with FIPS code {fips}"
+            )

             dfs.append(
                 censusdata.download(
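Note: extract() now first pulls the state median income archive via the base class, which is assumed here to download the zip and unpack it into TMP_PATH (hence STATE_MEDIAN_INCOME_FILE_PATH now pointing at TMP_PATH), and then loops over states. The excerpt ends inside the censusdata.download call, so the per-state sketch below fills in plausible arguments and is an assumption, not the repository's exact call:

import censusdata

fips = "06"  # e.g. California; in the ETL this comes from get_state_fips_codes()

# Hypothetical per-state download at block-group geography; the real variable
# list and geography are outside this excerpt.
state_df = censusdata.download(
    "acs5",
    2019,
    censusdata.censusgeo(
        [("state", fips), ("county", "*"), ("block group", "*")]
    ),
    ["B19013_001E", "B23025_003E", "B23025_005E"],
)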
@@ -82,7 +99,9 @@ class CensusACSETL(ExtractTransformLoad):
         logger.info("Starting Census ACS Transform")

         # Rename median income
-        self.df[self.MEDIAN_INCOME_FIELD_NAME] = self.df[self.MEDIAN_INCOME_FIELD]
+        self.df[self.MEDIAN_INCOME_FIELD_NAME] = self.df[
+            self.MEDIAN_INCOME_FIELD
+        ]

         # TODO: handle null values for CBG median income, which are `-666666666`.
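Note: the hunk above only renames the census median income column; the percent-of-state field defined in __init__ implies a later join against state_median_income_df. That join is not part of this diff, so the sketch below is an assumption about how the field could be derived (column and key names are hypothetical):

# Hypothetical derivation of the percent-of-state field; whether the result is
# stored as a fraction or multiplied by 100 is also an assumption.
df["State FIPS"] = df["FIPS"].str[:2]  # first two digits of the block group FIPS
df = df.merge(state_median_income_df, on="State FIPS", how="left")
df["Median household income (% of state median household income)"] = (
    df["Median household income in the past 12 months"]
    / df["Median household income (State)"]
)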
@@ -104,7 +123,9 @@
         # Calculate percent unemployment.
         # TODO: remove small-sample data that should be `None` instead of a high-variance fraction.
-        self.df[self.UNEMPLOYED_FIELD_NAME] = self.df.B23025_005E / self.df.B23025_003E
+        self.df[self.UNEMPLOYED_FIELD_NAME] = (
+            self.df.B23025_005E / self.df.B23025_003E
+        )
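Note: the unemployment percentage is a straight ratio of two ACS counts, which is why the TODO flags small samples. A minimal standalone sketch (the B23025 labels are standard ACS meanings, stated here as assumptions):

import pandas as pd

# B23025_005E: unemployed civilians; B23025_003E: civilians in the labor force.
df = pd.DataFrame({"B23025_003E": [1200, 0], "B23025_005E": [90, 0]})
df["Unemployed civilians (percent)"] = df.B23025_005E / df.B23025_003E
# A zero or tiny labor force yields NaN/inf or a high-variance ratio,
# which the TODO above suggests should eventually be treated as missing.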

         # Calculate linguistic isolation.
         individual_limited_english_fields = [