From 3b04356fb30a6539397cc31a35c132a0823b2040 Mon Sep 17 00:00:00 2001
From: Jorge Escobar <83969469+esfoobar-usds@users.noreply.github.com>
Date: Wed, 13 Oct 2021 16:00:33 -0400
Subject: [PATCH] Data sources from S3 (#769)

* Started 535
* Data sources from S3
* lint
* remove breakpoints
* PR comments
* lint
* census data completed
* lint
* renaming data source
---
 .github/workflows/combine-tilefy.yml          |  62 +++++++++
 .github/workflows/generate-census.yml         |  59 +++++++++
 .github/workflows/generate-score.yml          |  12 +-
 .../data_pipeline/application.py              |  33 ++++-
 .../data-pipeline/data_pipeline/etl/runner.py |   9 +-
 .../data_pipeline/etl/score/etl_score.py      |   1 +
 .../data_pipeline/etl/score/etl_score_geo.py  |  13 +-
 .../data_pipeline/etl/score/etl_score_post.py |  20 +--
 .../etl/sources/census/etl_utils.py           |  51 +++++++
 data/data-pipeline/data_pipeline/utils.py     | 124 ++++++++++++++----
 10 files changed, 317 insertions(+), 67 deletions(-)
 create mode 100644 .github/workflows/combine-tilefy.yml
 create mode 100644 .github/workflows/generate-census.yml

diff --git a/.github/workflows/combine-tilefy.yml b/.github/workflows/combine-tilefy.yml
new file mode 100644
index 00000000..cf9fa54f
--- /dev/null
+++ b/.github/workflows/combine-tilefy.yml
@@ -0,0 +1,62 @@
+name: Combine and Tilefy
+on:
+  workflow_dispatch:
+    inputs:
+      confirm-action:
+        description: This will rebuild the data sources and regenerate the score. Are you sure you want to proceed? (Y/n)
+        default: n
+        required: true
+
+jobs:
+  deploy_data:
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        working-directory: data/data-pipeline
+    strategy:
+      matrix:
+        python-version: [3.9]
+    steps:
+      - name: Checkout source
+        uses: actions/checkout@v2
+      - name: Print variables to help debug
+        uses: hmarr/debug-action@v2
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Setup Poetry
+        uses: Gr1N/setup-poetry@v7
+      - name: Print poetry version
+        run: poetry --version
+      - name: Install dependencies
+        run: poetry install
+      - name: Install GDAL/ogr2ogr
+        run: |
+          sudo apt-add-repository ppa:ubuntugis/ubuntugis-unstable
+          sudo apt-get update
+          sudo apt-get install gdal-bin libgdal-dev
+          pip install GDAL==3.2.3
+      - name: Run Scripts
+        run: |
+          poetry run download_census
+          poetry run etl_and_score
+      - name: Configure AWS Credentials
+        uses: aws-actions/configure-aws-credentials@v1
+        with:
+          aws-access-key-id: ${{ secrets.DATA_DEV_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.DATA_DEV_AWS_SECRET_ACCESS_KEY }}
+          aws-region: us-east-1
+      - name: Deploy to Geoplatform AWS
+        run: |
+          aws s3 sync ./data_pipeline/data/dataset/ s3://justice40-data/data-pipeline/data/dataset --acl public-read --delete
+          aws s3 sync ./data_pipeline/data/score/csv/ s3://justice40-data/data-pipeline/data/score/csv --acl public-read --delete
+          aws s3 sync ./data_pipeline/data/score/downloadable/ s3://justice40-data/data-pipeline/data/score/downloadable --acl public-read --delete
+      - name: Update PR with Comment about deployment
+        uses: mshick/add-pr-comment@v1
+        with:
+          message: |
+            Data Synced! Find it here: s3://justice40-data/data-pipeline/data/
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+          repo-token-user-login: "github-actions[bot]" # The user.login for temporary GitHub tokens
+          allow-repeats: false # This is the default
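As a usage note, a workflow_dispatch job like this one can also be started from the GitHub CLI; a minimal sketch (the gh invocation is illustrative and not part of this change):

    gh workflow run "Combine and Tilefy" -f confirm-action=Y
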
diff --git a/.github/workflows/generate-census.yml b/.github/workflows/generate-census.yml
new file mode 100644
index 00000000..bc70348a
--- /dev/null
+++ b/.github/workflows/generate-census.yml
@@ -0,0 +1,59 @@
+name: Generate Census
+on:
+  workflow_dispatch:
+    inputs:
+      confirm-action:
+        description: This will rebuild the census data and upload it to S3. Are you sure you want to proceed? (Y/n)
+        default: n
+        required: true
+
+jobs:
+  deploy_data:
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        working-directory: data/data-pipeline
+    strategy:
+      matrix:
+        python-version: [3.9]
+    steps:
+      - name: Checkout source
+        uses: actions/checkout@v2
+      - name: Print variables to help debug
+        uses: hmarr/debug-action@v2
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Setup Poetry
+        uses: Gr1N/setup-poetry@v7
+      - name: Print poetry version
+        run: poetry --version
+      - name: Install dependencies
+        run: poetry install
+      - name: Install GDAL/ogr2ogr
+        run: |
+          sudo apt-add-repository ppa:ubuntugis/ubuntugis-unstable
+          sudo apt-get update
+          sudo apt-get install gdal-bin libgdal-dev
+          pip install GDAL==3.2.3
+      - name: Run Census Script
+        run: |
+          poetry run python3 data_pipeline/application.py census-data-download -zc
+      - name: Configure AWS Credentials
+        uses: aws-actions/configure-aws-credentials@v1
+        with:
+          aws-access-key-id: ${{ secrets.DATA_DEV_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.DATA_DEV_AWS_SECRET_ACCESS_KEY }}
+          aws-region: us-east-1
+      - name: Upload Census Zip to Geoplatform AWS
+        run: |
+          aws s3 cp ./data_pipeline/data/tmp/census.zip s3://justice40-data/data-sources/census.zip --acl public-read
+      - name: Update PR with Comment about deployment
+        uses: mshick/add-pr-comment@v1
+        with:
+          message: |
+            Data Synced! Find it here: s3://justice40-data/data-sources/census.zip
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+          repo-token-user-login: 'github-actions[bot]' # The user.login for temporary GitHub tokens
+          allow-repeats: false # This is the default
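The census archive uploaded above is what the new "aws" data-source options consume. For reference, a sketch of pulling it back down manually with the AWS CLI (the pipeline does this itself via unzip_file_from_url when "aws" is selected):

    aws s3 cp s3://justice40-data/data-sources/census.zip ./data_pipeline/data/tmp/census.zip
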
diff --git a/.github/workflows/generate-score.yml b/.github/workflows/generate-score.yml
index 14055122..e91c9dc2 100644
--- a/.github/workflows/generate-score.yml
+++ b/.github/workflows/generate-score.yml
@@ -31,16 +31,9 @@ jobs:
         run: poetry --version
       - name: Install dependencies
         run: poetry install
-      - name: Install GDAL/ogr2ogr
-        run: |
-          sudo apt-add-repository ppa:ubuntugis/ubuntugis-unstable
-          sudo apt-get update
-          sudo apt-get install gdal-bin libgdal-dev
-          pip install GDAL==3.2.3
       - name: Run Scripts
        run: |
-          poetry run download_census
-          poetry run etl_and_score
+          poetry run python3 data_pipeline/application.py score_full_run
       - name: Configure AWS Credentials
         uses: aws-actions/configure-aws-credentials@v1
         with:
@@ -49,14 +42,13 @@ jobs:
           aws-region: us-east-1
       - name: Deploy to Geoplatform AWS
         run: |
-          aws s3 sync ./data_pipeline/data/dataset/ s3://justice40-data/data-pipeline/data/dataset --acl public-read --delete
           aws s3 sync ./data_pipeline/data/score/csv/ s3://justice40-data/data-pipeline/data/score/csv --acl public-read --delete
           aws s3 sync ./data_pipeline/data/score/downloadable/ s3://justice40-data/data-pipeline/data/score/downloadable --acl public-read --delete
       - name: Update PR with Comment about deployment
         uses: mshick/add-pr-comment@v1
         with:
           message: |
-            Data Synced! Find it here: s3://justice40-data/data-pipeline/data/
+            Data Synced! Find it here: s3://justice40-data/data-pipeline/data/score
           repo-token: ${{ secrets.GITHUB_TOKEN }}
           repo-token-user-login: 'github-actions[bot]' # The user.login for temporary GitHub tokens
           allow-repeats: false # This is the default
diff --git a/data/data-pipeline/data_pipeline/application.py b/data/data-pipeline/data_pipeline/application.py
index ae21c6c2..7e02b2e2 100644
--- a/data/data-pipeline/data_pipeline/application.py
+++ b/data/data-pipeline/data_pipeline/application.py
@@ -11,6 +11,7 @@ from data_pipeline.etl.runner import (
 )
 from data_pipeline.etl.sources.census.etl_utils import (
     reset_data_directories as census_reset,
+    zip_census_data,
 )
 from data_pipeline.tile.generate import generate_tiles
 from data_pipeline.utils import (
@@ -64,18 +65,27 @@ def data_cleanup():
 @cli.command(
     help="Census data download",
 )
-def census_data_download():
+@click.option(
+    "-zc",
+    "--zip-compress",
+    is_flag=True,
+    help="Upload a zipped archive of the census data to AWS S3.",
+)
+def census_data_download(zip_compress):
     """CLI command to download all census shape files from the Census FTP
     and extract the geojson to generate national and by state Census Block Group CSVs"""
-    data_path = settings.APP_ROOT / "data"
-
     logger.info("Initializing all census data")
+
+    data_path = settings.APP_ROOT / "data"
     census_reset(data_path)
 
     logger.info("Downloading census data")
     etl_runner("census")
 
+    if zip_compress:
+        zip_census_data()
+
     logger.info("Completed downloading census data")
+
     sys.exit()
 
@@ -124,10 +134,21 @@ def score_full_run():
 
 @cli.command(help="Generate Geojson files with scores baked in")
-def geo_score():
-    """CLI command to generate the score"""
+@click.option("-d", "--data-source", default="local", required=False, type=str)
+def geo_score(data_source: str):
+    """CLI command to generate the geojson files with score data baked in
 
-    score_geo()
+    Args:
+        data_source (str): Source for the census data (optional)
+            Options:
+            - local: fetch census and score data from the local data directory
+            - aws: fetch census and score data from the AWS S3 J40 data repository
+
+    Returns:
+        None
+    """
+
+    score_geo(data_source=data_source)
     sys.exit()
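Putting the new CLI surface together, a sketch of local invocations (command spellings assume Click's dashed command names, matching the census workflow step above):

    poetry run python3 data_pipeline/application.py census-data-download -zc
    poetry run python3 data_pipeline/application.py geo-score -d aws
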
diff --git a/data/data-pipeline/data_pipeline/etl/runner.py b/data/data-pipeline/data_pipeline/etl/runner.py
index f7eb8580..f23362b6 100644
--- a/data/data-pipeline/data_pipeline/etl/runner.py
+++ b/data/data-pipeline/data_pipeline/etl/runner.py
@@ -104,18 +104,21 @@ def score_post() -> None:
     score_post.cleanup()
 
 
-def score_geo() -> None:
+def score_geo(data_source: str = "local") -> None:
     """Generates the geojson files with score data baked in
 
     Args:
-        None
+        data_source (str): Source for the census data (optional)
+            Options:
+            - local (default): fetch census data from the local data directory
+            - aws: fetch census data from the AWS S3 J40 data repository
 
     Returns:
         None
     """
 
     # Score Geo
-    score_geo = GeoScoreETL()
+    score_geo = GeoScoreETL(data_source=data_source)
     score_geo.extract()
     score_geo.transform()
     score_geo.load()
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score.py b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
index 8e4b2c9c..2cf684fc 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score.py
@@ -315,6 +315,7 @@ class ScoreETL(ExtractTransformLoad):
 
     def extract(self) -> None:
         logger.info("Loading data sets from disk.")
+
         # EJSCreen csv Load
         ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
         self.ejscreen_df = pd.read_csv(
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
index 5ded2956..6a52ed5e 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_geo.py
@@ -1,9 +1,11 @@
 import math
-
 import pandas as pd
 import geopandas as gpd
 
 from data_pipeline.etl.base import ExtractTransformLoad
+from data_pipeline.etl.sources.census.etl_utils import (
+    check_census_data_source,
+)
 from data_pipeline.utils import get_module_logger
 
 logger = get_module_logger(__name__)
@@ -14,7 +16,7 @@ class GeoScoreETL(ExtractTransformLoad):
     A class used to generate per state and national GeoJson files with the score baked in
     """
 
-    def __init__(self):
+    def __init__(self, data_source: str = None):
         self.SCORE_GEOJSON_PATH = self.DATA_PATH / "score" / "geojson"
         self.SCORE_LOW_GEOJSON = self.SCORE_GEOJSON_PATH / "usa-low.json"
         self.SCORE_HIGH_GEOJSON = self.SCORE_GEOJSON_PATH / "usa-high.json"
@@ -22,6 +24,7 @@ class GeoScoreETL(ExtractTransformLoad):
         self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
         self.TILE_SCORE_CSV = self.SCORE_CSV_PATH / "tiles" / "usa.csv"
 
+        self.DATA_SOURCE = data_source
         self.CENSUS_USA_GEOJSON = (
             self.DATA_PATH / "census" / "geojson" / "us.json"
         )
@@ -37,6 +40,12 @@ class GeoScoreETL(ExtractTransformLoad):
         self.geojson_score_usa_low: gpd.GeoDataFrame
 
     def extract(self) -> None:
+        # check census data
+        check_census_data_source(
+            census_data_path=self.DATA_PATH / "census",
+            census_data_source=self.DATA_SOURCE,
+        )
+
         logger.info("Reading US GeoJSON (~6 minutes)")
         self.geojson_usa_df = gpd.read_file(
             self.CENSUS_USA_GEOJSON,
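The runner above is the only call site of GeoScoreETL; for orientation, a minimal sketch of what score_geo(data_source="aws") amounts to:

    from data_pipeline.etl.score.etl_score_geo import GeoScoreETL

    # "aws" makes extract() fetch census.zip from the J40 S3 bucket before
    # reading the GeoJSON; "local" (the default) reads from disk.
    score_geo = GeoScoreETL(data_source="aws")
    score_geo.extract()
    score_geo.transform()
    score_geo.load()
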
diff --git a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py
index 5b4375f6..fe13c0e9 100644
--- a/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py
+++ b/data/data-pipeline/data_pipeline/etl/score/etl_score_post.py
@@ -1,22 +1,10 @@
-import json
-import zipfile
 from pathlib import Path
-
 import pandas as pd
 
 from data_pipeline.etl.base import ExtractTransformLoad
-from data_pipeline.utils import get_module_logger, get_zip_info
+from data_pipeline.utils import get_module_logger, zip_files
 
 from . import constants
 
-## zlib is not available on all systems
-try:
-    import zlib  # noqa # pylint: disable=unused-import
-
-    compression = zipfile.ZIP_DEFLATED
-except (ImportError, AttributeError):
-    compression = zipfile.ZIP_STORED
-
-
 logger = get_module_logger(__name__)
@@ -268,11 +256,7 @@ class PostScoreETL(ExtractTransformLoad):
         logger.info("Compressing files")
         files_to_compress = [csv_path, excel_path, pdf_path]
 
-        with zipfile.ZipFile(zip_path, "w") as zf:
-            for f in files_to_compress:
-                zf.write(f, arcname=Path(f).name, compress_type=compression)
-        zip_info = get_zip_info(zip_path)
-        logger.info(json.dumps(zip_info, indent=4, sort_keys=True, default=str))
+        zip_files(zip_path, files_to_compress)
 
     def load(self) -> None:
         self._load_score_csv(
diff --git a/data/data-pipeline/data_pipeline/etl/sources/census/etl_utils.py b/data/data-pipeline/data_pipeline/etl/sources/census/etl_utils.py
index 84006091..259fffbb 100644
--- a/data/data-pipeline/data_pipeline/etl/sources/census/etl_utils.py
+++ b/data/data-pipeline/data_pipeline/etl/sources/census/etl_utils.py
@@ -1,5 +1,6 @@
 import csv
 import os
+import sys
 from pathlib import Path
 
 import pandas as pd
@@ -9,12 +10,14 @@ from data_pipeline.utils import (
     remove_all_dirs_from_dir,
     remove_files_from_dir,
     unzip_file_from_url,
+    zip_directory,
 )
 
 logger = get_module_logger(__name__)
 
 
 def reset_data_directories(data_path: Path) -> None:
+    """Empties all census folders"""
     census_data_path = data_path / "census"
 
     # csv
@@ -31,6 +34,7 @@ def reset_data_directories(data_path: Path) -> None:
 
 
 def get_state_fips_codes(data_path: Path) -> list:
+    """Returns a list of state FIPS codes"""
     fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"
 
     # check if file exists
@@ -69,3 +73,50 @@ def get_state_information(data_path: Path) -> pd.DataFrame:
     df["fips"] = df["fips"].astype(str).apply(lambda x: x.zfill(2))
 
     return df
+
+
+def check_census_data_source(
+    census_data_path: Path, census_data_source: str
+) -> None:
+    """Checks whether census data is present and exits gracefully if it is not.
+    If census_data_source is set to "aws", the data is downloaded from S3 instead.
+
+    Args:
+        census_data_path (pathlib.Path): Path for the census data
+        census_data_source (str): Source for the census data
+            Options:
+            - local: fetch census data from the local data directory
+            - aws: fetch census data from the AWS S3 J40 data repository
+
+    Returns:
+        None
+
+    """
+    CENSUS_DATA_S3_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/census.zip"
+    DATA_PATH = settings.APP_ROOT / "data"
+
+    # download from s3 if census_data_source is aws
+    if census_data_source == "aws":
+        logger.info("Fetching Census data from AWS S3")
+        unzip_file_from_url(
+            CENSUS_DATA_S3_URL,
+            DATA_PATH / "tmp",
+            DATA_PATH,
+        )
+    else:
+        # check if census data is found locally
+        if not os.path.isfile(census_data_path / "geojson" / "us.json"):
+            logger.info(
+                "No local census data found. Please use `-cds aws` to fetch from AWS"
+            )
+            sys.exit()
+
+
+def zip_census_data():
+    logger.info("Compressing census files")
+
+    CENSUS_DATA_PATH = settings.APP_ROOT / "data" / "census"
+    TMP_PATH = settings.APP_ROOT / "data" / "tmp"
+
+    # zip folder
+    zip_directory(CENSUS_DATA_PATH, TMP_PATH)
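For orientation, the census upload is a two-step affair: zip_census_data produces the archive locally, and the Generate Census workflow copies it to S3. A sketch of the local half:

    from data_pipeline.etl.sources.census.etl_utils import zip_census_data

    # Writes data/tmp/census.zip from the contents of data/census.
    zip_census_data()
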
diff --git a/data/data-pipeline/data_pipeline/utils.py b/data/data-pipeline/data_pipeline/utils.py
index c1bce9e7..5edce8fa 100644
--- a/data/data-pipeline/data_pipeline/utils.py
+++ b/data/data-pipeline/data_pipeline/utils.py
@@ -1,17 +1,27 @@
+from typing import List
 import datetime
+import json
 import logging
 import os
 import sys
 import shutil
 import zipfile
 from pathlib import Path
-
-import requests
 import urllib3
+import requests
 
 from data_pipeline.config import settings
 
+## zlib is not available on all systems
+try:
+    import zlib  # noqa # pylint: disable=unused-import
+
+    compression = zipfile.ZIP_DEFLATED
+except (ImportError, AttributeError):
+    compression = zipfile.ZIP_STORED
+
+
 def get_module_logger(module_name: str) -> logging.Logger:
     """Instantiates a logger object on stdout
@@ -219,6 +229,90 @@ def check_first_run() -> bool:
     return False
 
 
+def get_zip_info(archive_path: Path) -> list:
+    """
+    Returns information about a provided archive
+
+    Args:
+        archive_path (pathlib.Path): Path of the archive to be inspected
+
+    Returns:
+        a list of information about every file in the zipfile
+
+    """
+    zf = zipfile.ZipFile(archive_path)
+    info_list = []
+    for info in zf.infolist():
+        info_dict = {}
+        info_dict["Filename"] = info.filename
+        info_dict["Comment"] = info.comment.decode("utf8")
+        info_dict["Modified"] = datetime.datetime(*info.date_time).isoformat()
+        info_dict["System"] = f"{info.create_system} (0 = Windows, 3 = Unix)"
+        info_dict["ZIP version"] = info.create_version
+        info_dict["Compressed"] = f"{info.compress_size} bytes"
+        info_dict["Uncompressed"] = f"{info.file_size} bytes"
+        info_list.append(info_dict)
+    return info_list
+
+
+def zip_files(zip_file_path: Path, files_to_compress: List[Path]):
+    """
+    Zips a list of files into a single archive
+
+    Args:
+        zip_file_path (pathlib.Path): Path of the zip file where files are compressed
+        files_to_compress (List[pathlib.Path]): List of files to be compressed
+
+    Returns:
+        None
+    """
+    with zipfile.ZipFile(zip_file_path, "w") as zf:
+        for f in files_to_compress:
+            zf.write(f, arcname=Path(f).name, compress_type=compression)
+    zip_info = get_zip_info(zip_file_path)
+    logger.info(json.dumps(zip_info, indent=4, sort_keys=True, default=str))
+
+
+def zip_directory(
+    origin_zip_directory: Path, destination_zip_directory: Path
+) -> None:
+    """
+    Zips a whole directory
+
+    Args:
+        origin_zip_directory (pathlib.Path): Path of the directory to be archived
+        destination_zip_directory (pathlib.Path): Path where the resulting zip file is written
+
+    Returns:
+        None
+
+    """
+
+    def zipdir(origin_directory: Path, ziph: zipfile.ZipFile):
+        for root, dirs, files in os.walk(origin_directory):
+            for file in files:
+                logger.info(f"Compressing file: {file}")
+                ziph.write(
+                    os.path.join(root, file),
+                    os.path.relpath(
+                        os.path.join(root, file),
+                        os.path.join(origin_directory, ".."),
+                    ),
+                    compress_type=compression,
+                )
+
+    logger.info(f"Compressing {Path(origin_zip_directory).name} directory")
+    zip_file_name = f"{Path(origin_zip_directory).name}.zip"
+
+    # start archiving
+    zipf = zipfile.ZipFile(
+        destination_zip_directory / zip_file_name, "w", zipfile.ZIP_DEFLATED
+    )
+    zipdir(f"{origin_zip_directory}/", zipf)
+    zipf.close()
+
+    logger.info(
+        f"Completed compression of {Path(origin_zip_directory).name} directory"
+    )
+
+
 def get_excel_column_name(index: int) -> str:
     """Map a numeric index to the appropriate column in Excel.
     E.g., column #95 is "CR". Only works for the first 1000 columns.
@@ -1232,29 +1326,3 @@
     ]
 
     return excel_column_names[index]
-
-
-def get_zip_info(archive_path: Path) -> list:
-    """
-    Returns information about a provided archive
-
-    Args:
-        archive_path (pathlib.Path): Path of the archive to be inspected
-
-    Returns:
-        a list of information about every file in the zipfile
-
-    """
-    zf = zipfile.ZipFile(archive_path)
-    info_list = []
-    for info in zf.infolist():
-        info_dict = {}
-        info_dict["Filename"] = info.filename
-        info_dict["Comment"] = info.comment.decode("utf8")
-        info_dict["Modified"] = datetime.datetime(*info.date_time).isoformat()
-        info_dict["System"] = f"{info.create_system} (0 = Windows, 3 = Unix)"
-        info_dict["ZIP version"] = info.create_version
-        info_dict["Compressed"] = f"{info.compress_size} bytes"
-        info_dict["Uncompressed"] = f"{info.file_size} bytes"
-        info_list.append(info_dict)
-    return info_list
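The relocated helpers are self-contained, so they can be exercised outside the ETL; a minimal sketch (the file names here are hypothetical):

    from pathlib import Path
    from data_pipeline.utils import zip_files

    tmp = Path("data_pipeline/data/tmp")
    # zip_files logs the archive's contents as JSON via get_zip_info.
    zip_files(tmp / "example.zip", [tmp / "a.csv", tmp / "b.xlsx"])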