j40-cejst-2/infrastructure/functions/detect-changes-for-worker/index.js
Lucas Scharenbroich 38fff9cea8
Fargate Serverless Workers for Census Data Enrichment and Tile Generation (#230)
* add basic infrastructure

* add cloudfront distribution

* WIP checkpoint

* add ecs cluster

* add conditions and route53 dns entry to cloudfront

* WIP checkin

* Added a raw execution mode for demo/testing

* Add pre-defined Task for ogr2ogr

* Tweak Task Definition name

* Mostly working except for logging error

* Add additional logging permissions

* Successfully executed ogr2ogr in Fargate. S3 permissions need to be addressed

* Add multipart permissions

* Add a few more actions

* Put IAM Policy on the correct resource

* Deploy lambda and update events

* fix iam permissions 🤦🏻‍♂️

* Add reference to Tippecanoe container

* Clean up to only use named actions

* Refactor resources to include support for tippecanoe

* Make a more interesting GDAL command

* Pull all ECS variables into environment file; successful test of running tippecanoe container

* Support pre/post commands

* Refactor codebase and enable linting

* Implement many-to-many enrichment between USDS CSV files and Census zipped shapefiles

* Change the GDAL image to one with the built-in drivers

* Add some additional fixes to support the enrichment use case

* Clean up old hello-world example

* Expand the README to include ways to execute the lambdas

* Validate scheduled lambda execution and then comment out

Co-authored-by: Tim Zwolak <timothypage@gmail.com>
2021-06-30 09:29:01 -04:00

130 lines
4.2 KiB
JavaScript

// Standard modules
const fs = require('fs');
const path = require('path');
const { DateTime } = require('luxon');
const logger = console;
// AWS APIs
const AWS = require('aws-sdk');
// Local modules
const util = require('./util');
const gdal = require('./gdal');
const s3 = require('./s3');
const ecs = require('./ecs');
/**
 * Entry point: builds the dependency/options bundle for this invocation and
 * routes the event to the routine registered for `event.action`.
 *
 * Supported actions:
 *  - `gdal`       – run `event.command` unchanged against the GDAL container.
 *  - `ogr2ogr`    – same, but with `'ogr2ogr'` prepended to `event.command`
 *                   (so `event.command` must be iterable; it is spread).
 *  - `tippecanoe` – forward the event unchanged to `ecs.executeRawCommand`
 *                   (container selection presumably happens there — confirm in ./ecs).
 *  - `enrichment` – combine USDS data with census datasets.
 *
 * Any other action logs a warning and resolves to `undefined`.
 *
 * @param {object} event - Invocation payload; `event.action` selects the routine.
 * @returns {Promise<*>} Whatever the selected routine resolves to, or `undefined`.
 */
async function handler(event) {
  const options = initialize(event);

  // A Map (rather than a plain object) ensures only these exact string keys
  // match; inherited names such as 'toString' fall through to the warning.
  const routes = new Map([
    ['gdal', () => ecs.executeRawCommand(options, event)],
    [
      'ogr2ogr',
      () => {
        const ogrPayload = { ...event, command: ['ogr2ogr', ...event.command] };
        return ecs.executeRawCommand(options, ogrPayload);
      },
    ],
    ['tippecanoe', () => ecs.executeRawCommand(options, event)],
    ['enrichment', () => enrichDataWithUSDSAttributes(options, event)],
  ]);

  const route = routes.get(event.action);
  if (route === undefined) {
    logger.warn(`Unknown action ${event.action}. Exiting`);
    return undefined;
  }

  return await route();
}
/**
 * Enrich recently-updated USDS source objects with every census dataset by
 * launching one ogr2ogr command (via `ecs.executeRawCommand`) per
 * (census, source) pair.
 *
 * All collaborators come from `options.deps` (logger) and `options.deps.local`
 * (util, ecs, s3), so they can be injected.
 *
 * @param {object} options - Options bundle built by `initialize`.
 * @param {object} event - Reads `sourceBucketName`, `sourceBucketPrefix`,
 *   `censusBucketName`, `censusBucketPrefix`, and the `pre`, `post` and
 *   `command` templates that receive variable substitution.
 * @returns {Promise<undefined>} Resolves after every launch has been awaited,
 *   or early when either bucket listing is empty.
 */
async function enrichDataWithUSDSAttributes(options, event) {
  const { deps } = options;
  const { logger } = deps;
  const { util, ecs, s3 } = deps.local;

  // Cutoff for source objects; the original note says it is derived from
  // event.age — NOTE(review): confirm in util.getTimestampCutoff.
  const cutoff = util.getTimestampCutoff(options);
  logger.info(`Cutoff time of ${cutoff}`);

  // Source objects modified after the cutoff are the work to do.
  const { sourceBucketName, sourceBucketPrefix } = event;
  const updatedSources = await s3.fetchUpdatedS3Objects(options, sourceBucketName, sourceBucketPrefix, cutoff);
  if (updatedSources.length === 0) {
    logger.info(`There are no objects in s3://${sourceBucketName}/${sourceBucketPrefix} that have been modified after the cutoff date`);
    return;
  }

  // Every census object is a candidate enrichment target.
  const { censusBucketName, censusBucketPrefix } = event;
  const censusObjects = await s3.fetchS3Objects(options, censusBucketName, censusBucketPrefix);
  if (censusObjects.length === 0) {
    logger.info(`There are no objects in s3://${censusBucketName}/${censusBucketPrefix}`);
    return;
  }

  // Convert each S3 record into template variables namespaced by its role.
  const toVariables = (role) => (record) => util.createSubstitutionVariablesFromS3Record(options, record, role);
  const censusVariables = censusObjects.map(toVariables('census'));
  const sourceVariables = updatedSources.map(toVariables('source'));

  // Cross product, census-major. Source variables are spread last, so they
  // win on any key collision.
  const pairings = censusVariables.flatMap(
    (censusVars) => sourceVariables.map((sourceVars) => ({ ...censusVars, ...sourceVars }))
  );

  // Launches are awaited one at a time (sequential, not parallel).
  for (const vars of pairings) {
    logger.info(`Enriching ${vars['census.Key']} with ${vars['source.Key']}...`);

    const substitute = (template) => util.applyVariableSubstitutionToArray(options, vars, template);
    const pre = substitute(event.pre);
    const post = substitute(event.post);
    const command = substitute(event.command);

    await ecs.executeRawCommand(options, {
      ...event,
      pre,
      command: ['ogr2ogr', ...command],
      post,
    });
  }
}
/**
* Wrap all dependencies in an object in order to inject as appropriate.
*/
/**
 * Assemble the options bundle passed to every action. External libraries,
 * AWS SDK clients and the project-local helper modules are gathered under
 * `deps` so that they can be injected.
 *
 * The event is logged at debug level as pretty-printed JSON.
 *
 * @param {object} event - The raw invocation event; stored on the result as-is.
 * @returns {{deps: object, env: object, event: object}} Dependencies,
 *   `process.env`, and the original event.
 */
function initialize(event) {
  const prettyEvent = JSON.stringify(event, null, 2);
  logger.debug('event:', prettyEvent);

  // Project-local helper modules (./ecs, ./gdal, ./s3, ./util).
  const local = { ecs, gdal, s3, util };

  // New SDK clients are constructed on every call (S3 first, then ECS).
  const awsClients = { s3: new AWS.S3(), ecs: new AWS.ECS() };

  const deps = {
    DateTime,
    fs,
    logger,
    path,
    ...awsClients,
    local,
  };

  return { deps, env: process.env, event };
}
// Public surface of this module: only `handler` is exposed; the helpers
// defined above stay module-private.
module.exports = { handler };