DynamoDB Streams Processor

The DynamoDB Streams Processor is a simple tool that makes working with DynamoDB Streams super easy. It uses the DynamoDB Converter utility to unmarshall the items, plus does some other cool magic (like unwrapping Set s and performing diffs) so you don't need to.

It's not really magic... 🧙

Take a look at the code (index.js) and you'll see that it's not an overly complex script. However, I found myself writing the same thing over and over again for different projects, so I decided to create something that made my life easier. If you find it useful for your projects, that's cool too.

Installation

Via npm or yarn:

npm i dynamodb-streams-processor

yarn add dynamodb-streams-processor

Usage

The dynamodb-streams-processor takes the Records array from a DynamoDB Streams event and returns an array with the Keys , NewImage and OldImage objects unmarshalled to native JavaScript types. By default, it will also unwrap Set s and return their values as an array.

const processor = require ( 'dynamodb-streams-processor' ) exports.handler = async event => { let records = processor(event.Records) }

This will convert something like this:

[ { "eventID" : "fe98111ed30435a4e546c0ecdc2f68f7" , "eventName" : "MODIFY" , "eventVersion" : "1.1" , "eventSource" : "aws:dynamodb" , "awsRegion" : "us-east-1" , "dynamodb" : { "ApproximateCreationDateTime" : 1576516897 , "Keys" : { "sk" : { "S" : "sortKey" }, "pk" : { "S" : "partitionKey" } }, "NewImage" : { "sk" : { "S" : "sortKey" }, "pk" : { "S" : "partitionKey" }, "Boolean" : { "BOOL" : false }, "List" : { "L" : [ { "S" : "Cats" }, { "S" : "Dogs" }, { "N" : "123" } ] }, "Map" : { "M" : { "Name" : { "S" : "Jane" }, "Age" : { "N" : "30" } } }, "IntegerNumber" : { "N" : "123" }, "String" : { "S" : "String Test" }, "StringSet" : { "SS" : [ "Test1" , "Test2" ] } }, "SequenceNumber" : "125319600000000012510796858" , "SizeBytes" : 884 , "StreamViewType" : "NEW_IMAGE" }, "eventSourceARN" : "arn:aws:dynamodb:us-east-1:1234567890:table/my-table/stream/2019-12-16T00:00:00.000" } ]

Into this:

[ { "eventID" : "fe98111ed30435a4e546c0ecdc2f68f7" , "eventName" : "MODIFY" , "eventVersion" : "1.1" , "eventSource" : "aws:dynamodb" , "awsRegion" : "us-east-1" , "dynamodb" : { "ApproximateCreationDateTime" : 1576516897 , "Keys" : { "sk" : "sortKey" , "pk" : "partitionKey" }, "NewImage" : { "sk" : "sortKey" , "pk" : "partitionKey" , "Boolean" : false "List" : [ "Cats" , "Dogs" , 123 ], "Map" : { "Name" : "Jane" , "Age" : 30 }, "IntegerNumber" : 123 , "String" : "String Test" , "StringSet" : [ "Test1" , "Test2" ] }, "SequenceNumber" : "125319600000000012510796858" , "SizeBytes" : 884 , "StreamViewType" : "NEW_IMAGE" }, "eventSourceARN" : "arn:aws:dynamodb:us-east-1:1234567890:table/my-table/stream/2019-12-16T00:00:00.000" } ]

Calculating diffs

If you are using the NEW_AND_OLD_IMAGES stream view type, then it's often useful to compare the OldImage and the NewImage . This library bakes in the deep-object-diff library to let you perform a number of supported diff operations.

Pass in true as the second parameter for a standard diff, or pass a string value of added , deleted , updated , or detailed for more specific diff operations. The diff will be returned as an object using the property name Diff under the dynamodb property.

Example truncated for clarity:

[ { ... "dynamodb" : { "ApproximateCreationDateTime" : 1576516897 , "Keys" : { "sk" : "sortKey" , "pk" : "partitionKey" }, "NewImage" : { "sk" : "sortKey" , "pk" : "partitionKey" , ... }, "OldImage" : { "sk" : "sortKey" , "pk" : "partitionKey" , ... }, "Diff" : { "add" : "added" , "Boolean" : false , "List" : { "1" : "Fish" }, "Map" : { "Age" : 35 } }, ... "StreamViewType" : "NEW_AND_OLD_IMAGES" }, "eventSourceARN" : "arn:aws:dynamodb:us-east-1:1234567890:table/my-table/stream/2019-12-16T00:00:00.000" } ]

Passing additional options

The DynamoDB Converter supports additional options when unmarshalling data. The only one that might makes sense to use, is the boolean wrapNumbers option. This will return numbers as a NumberValue object instead of converting them to native JavaScript numbers. This allows for the safe round-trip transport of numbers of arbitrary size.

To pass this option, send it an object as the third parameter:

exports.handler = async event => { let records = processor(event.Records, false , { wrapNumbers : true }) }

This library automatically unwraps Set s (which the unmarshaller does not do). If you would like to return wrapped Set s, pass { wrapSets: true } as the third parameter.

Contributions and Feedback

Contributions, ideas and bug reports are welcome and greatly appreciated. Please add issues for suggestions and bug reports or create a pull request. You can also contact me on Twitter: @jeremy_daly.