Data sources from S3 (#769)

* Started 535

* Data sources from S3

* lint

* remove breakpoints

* PR comments

* lint

* census data completed

* lint

* renaming data source
Jorge Escobar 2021-10-13 16:00:33 -04:00 committed by GitHub
parent d1273b63c5
commit 3b04356fb3
10 changed files with 317 additions and 67 deletions

New file: .github/workflows/combine-tilefy.yml (62 additions)

@@ -0,0 +1,62 @@
name: Combine and Tilefy
on:
  workflow_dispatch:
    inputs:
      confirm-action:
        description: This will rebuild the data sources and regenerate the score, are you sure you want to proceed? (Y/n)
        default: n
        required: true
jobs:
  deploy_data:
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: data/data-pipeline
    strategy:
      matrix:
        python-version: [3.9]
    steps:
      - name: Checkout source
        uses: actions/checkout@v2
      - name: Print variables to help debug
        uses: hmarr/debug-action@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Setup Poetry
        uses: Gr1N/setup-poetry@v7
      - name: Print poetry version
        run: poetry --version
      - name: Install dependencies
        run: poetry install
      - name: Install GDAL/ogr2ogr
        run: |
          sudo apt-add-repository ppa:ubuntugis/ubuntugis-unstable
          sudo apt-get update
          sudo apt-get install gdal-bin libgdal-dev
          pip install GDAL==3.2.3
      - name: Run Scripts
        run: |
          poetry run download_census
          poetry run etl_and_score
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.DATA_DEV_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.DATA_DEV_AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      - name: Deploy to Geoplatform AWS
        run: |
          aws s3 sync ./data_pipeline/data/dataset/ s3://justice40-data/data-pipeline/data/dataset --acl public-read --delete
          aws s3 sync ./data_pipeline/data/score/csv/ s3://justice40-data/data-pipeline/data/score/csv --acl public-read --delete
          aws s3 sync ./data_pipeline/data/score/downloadable/ s3://justice40-data/data-pipeline/data/score/downloadable --acl public-read --delete
      - name: Update PR with Comment about deployment
        uses: mshick/add-pr-comment@v1
        with:
          message: |
            Data Synced! Find it here: s3://justice40-data/data-pipeline/data/
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          repo-token-user-login: "github-actions[bot]" # The user.login for temporary GitHub tokens
          allow-repeats: false # This is the default

New file: .github/workflows/generate-census.yml (59 additions)

@@ -0,0 +1,59 @@
name: Generate Census
on:
  workflow_dispatch:
    inputs:
      confirm-action:
        description: This will rebuild the census data and upload it to S3, are you sure you want to proceed? (Y/n)
        default: n
        required: true
jobs:
  deploy_data:
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: data/data-pipeline
    strategy:
      matrix:
        python-version: [3.9]
    steps:
      - name: Checkout source
        uses: actions/checkout@v2
      - name: Print variables to help debug
        uses: hmarr/debug-action@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Setup Poetry
        uses: Gr1N/setup-poetry@v7
      - name: Print poetry version
        run: poetry --version
      - name: Install dependencies
        run: poetry install
      - name: Install GDAL/ogr2ogr
        run: |
          sudo apt-add-repository ppa:ubuntugis/ubuntugis-unstable
          sudo apt-get update
          sudo apt-get install gdal-bin libgdal-dev
          pip install GDAL==3.2.3
      - name: Run Census Script
        run: |
          poetry run python3 data_pipeline/application.py census-data-download -zc
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.DATA_DEV_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.DATA_DEV_AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      - name: Upload Census Zip to Geoplatform AWS
        run: |
          aws s3 sync ./data_pipeline/data/tmp/census.zip s3://justice40-data/data-sources/census.zip --acl public-read --delete
      - name: Update PR with Comment about deployment
        uses: mshick/add-pr-comment@v1
        with:
          message: |
            Data Synced! Find it here: s3://justice40-data/data-pipeline/data/
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          repo-token-user-login: 'github-actions[bot]' # The user.login for temporary GitHub tokens
          allow-repeats: false # This is the default
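The census step this workflow runs can also be exercised outside CI. A minimal sketch using click's test runner, assuming the click group exported by data_pipeline/application.py is named cli (as the application.py diff further down shows); invoking it performs the real download, so treat it as an illustration rather than a unit test:

from click.testing import CliRunner

from data_pipeline.application import cli

runner = CliRunner()
# Equivalent of the workflow step: download census data, then zip it for S3 upload.
result = runner.invoke(cli, ["census-data-download", "--zip-compress"])
print(result.exit_code, result.output)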

Modified: score generation workflow (.github/workflows)

@@ -31,16 +31,9 @@ jobs:
         run: poetry --version
       - name: Install dependencies
         run: poetry install
-      - name: Install GDAL/ogr2ogr
-        run: |
-          sudo apt-add-repository ppa:ubuntugis/ubuntugis-unstable
-          sudo apt-get update
-          sudo apt-get install gdal-bin libgdal-dev
-          pip install GDAL==3.2.3
       - name: Run Scripts
         run: |
-          poetry run download_census
-          poetry run etl_and_score
+          poetry run python3 data_pipeline/application.py score_full_run
       - name: Configure AWS Credentials
         uses: aws-actions/configure-aws-credentials@v1
         with:
@@ -49,14 +42,13 @@ jobs:
           aws-region: us-east-1
       - name: Deploy to Geoplatform AWS
         run: |
-          aws s3 sync ./data_pipeline/data/dataset/ s3://justice40-data/data-pipeline/data/dataset --acl public-read --delete
           aws s3 sync ./data_pipeline/data/score/csv/ s3://justice40-data/data-pipeline/data/score/csv --acl public-read --delete
           aws s3 sync ./data_pipeline/data/score/downloadable/ s3://justice40-data/data-pipeline/data/score/downloadable --acl public-read --delete
       - name: Update PR with Comment about deployment
         uses: mshick/add-pr-comment@v1
         with:
           message: |
-            Data Synced! Find it here: s3://justice40-data/data-pipeline/data/
+            Data Synced! Find it here: s3://justice40-data/data-pipeline/data/score
           repo-token: ${{ secrets.GITHUB_TOKEN }}
           repo-token-user-login: 'github-actions[bot]' # The user.login for temporary GitHub tokens
           allow-repeats: false # This is the default

Modified: data_pipeline/application.py

@@ -11,6 +11,7 @@ from data_pipeline.etl.runner import (
 )
 from data_pipeline.etl.sources.census.etl_utils import (
     reset_data_directories as census_reset,
+    zip_census_data,
 )
 from data_pipeline.tile.generate import generate_tiles
 from data_pipeline.utils import (
@@ -64,18 +65,27 @@ def data_cleanup():
 @cli.command(
     help="Census data download",
 )
-def census_data_download():
+@click.option(
+    "-zc",
+    "--zip-compress",
+    is_flag=True,
+    help="Upload to AWS S3 a zipped archive of the census data.",
+)
+def census_data_download(zip_compress):
     """CLI command to download all census shape files from the Census FTP and extract the geojson
     to generate national and by state Census Block Group CSVs"""
-    data_path = settings.APP_ROOT / "data"
     logger.info("Initializing all census data")
+    data_path = settings.APP_ROOT / "data"
     census_reset(data_path)
 
     logger.info("Downloading census data")
     etl_runner("census")
 
+    if zip_compress:
+        zip_census_data()
+
     logger.info("Completed downloading census data")
     sys.exit()
@@ -124,10 +134,21 @@ def score_full_run():
 @cli.command(help="Generate Geojson files with scores baked in")
-def geo_score():
-    """CLI command to generate the score"""
+@click.option("-d", "--data-source", default="local", required=False, type=str)
+def geo_score(data_source: str):
+    """CLI command to generate the score
 
-    score_geo()
+    Args:
+        data_source (str): Source for the census data (optional)
+                           Options:
+                           - local: fetch census and score data from the local data directory
+                           - aws: fetch census and score from AWS S3 J40 data repository
+
+    Returns:
+        None
+    """
+    score_geo(data_source=data_source)
     sys.exit()
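A hedged sketch of invoking the new option: under click's default dash naming the command should resolve to geo-score, and passing -d aws makes the geo step pull census data from the J40 S3 bucket instead of the local data directory. Illustrative only, since the full run reads the national GeoJSON:

from click.testing import CliRunner

from data_pipeline.application import cli

runner = CliRunner()
# Generate the score geojson, fetching census data from AWS S3 rather than local disk.
result = runner.invoke(cli, ["geo-score", "--data-source", "aws"])
print(result.exit_code)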

Modified: ETL runner (score_geo)

@@ -104,18 +104,21 @@ def score_post() -> None:
     score_post.cleanup()
 
 
-def score_geo() -> None:
+def score_geo(data_source: str = "local") -> None:
     """Generates the geojson files with score data baked in
 
     Args:
-        None
+        census_data_source (str): Source for the census data (optional)
+                                  Options:
+                                  - local (default): fetch census data from the local data directory
+                                  - aws: fetch census from AWS S3 J40 data repository
 
     Returns:
         None
     """
     # Score Geo
-    score_geo = GeoScoreETL()
+    score_geo = GeoScoreETL(data_source=data_source)
     score_geo.extract()
     score_geo.transform()
     score_geo.load()

Modified: ScoreETL (extract)

@@ -315,6 +315,7 @@ class ScoreETL(ExtractTransformLoad):
     def extract(self) -> None:
         logger.info("Loading data sets from disk.")
+
         # EJSCreen csv Load
         ejscreen_csv = self.DATA_PATH / "dataset" / "ejscreen_2019" / "usa.csv"
         self.ejscreen_df = pd.read_csv(

Modified: GeoScoreETL

@@ -1,9 +1,11 @@
 import math
 import pandas as pd
 import geopandas as gpd
 
 from data_pipeline.etl.base import ExtractTransformLoad
+from data_pipeline.etl.sources.census.etl_utils import (
+    check_census_data_source,
+)
 from data_pipeline.utils import get_module_logger
 
 logger = get_module_logger(__name__)
@@ -14,7 +16,7 @@ class GeoScoreETL(ExtractTransformLoad):
     A class used to generate per state and national GeoJson files with the score baked in
     """
 
-    def __init__(self):
+    def __init__(self, data_source: str = None):
         self.SCORE_GEOJSON_PATH = self.DATA_PATH / "score" / "geojson"
         self.SCORE_LOW_GEOJSON = self.SCORE_GEOJSON_PATH / "usa-low.json"
         self.SCORE_HIGH_GEOJSON = self.SCORE_GEOJSON_PATH / "usa-high.json"
@@ -22,6 +24,7 @@ class GeoScoreETL(ExtractTransformLoad):
         self.SCORE_CSV_PATH = self.DATA_PATH / "score" / "csv"
         self.TILE_SCORE_CSV = self.SCORE_CSV_PATH / "tiles" / "usa.csv"
 
+        self.DATA_SOURCE = data_source
         self.CENSUS_USA_GEOJSON = (
             self.DATA_PATH / "census" / "geojson" / "us.json"
         )
@@ -37,6 +40,12 @@ class GeoScoreETL(ExtractTransformLoad):
         self.geojson_score_usa_low: gpd.GeoDataFrame
 
     def extract(self) -> None:
+        # check census data
+        check_census_data_source(
+            census_data_path=self.DATA_PATH / "census",
+            census_data_source=self.DATA_SOURCE,
+        )
+
         logger.info("Reading US GeoJSON (~6 minutes)")
         self.geojson_usa_df = gpd.read_file(
             self.CENSUS_USA_GEOJSON,

Modified: PostScoreETL

@@ -1,22 +1,10 @@
-import json
-import zipfile
 from pathlib import Path
 
 import pandas as pd
 
 from data_pipeline.etl.base import ExtractTransformLoad
-from data_pipeline.utils import get_module_logger, get_zip_info
+from data_pipeline.utils import get_module_logger, zip_files
 from . import constants
 
-## zlib is not available on all systems
-try:
-    import zlib  # noqa # pylint: disable=unused-import
-
-    compression = zipfile.ZIP_DEFLATED
-except (ImportError, AttributeError):
-    compression = zipfile.ZIP_STORED
-
 logger = get_module_logger(__name__)
@@ -268,11 +256,7 @@ class PostScoreETL(ExtractTransformLoad):
         logger.info("Compressing files")
         files_to_compress = [csv_path, excel_path, pdf_path]
-        with zipfile.ZipFile(zip_path, "w") as zf:
-            for f in files_to_compress:
-                zf.write(f, arcname=Path(f).name, compress_type=compression)
-
-        zip_info = get_zip_info(zip_path)
-        logger.info(json.dumps(zip_info, indent=4, sort_keys=True, default=str))
+        zip_files(zip_path, files_to_compress)
 
     def load(self) -> None:
         self._load_score_csv(

Modified: data_pipeline/etl/sources/census/etl_utils.py

@@ -1,5 +1,6 @@
 import csv
 import os
+import sys
 from pathlib import Path
 
 import pandas as pd
@@ -9,12 +10,14 @@ from data_pipeline.utils import (
     remove_all_dirs_from_dir,
     remove_files_from_dir,
     unzip_file_from_url,
+    zip_directory,
 )
 
 logger = get_module_logger(__name__)
 
 
 def reset_data_directories(data_path: Path) -> None:
+    """Empties all census folders"""
     census_data_path = data_path / "census"
 
     # csv
@@ -31,6 +34,7 @@ def reset_data_directories(data_path: Path) -> None:
 
 
 def get_state_fips_codes(data_path: Path) -> list:
+    """Returns a list with state data"""
     fips_csv_path = data_path / "census" / "csv" / "fips_states_2010.csv"
 
     # check if file exists
@@ -69,3 +73,50 @@ def get_state_information(data_path: Path) -> pd.DataFrame:
     df["fips"] = df["fips"].astype(str).apply(lambda x: x.zfill(2))
 
     return df
+
+
+def check_census_data_source(
+    census_data_path: Path, census_data_source: str
+) -> None:
+    """Checks if census data is present, and exits gracefully if it doesn't exist. It will download it from S3
+       if census_data_source is set to "aws"
+
+    Args:
+        census_data_path (str): Path for Census data
+        census_data_source (str): Source for the census data
+                                  Options:
+                                  - local: fetch census data from the local data directory
+                                  - aws: fetch census from AWS S3 J40 data repository
+
+    Returns:
+        None
+    """
+    CENSUS_DATA_S3_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/census.zip"
+    DATA_PATH = settings.APP_ROOT / "data"
+
+    # download from s3 if census_data_source is aws
+    if census_data_source == "aws":
+        logger.info("Fetching Census data from AWS S3")
+        unzip_file_from_url(
+            CENSUS_DATA_S3_URL,
+            DATA_PATH / "tmp",
+            DATA_PATH,
+        )
+    else:
+        # check if census data is found locally
+        if not os.path.isfile(census_data_path / "geojson" / "us.json"):
+            logger.info(
+                "No local census data found. Please use '-cds aws` to fetch from AWS"
+            )
+            sys.exit()
+
+
+def zip_census_data():
+    logger.info("Compressing and uploading census files to AWS S3")
+
+    CENSUS_DATA_PATH = settings.APP_ROOT / "data" / "census"
+    TMP_PATH = settings.APP_ROOT / "data" / "tmp"
+
+    # zip folder
+    zip_directory(CENSUS_DATA_PATH, TMP_PATH)
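A minimal usage sketch of the new helper, with paths mirroring those in the diff: passing "aws" downloads and unzips the zipped census archive from the J40 S3 bucket, while "local" only verifies that data/census/geojson/us.json exists and exits otherwise.

from data_pipeline.config import settings
from data_pipeline.etl.sources.census.etl_utils import check_census_data_source

DATA_PATH = settings.APP_ROOT / "data"

# Pull census.zip from S3 and unzip it under the data directory.
check_census_data_source(
    census_data_path=DATA_PATH / "census",
    census_data_source="aws",
)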

Modified: data_pipeline/utils.py

@@ -1,17 +1,27 @@
+from typing import List
 import datetime
+import json
 import logging
 import os
 import sys
 import shutil
 import zipfile
 from pathlib import Path
-import requests
 import urllib3
+import requests
 
 from data_pipeline.config import settings
 
+## zlib is not available on all systems
+try:
+    import zlib  # noqa # pylint: disable=unused-import
+
+    compression = zipfile.ZIP_DEFLATED
+except (ImportError, AttributeError):
+    compression = zipfile.ZIP_STORED
+
 
 def get_module_logger(module_name: str) -> logging.Logger:
     """Instantiates a logger object on stdout
@@ -219,6 +229,90 @@ def check_first_run() -> bool:
 
     return False
 
 
+def get_zip_info(archive_path: Path) -> list:
+    """
+    Returns information about a provided archive
+
+    Args:
+        archive_path (pathlib.Path): Path of the archive to be inspected
+
+    Returns:
+        a list of information about every file in the zipfile
+    """
+    zf = zipfile.ZipFile(archive_path)
+    info_list = []
+    for info in zf.infolist():
+        info_dict = {}
+        info_dict["Filename"] = info.filename
+        info_dict["Comment"] = info.comment.decode("utf8")
+        info_dict["Modified"] = datetime.datetime(*info.date_time).isoformat()
+        info_dict["System"] = f"{info.create_system} (0 = Windows, 3 = Unix)"
+        info_dict["ZIP version"] = info.create_version
+        info_dict["Compressed"] = f"{info.compress_size} bytes"
+        info_dict["Uncompressed"] = f"{info.file_size} bytes"
+        info_list.append(info_dict)
+
+    return info_list
+
+
+def zip_files(zip_file_path: Path, files_to_compress: List[Path]):
+    """
+    Zips a list of files in a path
+
+    Args:
+        zip_file_path (pathlib.Path): Path of the zip file where files are compressed
+
+    Returns:
+        None
+    """
+    with zipfile.ZipFile(zip_file_path, "w") as zf:
+        for f in files_to_compress:
+            zf.write(f, arcname=Path(f).name, compress_type=compression)
+    zip_info = get_zip_info(zip_file_path)
+    logger.info(json.dumps(zip_info, indent=4, sort_keys=True, default=str))
+
+
+def zip_directory(
+    origin_zip_directory: Path, destination_zip_directory: Path
+) -> None:
+    """
+    Zips a whole directory
+
+    Args:
+        path (pathlib.Path): Path of the directory to be archived
+
+    Returns:
+        None
+    """
+
+    def zipdir(origin_directory: Path, ziph: zipfile.ZipFile):
+        for root, dirs, files in os.walk(origin_directory):
+            for file in files:
+                logger.info(f"Compressing file: {file}")
+                ziph.write(
+                    os.path.join(root, file),
+                    os.path.relpath(
+                        os.path.join(root, file),
+                        os.path.join(origin_directory, ".."),
+                    ),
+                    compress_type=compression,
+                )
+
+    logger.info(f"Compressing {Path(origin_zip_directory).name} directory")
+    zip_file_name = f"{Path(origin_zip_directory).name}.zip"
+
+    # start archiving
+    zipf = zipfile.ZipFile(
+        destination_zip_directory / zip_file_name, "w", zipfile.ZIP_DEFLATED
+    )
+    zipdir(f"{origin_zip_directory}/", zipf)
+    zipf.close()
+
+    logger.info(
+        f"Completed compression of {Path(origin_zip_directory).name} directory"
+    )
+
+
 def get_excel_column_name(index: int) -> str:
     """Map a numeric index to the appropriate column in Excel. E.g., column #95 is "CR".
 
     Only works for the first 1000 columns.
@@ -1232,29 +1326,3 @@ def get_excel_column_name(index: int) -> str:
     ]
 
     return excel_column_names[index]
-
-
-def get_zip_info(archive_path: Path) -> list:
-    """
-    Returns information about a provided archive
-
-    Args:
-        archive_path (pathlib.Path): Path of the archive to be inspected
-
-    Returns:
-        a list of information about every file in the zipfile
-    """
-    zf = zipfile.ZipFile(archive_path)
-    info_list = []
-    for info in zf.infolist():
-        info_dict = {}
-        info_dict["Filename"] = info.filename
-        info_dict["Comment"] = info.comment.decode("utf8")
-        info_dict["Modified"] = datetime.datetime(*info.date_time).isoformat()
-        info_dict["System"] = f"{info.create_system} (0 = Windows, 3 = Unix)"
-        info_dict["ZIP version"] = info.create_version
-        info_dict["Compressed"] = f"{info.compress_size} bytes"
-        info_dict["Uncompressed"] = f"{info.file_size} bytes"
-        info_list.append(info_dict)
-
-    return info_list
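A short usage sketch of the relocated zip helpers; the archive and file names below are illustrative, not taken from the pipeline:

from pathlib import Path

from data_pipeline.utils import zip_directory, zip_files

tmp = Path("data_pipeline/data/tmp")

# Bundle individual files into one archive; zip_files also logs get_zip_info()
# metadata for the archive it writes.
zip_files(tmp / "downloadable.zip", [tmp / "usa.csv", tmp / "usa.xlsx"])

# Archive a whole directory: writes <directory name>.zip into the destination,
# e.g. data/census -> data/tmp/census.zip, which is what zip_census_data() does.
zip_directory(Path("data_pipeline/data/census"), tmp)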