kinesis

A Node.js stream implementation of Amazon's Kinesis.

Allows the consumer to pump data directly into (and out of) a Kinesis stream.

This makes it trivial to setup Kinesis as a logging sink with Bunyan, or any other logging library.

For setting up a local Kinesis instance (eg for testing), check out Kinesalite.

NB: API has changed from 0.x to 1.x

Example

var fs = require ( 'fs' ), Transform = require ( 'stream' ).Transform, kinesis = require ( 'kinesis' ), KinesisStream = kinesis.KinesisStream kinesis.listStreams({ region : 'us-west-1' }, function ( err, streams ) { if (err) throw err console .log(streams) }) var kinesisSink = kinesis.stream( 'http-logs' ) fs.createReadStream( 'http.log' ).pipe(kinesisSink) var kinesisSource = kinesis.stream({ name : 'click-logs' , oldest : true }) var bufferify = new Transform({ objectMode : true }) bufferify._transform = function ( record, encoding, cb ) { cb( null , record.Data) } kinesisSource.pipe(bufferify).pipe(fs.createWriteStream( 'click.log' )) kinesis.request( 'CreateStream' , { StreamName : 'test' , ShardCount : 2 }, function ( err ) { if (err) throw err kinesis.request( 'DescribeStream' , { StreamName : 'test' }, function ( err, data ) { if (err) throw err console .dir(data) }) })

API

new KinesisStream(options)

Returns a readable and writable Node.js stream for the given Kinesis stream

options include:

region : a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default)

: a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default) credentials : an object with accessKeyId / secretAccessKey properties (resolved from env, file or IAM by default)

: an object with / properties (resolved from env, file or IAM by default) shards : an array of shard IDs, or shard objects. If not provided, these will be fetched and cached.

: an array of shard IDs, or shard objects. If not provided, these will be fetched and cached. oldest : if truthy, then will start at the oldest records (using TRIM_HORIZON ) instead of the latest

: if truthy, then will start at the oldest records (using ) instead of the latest writeConcurrency : how many parallel writes to allow ( 1 by default)

: how many parallel writes to allow ( by default) cacheSize : number of PartitionKey-to-SequenceNumber mappings to cache ( 1000 by default)

: number of PartitionKey-to-SequenceNumber mappings to cache ( by default) agent : HTTP agent used (uses Node.js defaults otherwise)

: HTTP agent used (uses Node.js defaults otherwise) timeout : HTTP request timeout (uses Node.js defaults otherwise)

: HTTP request timeout (uses Node.js defaults otherwise) initialRetryMs : first pause before retrying under the default policy ( 50 by default)

: first pause before retrying under the default policy ( by default) maxRetries : max number of retries under the default policy ( 10 by default)

: max number of retries under the default policy ( by default) errorCodes : array of Node.js error codes to retry on ( ['EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'EMFILE'] by default)

: array of Node.js error codes to retry on ( by default) errorNames : array of Kinesis exceptions to retry on ( ['ProvisionedThroughputExceededException', 'ThrottlingException'] by default)

: array of Kinesis exceptions to retry on ( by default) retryPolicy : a function to implement a retry policy different from the default one

Calls the callback with an array of all stream names for the AWS account

Makes a generic Kinesis request with the given action (eg, ListStreams ) and data as the body.

options include: