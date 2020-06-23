openbase logo
openbase logo
CategoriesLeaderboard
kin

kinesis

by Michael Hart
1.2.2 (see all)

A Node.js stream implementation of Amazon's Kinesis

npm
GitHub
CDN

Overview

DocumentationTutorialsReviewsMaintenanceDependenciesVersionsAlternatives
Showing:

Popularity

Downloads/wk

1.6K

GitHub Stars

148

Maintenance

Last Commit

2yrs ago

Contributors

2

Package

Dependencies

5

License

MIT

Type Definitions

DefinitelyTyped

Tree-Shakeable

No?

Categories

Reviews

Be the first to rate

Readme

kinesis

Build Status

A Node.js stream implementation of Amazon's Kinesis.

Allows the consumer to pump data directly into (and out of) a Kinesis stream.

This makes it trivial to setup Kinesis as a logging sink with Bunyan, or any other logging library.

For setting up a local Kinesis instance (eg for testing), check out Kinesalite.

NB: API has changed from 0.x to 1.x

Example

var fs = require('fs'),
    Transform = require('stream').Transform,
    kinesis = require('kinesis'),
    KinesisStream = kinesis.KinesisStream

// Uses credentials from process.env by default

kinesis.listStreams({region: 'us-west-1'}, function(err, streams) {
  if (err) throw err

  console.log(streams)
  // ["http-logs", "click-logs"]
})


var kinesisSink = kinesis.stream('http-logs')
// OR new KinesisStream('http-logs')

fs.createReadStream('http.log').pipe(kinesisSink)


var kinesisSource = kinesis.stream({name: 'click-logs', oldest: true})

// Data is retrieved as Record objects, so let's transform into Buffers
var bufferify = new Transform({objectMode: true})
bufferify._transform = function(record, encoding, cb) {
  cb(null, record.Data)
}

kinesisSource.pipe(bufferify).pipe(fs.createWriteStream('click.log'))


// Create a new Kinesis stream using the raw API
kinesis.request('CreateStream', {StreamName: 'test', ShardCount: 2}, function(err) {
  if (err) throw err

  kinesis.request('DescribeStream', {StreamName: 'test'}, function(err, data) {
    if (err) throw err

    console.dir(data)
  })
})

API

kinesis.stream(options)

new KinesisStream(options)

Returns a readable and writable Node.js stream for the given Kinesis stream

options include:

  • region: a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default)
  • credentials: an object with accessKeyId/secretAccessKey properties (resolved from env, file or IAM by default)
  • shards: an array of shard IDs, or shard objects. If not provided, these will be fetched and cached.
  • oldest: if truthy, then will start at the oldest records (using TRIM_HORIZON) instead of the latest
  • writeConcurrency: how many parallel writes to allow (1 by default)
  • cacheSize: number of PartitionKey-to-SequenceNumber mappings to cache (1000 by default)
  • agent: HTTP agent used (uses Node.js defaults otherwise)
  • timeout: HTTP request timeout (uses Node.js defaults otherwise)
  • initialRetryMs: first pause before retrying under the default policy (50 by default)
  • maxRetries: max number of retries under the default policy (10 by default)
  • errorCodes: array of Node.js error codes to retry on (['EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'EMFILE'] by default)
  • errorNames: array of Kinesis exceptions to retry on (['ProvisionedThroughputExceededException', 'ThrottlingException'] by default)
  • retryPolicy: a function to implement a retry policy different from the default one

kinesis.listStreams([options], callback)

Calls the callback with an array of all stream names for the AWS account

kinesis.request(action, [data], [options], callback)

Makes a generic Kinesis request with the given action (eg, ListStreams) and data as the body.

options include:

  • region: a string, or (deprecated) object with AWS credentials, host, port, etc (resolved from env or file by default)
  • credentials: an object with accessKeyId/secretAccessKey properties (resolved from env, file or IAM by default)
  • agent: HTTP agent used (uses Node.js defaults otherwise)
  • timeout: HTTP request timeout (uses Node.js defaults otherwise)
  • initialRetryMs: first pause before retrying under the default policy (50 by default)
  • maxRetries: max number of retries under the default policy (10 by default)
  • errorCodes: array of Node.js error codes to retry on (['EADDRINFO', 'ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ENOTFOUND', 'EMFILE'] by default)
  • errorNames: array of Kinesis exceptions to retry on (['ProvisionedThroughputExceededException', 'ThrottlingException'] by default)
  • retryPolicy: a function to implement a retry policy different from the default one

Rate & Review

Great Documentation0
Easy to Use0
Performant0
Highly Customizable0
Bleeding Edge0
Responsive Maintainers0
Poor Documentation0
Hard to Use0
Slow0
Buggy0
Abandoned0
Unwelcoming Community0
100
No reviews found
Be the first to rate

Alternatives

No alternatives found

Tutorials

No tutorials found
Add a tutorial