j40-cejst-2/data/data-pipeline/etl/sources/census_acs/etl.py
Billy Daly 5504528fdf
Issue 308 python linting (#443)
* Adds flake8, pylint, liccheck, flake8 to dependencies for data-pipeline

* Sets up and runs black autoformatting

* Adds flake8 to tox linting

* Fixes flake8 error F541 f string missing placeholders

* Fixes flake8 E501 line too long

* Fixes flake8 F401 imported but not used

* Adds pylint to tox and disables the following pylint errors:
- C0114: module docstrings
- R0201: method could have been a function
- R0903: too few public methods
- C0103: name case styling
- W0511: fix me
- W1203: f-string interpolation in logging

* Adds utils.py to tox.ini linting, runs black on utils.py

* Fixes import related pylint errors: C0411 and C0412

* Fixes or ignores remaining pylint errors (for discussion later)

* Adds safety and liccheck to tox.ini
2021-08-02 12:16:38 -04:00

103 lines
3.4 KiB
Python

import pandas as pd
import censusdata
from etl.base import ExtractTransformLoad
from etl.sources.census.etl_utils import get_state_fips_codes
from utils import get_module_logger
logger = get_module_logger(__name__)
class CensusACSETL(ExtractTransformLoad):
def __init__(self):
self.ACS_YEAR = 2019
self.OUTPUT_PATH = self.DATA_PATH / "dataset" / f"census_acs_{self.ACS_YEAR}"
self.UNEMPLOYED_FIELD_NAME = "Unemployed civilians (percent)"
self.LINGUISTIC_ISOLATION_FIELD_NAME = "Linguistic isolation (percent)"
self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME = "Linguistic isolation (total)"
self.LINGUISTIC_ISOLATION_FIELDS = [
"C16002_001E",
"C16002_004E",
"C16002_007E",
"C16002_010E",
"C16002_013E",
]
self.df: pd.DataFrame
def _fips_from_censusdata_censusgeo(self, censusgeo: censusdata.censusgeo) -> str:
"""Create a FIPS code from the proprietary censusgeo index."""
fips = "".join([value for (key, value) in censusgeo.params()])
return fips
def extract(self) -> None:
dfs = []
for fips in get_state_fips_codes(self.DATA_PATH):
logger.info(f"Downloading data for state/territory with FIPS code {fips}")
dfs.append(
censusdata.download(
src="acs5",
year=self.ACS_YEAR,
geo=censusdata.censusgeo(
[("state", fips), ("county", "*"), ("block group", "*")]
),
var=[
# Emploment fields
"B23025_005E",
"B23025_003E",
]
+ self.LINGUISTIC_ISOLATION_FIELDS,
)
)
self.df = pd.concat(dfs)
self.df[self.GEOID_FIELD_NAME] = self.df.index.to_series().apply(
func=self._fips_from_censusdata_censusgeo
)
def transform(self) -> None:
logger.info("Starting Census ACS Transform")
# Calculate percent unemployment.
# TODO: remove small-sample data that should be `None` instead of a high-variance fraction.
self.df[self.UNEMPLOYED_FIELD_NAME] = self.df.B23025_005E / self.df.B23025_003E
# Calculate linguistic isolation.
individual_limited_english_fields = [
"C16002_004E",
"C16002_007E",
"C16002_010E",
"C16002_013E",
]
self.df[self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME] = self.df[
individual_limited_english_fields
].sum(axis=1, skipna=True)
self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME] = (
self.df[self.LINGUISTIC_ISOLATION_TOTAL_FIELD_NAME].astype(float)
/ self.df["C16002_001E"]
)
self.df[self.LINGUISTIC_ISOLATION_FIELD_NAME].describe()
def load(self) -> None:
logger.info("Saving Census ACS Data")
# mkdir census
self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
columns_to_include = [
self.GEOID_FIELD_NAME,
self.UNEMPLOYED_FIELD_NAME,
self.LINGUISTIC_ISOLATION_FIELD_NAME,
]
self.df[columns_to_include].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
def validate(self) -> None:
logger.info("Validating Census ACS Data")
pass