2021-06-28 16:16:14 -04:00
|
|
|
import csv
|
2021-08-05 15:35:54 -04:00
|
|
|
import os
|
2021-10-13 16:00:33 -04:00
|
|
|
import sys
|
2021-08-02 12:16:38 -04:00
|
|
|
from pathlib import Path
|
2021-06-28 16:16:14 -04:00
|
|
|
|
2021-08-05 15:35:54 -04:00
|
|
|
import pandas as pd
|
|
|
|
from data_pipeline.config import settings
|
|
|
|
from data_pipeline.utils import (
|
|
|
|
get_module_logger,
|
2021-07-20 14:55:39 -04:00
|
|
|
remove_all_dirs_from_dir,
|
2021-08-05 15:35:54 -04:00
|
|
|
remove_files_from_dir,
|
2021-07-20 14:55:39 -04:00
|
|
|
unzip_file_from_url,
|
2021-10-13 16:00:33 -04:00
|
|
|
zip_directory,
|
2021-07-20 14:55:39 -04:00
|
|
|
)
|
|
|
|
|
|
|
|
# Module-level logger for this ETL utility module (project logging helper).
logger = get_module_logger(__name__)
|
2021-06-28 16:16:14 -04:00
|
|
|
|
|
|
|
|
2021-11-16 10:05:09 -05:00
|
|
|
def reset_data_directories(
    data_path: Path,
) -> None:
    """Empties all census folders"""
    census_root = data_path / "census"

    # Clear generated CSVs, keeping the FIPS reference file in place.
    remove_files_from_dir(
        census_root / "csv", ".csv", exception_list=["fips_states_2010.csv"]
    )

    # Clear generated GeoJSON files.
    remove_files_from_dir(census_root / "geojson", ".json")

    # Clear every extracted shapefile directory.
    remove_all_dirs_from_dir(census_root / "shp")
def get_state_fips_codes(data_path: Path) -> list:
    """Returns a list with state data.

    Reads the state FIPS codes from the first column of
    fips_states_2010.csv, downloading the file from the Justice40 S3
    data repository first if it is not present locally.

    Args:
        data_path (Path): base data directory (expects census/csv under it)

    Returns:
        list: state FIPS codes as strings, whitespace-stripped
    """
    fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"

    # Fetch the reference CSV from S3 when it is missing locally.
    if not fips_csv_path.is_file():
        logger.info("Downloading fips from S3 repository")
        unzip_file_from_url(
            settings.AWS_JUSTICE40_DATASOURCES_URL + "/fips_states_2010.zip",
            data_path / "tmp",
            data_path / "census" / "csv",
        )

    with open(fips_csv_path, encoding="utf-8") as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=",")
        # Skip the header row; returns None (harmlessly) on an empty file.
        next(csv_reader, None)
        # First column holds the FIPS code; strip stray whitespace.
        return [row[0].strip() for row in csv_reader]
def get_state_information(data_path: Path) -> pd.DataFrame:
    """Load the full state file as a dataframe.

    Useful because of the state regional information.

    Args:
        data_path (Path): base data directory (expects census/csv under it)

    Returns:
        pd.DataFrame: the fips_states_2010.csv contents, with the "fips"
        column normalized to two-character, zero-padded strings
    """
    fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"

    df = pd.read_csv(fips_csv_path)

    # Left pad the FIPS codes with 0s; the vectorized string accessor
    # replaces the original per-row Python lambda (same result, faster).
    df["fips"] = df["fips"].astype(str).str.zfill(2)

    return df
def check_census_data_source(
    census_data_path: Path, census_data_source: str
) -> None:
    """Checks if census data is present, and exits gracefully if it doesn't exist. It will download it from S3
    if census_data_source is set to "aws"

    Args:
        census_data_path (str): Path for Census data
        census_data_source (str): Source for the census data
                Options:
                - local: fetch census data from the local data directory
                - aws: fetch census from AWS S3 J40 data repository

    Returns:
        None
    """
    CENSUS_DATA_S3_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/census.zip"
    DATA_PATH = settings.APP_ROOT / "data"

    # download from s3 if census_data_source is aws
    if census_data_source == "aws":
        logger.info("Fetching Census data from AWS S3")
        unzip_file_from_url(
            CENSUS_DATA_S3_URL,
            DATA_PATH / "tmp",
            DATA_PATH,
        )
    else:
        # check if census data is found locally; us.json is the marker file
        # this check uses to decide whether census data is present at all
        if not (census_data_path / "geojson" / "us.json").is_file():
            logger.info(
                # Fixed mismatched quoting in the original message ("'-s aws`").
                "No local census data found. Please use '-s aws' to fetch from AWS"
            )
            # NOTE(review): exits with status 0 even though the data is
            # missing — confirm callers don't rely on a non-zero exit here.
            sys.exit()
def zip_census_data():
    """Zip the whole census data directory into the data/tmp folder."""
    logger.info("Compressing census files to data/tmp folder")

    data_root = settings.APP_ROOT / "data"

    # Archive the census folder into the tmp staging area.
    zip_directory(data_root / "census", data_root / "tmp")