Backend change for Zipfile pt. 2 (#469)

* Fixes #303 : adding downloadable zip archive logic
* linter recommendations
* Pushes data directory to AWS. We'll want to move to use AWS for this ASAP, but this works for now
* updating pattern
This commit is contained in:
Nat Hillard 2021-08-09 10:39:59 -04:00 committed by GitHub
commit 9a9d5fdf7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 223 additions and 10 deletions

View file

@ -1,7 +1,19 @@
import pandas as pd
import json
import zipfile
from pathlib import Path
import pandas as pd
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger
from data_pipeline.utils import get_module_logger, get_zip_info
# zlib is not available on all systems; when it is missing, fall back to storing files uncompressed
try:
import zlib # noqa # pylint: disable=unused-import
compression = zipfile.ZIP_DEFLATED
except (ImportError, AttributeError):
compression = zipfile.ZIP_STORED
logger = get_module_logger(__name__)
@ -18,11 +30,14 @@ class PostScoreETL(ExtractTransformLoad):
self.CENSUS_COUNTIES_COLS = ["USPS", "GEOID", "NAME"]
self.CENSUS_USA_CSV = self.DATA_PATH / "census" / "csv" / "us.csv"
self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
self.DOWNLOADABLE_INFO_PATH = self.DATA_PATH / "score" / "downloadable"
self.STATE_CSV = self.DATA_PATH / "census" / "csv" / "fips_states_2010.csv"
self.FULL_SCORE_CSV = self.SCORE_CSV_PATH / "full" / "usa.csv"
self.TILR_SCORE_CSV = self.SCORE_CSV_PATH / "tile" / "usa.csv"
self.FULL_SCORE_CSV_PLUS_COUNTIES = (
self.SCORE_CSV_PATH / "full" / "usa_counties.csv"
)
self.TILES_SCORE_COLUMNS = [
"GEOID10",
@ -35,6 +50,46 @@ class PostScoreETL(ExtractTransformLoad):
self.TILES_SCORE_CSV_PATH = self.SCORE_CSV_PATH / "tiles"
self.TILES_SCORE_CSV = self.TILES_SCORE_CSV_PATH / "usa.csv"
# These are the basic indicators included in the downloadable score files
self.DOWNLOADABLE_SCORE_INDICATORS_BASIC = [
"Percent individuals age 25 or over with less than high school degree",
"Linguistic isolation (percent)",
"Poverty (Less than 200% of federal poverty line)",
"Unemployed civilians (percent)",
"Housing burden (percent)",
"Respiratory hazard index",
"Diesel particulate matter",
"Particulate matter (PM2.5)",
"Traffic proximity and volume",
"Proximity to RMP sites",
"Wastewater discharge",
"Percent pre-1960s housing (lead paint indicator)",
"Total population",
]
# For every indicator above, we want to include percentile and min-max normalized variants also
self.DOWNLOADABLE_SCORE_INDICATORS_FULL = list(
pd.core.common.flatten(
[
[p, f"{p} (percentile)", f"{p} (min-max normalized)"]
for p in self.DOWNLOADABLE_SCORE_INDICATORS_BASIC
]
)
)
# Finally we augment with the GEOID10, county, and state
self.DOWNLOADABLE_SCORE_COLUMNS = [
"GEOID10",
"County Name",
"State Name",
*self.DOWNLOADABLE_SCORE_INDICATORS_FULL,
]
self.DOWNLOADABLE_SCORE_CSV = self.DOWNLOADABLE_INFO_PATH / "usa.csv"
self.DOWNLOADABLE_SCORE_EXCEL = self.DOWNLOADABLE_INFO_PATH / "usa.xlsx"
self.DOWNLOADABLE_SCORE_ZIP = (
self.DOWNLOADABLE_INFO_PATH / "Screening Tool Data.zip"
)
self.counties_df: pd.DataFrame
self.states_df: pd.DataFrame
self.score_df: pd.DataFrame
@ -43,7 +98,8 @@ class PostScoreETL(ExtractTransformLoad):
def extract(self) -> None:
super().extract(
self.CENSUS_COUNTIES_ZIP_URL, self.TMP_PATH,
self.CENSUS_COUNTIES_ZIP_URL,
self.TMP_PATH,
)
logger.info("Reading Counties CSV")
@ -67,7 +123,8 @@ class PostScoreETL(ExtractTransformLoad):
# rename some of the columns to prepare for merge
self.counties_df = self.counties_df[["USPS", "GEOID", "NAME"]]
self.counties_df.rename(
columns={"USPS": "State Abbreviation", "NAME": "County Name"}, inplace=True,
columns={"USPS": "State Abbreviation", "NAME": "County Name"},
inplace=True,
)
# remove unnecessary columns
@ -122,14 +179,45 @@ class PostScoreETL(ExtractTransformLoad):
# set the score to the new df
self.score_county_state_merged = removed_df
def load(self) -> None:
def _save_full_csv(self):
# Writes self.score_county_state_merged to CSV (without the index column),
# creating the self.SCORE_CSV_PATH directory first.
logger.info("Saving Full Score CSV with County Information")
# parents=True / exist_ok=True: create missing parent directories and do not
# fail when the directory already exists.
self.SCORE_CSV_PATH.mkdir(parents=True, exist_ok=True)
# NOTE(review): the same frame is written to both FULL_SCORE_CSV and
# FULL_SCORE_CSV_PLUS_COUNTIES here. The single-line write below may be the
# pre-change form of the multi-line write that follows it -- confirm whether
# both output files are really intended.
self.score_county_state_merged.to_csv(self.FULL_SCORE_CSV, index=False)
self.score_county_state_merged.to_csv(
self.FULL_SCORE_CSV_PLUS_COUNTIES, index=False
)
def _save_tile_csv(self):
    """
    Writes the tile subset of the merged score data to a CSV file

    Only the columns listed in TILES_SCORE_COLUMNS are kept. The
    TILES_SCORE_CSV_PATH directory is created when it does not exist yet,
    and the result is written to TILES_SCORE_CSV without the index column.
    """
    logger.info("Saving Tile Score CSV")

    # TODO: check which are the columns we'll use
    # Related to: https://github.com/usds/justice40-tool/issues/302
    merged_scores = self.score_county_state_merged
    tile_frame = merged_scores[self.TILES_SCORE_COLUMNS]

    tile_dir = self.TILES_SCORE_CSV_PATH
    tile_dir.mkdir(parents=True, exist_ok=True)

    tile_frame.to_csv(self.TILES_SCORE_CSV, index=False)
def _save_downloadable_zip(self):
    """
    Writes the downloadable score data as CSV and Excel, then zips both files

    The columns listed in DOWNLOADABLE_SCORE_COLUMNS are selected from the
    merged score data and written to DOWNLOADABLE_SCORE_CSV and
    DOWNLOADABLE_SCORE_EXCEL (both without the index column). The two files
    are then added to DOWNLOADABLE_SCORE_ZIP under their bare file names,
    and a summary of the resulting archive is logged as JSON.
    """
    logger.info("Saving Downloadable CSV")
    logger.info(list(self.score_county_state_merged.columns))
    logger.info(self.DOWNLOADABLE_SCORE_COLUMNS)

    wanted_columns = self.DOWNLOADABLE_SCORE_COLUMNS
    export_frame = self.score_county_state_merged[wanted_columns]
    self.DOWNLOADABLE_INFO_PATH.mkdir(parents=True, exist_ok=True)

    logger.info("Writing downloadable csv")
    csv_target = self.DOWNLOADABLE_SCORE_CSV
    export_frame.to_csv(csv_target, index=False)

    logger.info("Writing downloadable excel")
    excel_target = self.DOWNLOADABLE_SCORE_EXCEL
    export_frame.to_excel(excel_target, index=False)

    logger.info("Compressing files")
    zip_target = self.DOWNLOADABLE_SCORE_ZIP
    with zipfile.ZipFile(zip_target, "w") as archive:
        for member in (csv_target, excel_target):
            # arcname drops the directory part so the archive has a flat layout
            archive.write(
                member,
                arcname=Path(member).name,
                compress_type=compression,
            )

    # The archive must be closed before it can be read back for inspection
    archive_summary = get_zip_info(zip_target)
    logger.info(
        json.dumps(archive_summary, indent=4, sort_keys=True, default=str)
    )
def load(self) -> None:
    """
    Writes every score output, in a fixed order

    The full CSV is written first, then the tile CSV, and finally the
    downloadable zip archive.
    """
    output_steps = (
        self._save_full_csv,
        self._save_tile_csv,
        self._save_downloadable_zip,
    )
    for write_output in output_steps:
        write_output()

View file

@ -1,5 +1,7 @@
import datetime
import logging
import os
import sys
import shutil
import zipfile
from pathlib import Path
@ -119,8 +121,13 @@ def unzip_file_from_url(
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger.info(f"Downloading {file_url}")
download = requests.get(file_url, verify=verify)
file_contents = download.content
response = requests.get(file_url, verify=verify)
if response.status_code == 200:
file_contents = response.content
else:
sys.exit(
f"HTTP response {response.status_code} from url {file_url}. Info: {response.content}"
)
zip_file_path = download_path / "downloaded.zip"
zip_file = open(zip_file_path, "wb")
@ -152,6 +159,7 @@ def score_folder_cleanup() -> None:
logger.info("Initializing all score data")
remove_all_from_dir(data_path / "score" / "csv")
remove_all_from_dir(data_path / "score" / "geojson")
remove_all_from_dir(data_path / "score" / "downloadable")
def temp_folder_cleanup() -> None:
@ -1176,3 +1184,29 @@ def get_excel_column_name(index: int) -> str:
]
return excel_column_names[index]
def get_zip_info(archive_path: Path) -> list:
    """
    Returns information about a provided archive

    Args:
        archive_path (pathlib.Path): Path of the archive to be inspected

    Returns:
        a list with one dictionary per file in the zipfile, in archive order.
        Each dictionary has the keys "Filename", "Comment", "Modified",
        "System", "ZIP version", "Compressed" and "Uncompressed".

    Raises:
        zipfile.BadZipFile: if the file is not a valid zip archive
        OSError: if the file cannot be opened (e.g. it does not exist)
    """
    # Open the archive in a context manager so the underlying file handle is
    # always released, even if reading the directory fails. The ZipInfo
    # records stay usable after the archive has been closed.
    with zipfile.ZipFile(archive_path) as zf:
        members = zf.infolist()

    return [
        {
            "Filename": info.filename,
            # The per-file comment is stored as bytes in the archive
            "Comment": info.comment.decode("utf8"),
            # date_time is a (year, month, day, hour, minute, second) tuple
            "Modified": datetime.datetime(*info.date_time).isoformat(),
            "System": f"{info.create_system} (0 = Windows, 3 = Unix)",
            "ZIP version": info.create_version,
            "Compressed": f"{info.compress_size} bytes",
            "Uncompressed": f"{info.file_size} bytes",
        }
        for info in members
    ]