mirror of
https://github.com/DOI-DO/j40-cejst-2.git
synced 2025-08-02 21:04:18 -07:00
Fargate Serverless Workers for Census Data Enrichment and Tile Generation (#230)
* add basic infrastructure
* add cloudfront distribution
* WIP checkpoint
* add ecs cluster
* add conditions and route53 dns entry to cloudfront
* WIP checkin
* Added a raw execution mode for demo/testing
* Add pre-defined Task for ogr2ogr
* Tweak Task Definition name
* Mostly working except for logging error
* Add additional logging permissions
* Succesfully executed ogr2ogr in fargate. S3 permissions needs to be addresses
* Add multipart permissions
* Add a few more actions
* Put IAM Policy on the correct resource
* Deploy lambda and update events
* fix iam permissions 🤦🏻♂️
* Add reference to Tippecanoe container
* Clean up to only use named actions
* Refactor resources to include support for tippecanoe
* Make a more interesting GDAL command
* Pull all ECS variables into environment file; successful test of running tippecanoe container
* Support pre/post commands
* Refactor codebase and enable linting
* Implement many-to-many enrichment between USDS CSV files and Census zipped shapefiles
* Change the GDAL image to one with the built-in drivers
* Add some additional fixes to support the enrichment use case
* Clean up old hello-world example
* Expand the README to include ways to execute the lambdas
* Validate scheduled lambda execution and then comment out
Co-authored-by: Tim Zwolak <timothypage@gmail.com>
This commit is contained in:
parent
92efc5c937
commit
38fff9cea8
27 changed files with 7271 additions and 0 deletions
72
infrastructure/functions/detect-changes-for-worker/s3.js
Normal file
72
infrastructure/functions/detect-changes-for-worker/s3.js
Normal file
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Helper function to determine if we should interpret an S3 object as
|
||||
* a "simple" file or a folder.
|
||||
*/
|
||||
function isSimpleObject(c, prefix) {
|
||||
// If the object ends with a separator charater, interpret that as a folder
|
||||
if (c.Key.endsWith('/')) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the object is more deeply nested than the prefix, then ignore, e.g.
|
||||
// prefix = /foo/bar = two separators
|
||||
// c.Key = /foo/bar/baz = three separators [skip]
|
||||
// c.Key = /foo/bar.txt = two separators [pass]
|
||||
// This doesn't give the *exact* count, but all we really care about is that
|
||||
// the value is the same for the prefix and the S3 Key.
|
||||
const separatorCount = c.Key.split('/').length;
|
||||
const prefixSeparatorCount = prefix.split('/').length;
|
||||
|
||||
return separatorCount === prefixSeparatorCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all of the simple S3 objects from a prefix that have a LastModified
|
||||
* date after a cutoff date.
|
||||
*
|
||||
* This returns objects that have recently changed for re-processing.
|
||||
*/
|
||||
function fetchUpdatedS3Objects (options, bucket, prefix, cutoff) {
|
||||
// Define a filter function that only looks at object on a single level of the
|
||||
// bucket and removes any objects with a LastModified timestamp prior to the cutoff
|
||||
const threshold = cutoff.toMillis();
|
||||
const filterFunc = (c) => isSimpleObject(c, prefix) && (threshold < c.LastModified.getTime());
|
||||
|
||||
return fetchS3Objects(options, bucket, prefix, filterFunc);
|
||||
}
|
||||
|
||||
/**
|
||||
* Basic utility function to return S3 object from a bucket that match a given prefix. An
|
||||
* optional filtering function can be passed in.
|
||||
*/
|
||||
async function fetchS3Objects (options, bucket, prefix, filterFunc = () => true) {
|
||||
const { s3 } = options.deps;
|
||||
const objects = [];
|
||||
|
||||
// Limit the results to items in this bucket with a specific prefix
|
||||
const params = {
|
||||
Bucket: bucket,
|
||||
Prefix: prefix
|
||||
};
|
||||
|
||||
do {
|
||||
// Get all of the initial objects
|
||||
const response = await s3.listObjectsV2(params).promise();
|
||||
|
||||
// Optionally, filter out objects
|
||||
const contents = response.Contents.filter(filterFunc);
|
||||
objects.push(...contents);
|
||||
|
||||
params.ContinuationToken = response.IsTruncated
|
||||
? response.NextContinuationToken
|
||||
: null;
|
||||
} while (params.ContinuationToken);
|
||||
|
||||
return objects;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
isSimpleObject,
|
||||
fetchS3Objects,
|
||||
fetchUpdatedS3Objects
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue