// j40-cejst-2/infrastructure/functions/detect-changes-for-worker/index.js

// Standard modules
const fs = require('fs');
const path = require('path');
const { DateTime } = require('luxon');
const logger = console;
// AWS APIs
const AWS = require('aws-sdk');
// Local modules
const util = require('./util');
const gdal = require('./gdal');
const s3 = require('./s3');
const ecs = require('./ecs');
/**
 * Entry point: routes the incoming event to the branch named by `event.action`.
 *
 * @param {object} event - Invocation payload. `event.action` selects the
 *   branch; for `ogr2ogr`, `event.command` supplies the arguments.
 * @returns {Promise<*>} Whatever the selected branch resolves to, or
 *   `undefined` when the action is not recognised.
 */
async function handler(event) {
  // Bundle the dependencies, environment and event into one options object
  const options = initialize(event);

  const { action } = event;

  // gdal and tippecanoe events are handed to ecs.executeRawCommand unchanged
  if (action === 'gdal' || action === 'tippecanoe') {
    return await ecs.executeRawCommand(options, event);
  }

  // ogr2ogr events carry only the arguments, so the program name is prepended
  if (action === 'ogr2ogr') {
    return await ecs.executeRawCommand(options, {
      ...event,
      command: ['ogr2ogr', ...event.command]
    });
  }

  // Combine USDS data with external data sources
  if (action === 'enrichment') {
    return await enrichDataWithUSDSAttributes(options, event);
  }

  // Anything else is logged and ignored
  logger.warn(`Unknown action ${event.action}. Exiting`);
}
/**
 * Run one ogr2ogr command for every (census, source) pair of S3 records.
 *
 * Source records are those returned by `s3.fetchUpdatedS3Objects` for the
 * computed cutoff; census records are those returned by `s3.fetchS3Objects`.
 * If either list is empty the function logs why and returns without running
 * anything. Commands are awaited one at a time, census-major.
 *
 * @param {object} options - Injected dependencies; reads `options.deps.logger`
 *   and `options.deps.local.{util, ecs, s3}`.
 * @param {object} event - Supplies the bucket names/prefixes and the `pre`,
 *   `post` and `command` templates.
 * @returns {Promise<undefined>} Resolves once every pair has been processed.
 */
async function enrichDataWithUSDSAttributes(options, event) {
  const { logger: log } = options.deps;
  const { util: helpers, ecs: runner, s3: storage } = options.deps.local;

  // Work out the cutoff used to select recently modified source objects
  const cutoffTime = helpers.getTimestampCutoff(options);
  log.info(`Cutoff time of ${cutoffTime}`);

  // Look up the source objects that need to be processed
  const { sourceBucketName, sourceBucketPrefix } = event;
  const changedSources = await storage.fetchUpdatedS3Objects(
    options,
    sourceBucketName,
    sourceBucketPrefix,
    cutoffTime
  );

  // Nothing was modified after the cutoff, so there is nothing to enrich
  if (changedSources.length === 0) {
    log.info(`There are no objects in s3://${sourceBucketName}/${sourceBucketPrefix} that have been modified after the cutoff date`);
    return;
  }

  // Look up the census datasets
  const { censusBucketName, censusBucketPrefix } = event;
  const censusObjects = await storage.fetchS3Objects(options, censusBucketName, censusBucketPrefix);

  // Without census datasets there is nothing to pair the sources with
  if (censusObjects.length === 0) {
    log.info(`There are no objects in s3://${censusBucketName}/${censusBucketPrefix}`);
    return;
  }

  // Turn every S3 record into substitution variables under the given prefix
  const toVariables = (records, prefix) =>
    records.map((record) => helpers.createSubstitutionVariablesFromS3Record(options, record, prefix));
  const censusVariableSets = toVariables(censusObjects, 'census');
  const sourceVariableSets = toVariables(changedSources, 'source');

  for (const censusVars of censusVariableSets) {
    for (const sourceVars of sourceVariableSets) {
      // Source variables are spread last, so they win on any shared key
      const merged = { ...censusVars, ...sourceVars };

      log.info(`Enriching ${merged['census.Key']} with ${merged['source.Key']}...`);

      // Fill in the pre, post and command templates for this pair
      const fill = (template) => helpers.applyVariableSubstitutionToArray(options, merged, template);
      const pre = fill(event.pre);
      const post = fill(event.post);
      const command = fill(event.command);

      await runner.executeRawCommand(options, {
        ...event,
        pre,
        command: ['ogr2ogr', ...command],
        post
      });
    }
  }
}
/**
 * Gather every dependency into a single object so it can be injected into
 * the functions that need it.
 *
 * @param {object} event - The incoming event; logged and passed through as-is.
 * @returns {{deps: object, env: object, event: object}} `deps` holds the
 *   standard modules, the logger, newly constructed `AWS.S3` and `AWS.ECS`
 *   clients, and the local helper modules under `deps.local`; `env` is
 *   `process.env`.
 */
function initialize(event) {
  logger.debug('event:', JSON.stringify(event, null, 2));

  // Local helper modules, grouped apart from the AWS clients of the same name
  const local = { ecs, gdal, s3, util };

  const deps = {
    DateTime,
    fs,
    logger,
    path,
    s3: new AWS.S3(),
    ecs: new AWS.ECS(),
    local
  };

  return { deps, env: process.env, event };
}
// Only the handler is exported; the other functions stay private to this module
module.exports = {
handler
};