date 2019-06-04

no-kafka is an Apache Kafka 0.9 client for Node.js with support for the new unified consumer API.

Supports sync and async Gzip and Snappy compression, producer batching and controllable retries, and offers a few predefined group assignment strategies and a producer partitioner option.

All methods return a promise.

Please check the CHANGELOG for backward-incompatible changes in version 3.x.

Using

Download and install Kafka.

Create your test topic:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic kafka-test-topic --partitions 3 --replication-factor 1

Install no-kafka:

npm install no-kafka

Producer

Example:

var Kafka = require('no-kafka');
var producer = new Kafka.Producer();

return producer.init().then(function () {
    return producer.send({
        topic: 'kafka-test-topic',
        partition: 0,
        message: {
            value: 'Hello!'
        }
    });
})
.then(function (result) {
    // handle the send result here
});

Send, and retry on failure with a progressive delay between 100ms and 300ms (2 attempts in total):

return producer.send(messages, {
    retries: {
        attempts: 2,
        delay: { min: 100, max: 300 }
    }
});
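The Producer options list describes the retry delay as progressive: it grows by min with each attempt, up to but not exceeding max. A minimal sketch of that schedule, assuming the n-th retry waits Math.min(n * min, max) milliseconds; retryDelays is a hypothetical helper, not part of no-kafka:

```javascript
// Hypothetical helper illustrating the documented progressive retry delay.
// Assumption: the n-th retry (1-based) waits Math.min(n * min, max) ms.
function retryDelays(retries) {
  const { attempts, delay } = retries;
  const delays = [];
  // With `attempts` total attempts there are `attempts - 1` waits between them.
  for (let n = 1; n < attempts; n++) {
    delays.push(Math.min(n * delay.min, delay.max));
  }
  return delays;
}

// The example above: 2 attempts, so a single wait before the one retry.
console.log(retryDelays({ attempts: 2, delay: { min: 100, max: 300 } }));
// The documented defaults: 3 attempts, min 1000ms, max 3000ms.
console.log(retryDelays({ attempts: 3, delay: { min: 1000, max: 3000 } }));
```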

Batching (grouping) produce requests

Accumulate messages into a single batch until their total size is >= 1024 bytes or the 100ms timeout expires (these options override the Producer constructor options):

producer.send(messages, { batch: { size: 1024, maxWait: 100 } });
producer.send(messages, { batch: { size: 1024, maxWait: 100 } });

Please note that if you pass different options to the send() method, these messages will be grouped into separate batches:

producer.send(messages, { batch: { size: 1024, maxWait: 100 }, codec: Kafka.COMPRESSION_GZIP });
producer.send(messages, { batch: { size: 1024, maxWait: 100 }, codec: Kafka.COMPRESSION_SNAPPY });
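The grouping rule can be made concrete with a small size/time batcher. This is a minimal sketch, assuming messages sent with identical option objects share a batch and that batch size is the sum of the byte lengths of each message value; Batcher and its methods are hypothetical and do not mirror no-kafka's internal implementation:

```javascript
// Hypothetical size/time batcher; not no-kafka's internal code.
class Batcher {
  constructor(flush) {
    this.flush = flush;        // called with (options, messages)
    this.batches = new Map();  // options key -> { options, messages, bytes, timer }
  }

  add(message, options) {
    // Messages sent with different options never share a batch.
    const key = JSON.stringify(options);
    let batch = this.batches.get(key);
    if (!batch) {
      batch = { options, messages: [], bytes: 0, timer: null };
      this.batches.set(key, batch);
      // The maxWait timer starts when the batch is created.
      batch.timer = setTimeout(() => this.send(key), options.batch.maxWait);
    }
    batch.messages.push(message);
    batch.bytes += Buffer.byteLength(message.value);
    // Flush early once the accumulated size reaches batch.size.
    if (batch.bytes >= options.batch.size) {
      this.send(key);
    }
  }

  send(key) {
    const batch = this.batches.get(key);
    if (!batch) return;
    clearTimeout(batch.timer);
    this.batches.delete(key);
    this.flush(batch.options, batch.messages);
  }
}

const batcher = new Batcher((options, messages) => {
  console.log('flush', options.codec, messages.length);
});
const gzip = { batch: { size: 10, maxWait: 100 }, codec: 'gzip' };
const snappy = { batch: { size: 10, maxWait: 100 }, codec: 'snappy' };
batcher.add({ value: 'hello' }, gzip);
batcher.add({ value: 'world' }, gzip);  // reaches 10 bytes, flushed immediately
batcher.add({ value: 'hi' }, snappy);   // separate batch, flushed after 100ms
```

Keying batches on the serialized options is what keeps the Gzip and Snappy sends apart; it assumes option objects are built with the same property order.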

Keyed Messages

Send a message with a key:

producer.send({
    topic: 'kafka-test-topic',
    partition: 0,
    message: {
        key: 'some-key',
        value: 'Hello!'
    }
});

Custom Partitioner

Example: override the default partitioner with a custom partitioner that only uses a portion of the key.

var util = require('util');
var Kafka = require('no-kafka');
var Producer = Kafka.Producer;
var DefaultPartitioner = Kafka.DefaultPartitioner;

function MyPartitioner() {
    DefaultPartitioner.apply(this, arguments);
}

util.inherits(MyPartitioner, DefaultPartitioner);

// Use only the part of the key before the first '-', e.g. 'namespace' for 'namespace-key'
MyPartitioner.prototype.getKey = function getKey(message) {
    return message.key.split('-')[0];
};

var producer = new Producer({
    partitioner: new MyPartitioner()
});

return producer.init().then(function () {
    return producer.send({
        topic: 'kafka-test-topic',
        message: {
            key: 'namespace-key',
            value: 'Hello!'
        }
    });
});
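A partitioner ultimately has to turn the key returned by getKey into one of the topic's partitions. The sketch below shows one common approach, hashing the key and taking it modulo the partition count, so every key with the same namespace prefix lands on the same partition. The FNV-1a hash, the pickPartition helper and the assumption that partitions is an array of partition ids are all illustrative; DefaultPartitioner may hash differently and may pass partition metadata objects instead:

```javascript
// Hypothetical illustration of key-based partitioning; not DefaultPartitioner's code.

// 32-bit FNV-1a hash over the UTF-8 bytes of a string.
function fnv1a(str) {
  let hash = 0x811c9dc5;
  for (const byte of Buffer.from(str, 'utf8')) {
    hash ^= byte;
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Same rule as MyPartitioner.getKey: keep only the part before the first '-'.
function getKey(message) {
  return message.key.split('-')[0];
}

// Assumption: partitions is an array of partition ids, e.g. [0, 1, 2].
function pickPartition(partitions, message) {
  const key = getKey(message);
  return partitions[fnv1a(key) % partitions.length];
}

const partitions = [0, 1, 2];
console.log(pickPartition(partitions, { key: 'namespace-key', value: 'Hello!' }));
console.log(pickPartition(partitions, { key: 'namespace-other', value: 'Hi!' }));
```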

Producer options:

requiredAcks - require acknowledgments for produce requests. If it is 0, the server will not send any response. If it is 1 (default), the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1, the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).
timeout - timeout in ms for produce requests
clientId - ID of this client, defaults to 'no-kafka-client'
connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
reconnectionDelay - controls the optionally progressive delay between reconnection attempts in case of network error:
    min - minimum delay, used as the increment value for subsequent attempts, defaults to 1000ms
    max - maximum delay value, defaults to 1000ms
partitioner - class instance used to determine the topic partition for a message. If the message already specifies a partition, the partitioner won't be used. The partitioner must inherit from Kafka.DefaultPartitioner. Its partition method receives 3 arguments: the topic name, an array with the topic's partitions, and the message (useful to partition by key, etc.). partition can be sync or async (return a Promise).
retries - controls the number of attempts and the delay between them when a produce request fails:
    attempts - total number of attempts to send the message, defaults to 3
    delay - controls the delay between retries. The delay is progressive: it is incremented with each attempt in steps of min, up to but not exceeding max:
        min - minimum delay, used as the increment value for subsequent attempts, defaults to 1000ms
        max - maximum delay value, defaults to 3000ms
codec - compression codec, one of Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_SNAPPY, Kafka.COMPRESSION_GZIP
batch - controls batching (grouping) of requests:
    size - group messages together into a single batch until their total size exceeds this value, defaults to 16384 bytes. Set to 0 to disable batching.
    maxWait - send grouped messages after this many milliseconds even if their total size doesn't exceed batch.size yet, defaults to 10ms. Set to 0 to disable batching.
asyncCompression - boolean, use asynchronous compression instead of synchronous, defaults to false
connectionTimeout - timeout for establishing a connection to Kafka in milliseconds, defaults to 3000ms
socketTimeout - timeout for the Kafka connection socket in milliseconds, defaults to 0 (disabled)

SimpleConsumer

Manually specify topic, partition and offset when subscribing. Suitable for simple use cases.

Example:

var consumer = new Kafka.SimpleConsumer();

var dataHandler = function (messageSet, topic, partition) {
    messageSet.forEach(function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
    });
};

return consumer.init().then(function () {
    // subscribe to partitions 0 and 1 of the topic
    return consumer.subscribe('kafka-test-topic', [0, 1], dataHandler);
});

Subscribe (or change an existing subscription) to a specific offset and limit the maximum received MessageSet size:

consumer.subscribe('kafka-test-topic', 0, { offset: 20, maxBytes: 30 }, dataHandler);

Subscribe to the latest or earliest offsets in the topic/partition:

consumer.subscribe('kafka-test-topic', 0, { time: Kafka.LATEST_OFFSET }, dataHandler);
consumer.subscribe('kafka-test-topic', 0, { time: Kafka.EARLIEST_OFFSET }, dataHandler);

Subscribe to all partitions in a topic:

consumer.subscribe( 'kafka-test-topic' , dataHandler)

Commit offset(s) (V0, Kafka saves these commits to ZooKeeper):

consumer.commitOffset([
    { topic: 'kafka-test-topic', partition: 0, offset: 1 },
    { topic: 'kafka-test-topic', partition: 1, offset: 2 }
]);

Fetch committed offset(s):

consumer.fetchOffset([
    { topic: 'kafka-test-topic', partition: 0 },
    { topic: 'kafka-test-topic', partition: 1 }
]).then(function (result) {
    // inspect the fetched offsets here
});
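A SimpleConsumer handler often needs to remember how far it got in each partition before calling commitOffset. A minimal sketch, assuming you commit the offset of the last processed message (as the GroupConsumer example in this README does); OffsetTracker is a hypothetical helper that builds the [{ topic, partition, offset }] array shape that commitOffset accepts:

```javascript
// Hypothetical helper: remembers the highest processed offset per topic/partition
// and builds the [{ topic, partition, offset }] array shape used by commitOffset.
class OffsetTracker {
  constructor() {
    this.offsets = new Map(); // "topic:partition" -> { topic, partition, offset }
  }

  // Call from a dataHandler(messageSet, topic, partition) after processing.
  track(messageSet, topic, partition) {
    const key = topic + ':' + partition;
    for (const m of messageSet) {
      const current = this.offsets.get(key);
      if (!current || m.offset > current.offset) {
        this.offsets.set(key, { topic, partition, offset: m.offset });
      }
    }
  }

  // Returns the pending commits and forgets them.
  drain() {
    const commits = Array.from(this.offsets.values());
    this.offsets.clear();
    return commits;
  }
}

const tracker = new OffsetTracker();
tracker.track([{ offset: 5 }, { offset: 6 }], 'kafka-test-topic', 0);
tracker.track([{ offset: 2 }], 'kafka-test-topic', 1);
// With a real consumer this would be: consumer.commitOffset(tracker.drain());
console.log(tracker.drain());
```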

SimpleConsumer options

groupId - group ID for committing and fetching offsets, defaults to 'no-kafka-group-v0'
maxWaitTime - maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the fetch request is issued, defaults to 100ms
idleTimeout - timeout between fetch calls, defaults to 1000ms
minBytes - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
maxBytes - maximum size of messages in a fetch response, defaults to 1MB
clientId - ID of this client, defaults to 'no-kafka-client'
connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
reconnectionDelay - controls the optionally progressive delay between reconnection attempts in case of network error:
    min - minimum delay, used as the increment value for subsequent attempts, defaults to 1000ms
    max - maximum delay value, defaults to 1000ms
recoveryOffset - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
handlerConcurrency - concurrency level for the consumer handler function, defaults to 10
connectionTimeout - timeout for establishing a connection to Kafka in milliseconds, defaults to 3000ms
socketTimeout - timeout for the Kafka connection socket in milliseconds, defaults to 0 (disabled)

GroupConsumer (new unified consumer API)

Specify an assignment strategy (or use one of no-kafka's built-in consistent or round robin assignment strategies) and subscribe by specifying only topics. The elected group leader will automatically assign partitions among all group members.

Example:

var Promise = require('bluebird');
var consumer = new Kafka.GroupConsumer();

var dataHandler = function (messageSet, topic, partition) {
    return Promise.each(messageSet, function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        // commit the offset of each processed message
        return consumer.commitOffset({ topic: topic, partition: partition, offset: m.offset, metadata: 'optional' });
    });
};

var strategies = [{
    subscriptions: ['kafka-test-topic'],
    handler: dataHandler
}];

consumer.init(strategies);

Assignment strategies

no-kafka provides three built-in strategies:

Kafka.WeightedRoundRobinAssignmentStrategy - weighted round robin assignment (based on wrr-pool).
Kafka.ConsistentAssignmentStrategy - based on a consistent hash ring, so it provides consistent assignment across consumers in a group based on the supplied metadata.id and metadata.weight options.
Kafka.DefaultAssignmentStrategy - simple round robin assignment strategy (default).
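For intuition, plain round-robin assignment deals partitions out to group members in turn. The sketch below illustrates that idea only; it is not the code of Kafka.DefaultAssignmentStrategy, whose ordering rules may differ, and roundRobinAssign and the member IDs are hypothetical:

```javascript
// Hypothetical round-robin assignment: deals partitions to members in turn.
// members: array of member ids; partitions: array of { topic, partition }.
function roundRobinAssign(members, partitions) {
  // Sort members so every group member would compute the same result.
  const sorted = [...members].sort();
  const assignment = {};
  sorted.forEach((id) => { assignment[id] = []; });
  partitions.forEach((p, i) => {
    assignment[sorted[i % sorted.length]].push(p);
  });
  return assignment;
}

const partitions = [0, 1, 2].map((partition) => ({ topic: 'kafka-test-topic', partition }));
console.log(roundRobinAssign(['consumer-b', 'consumer-a'], partitions));
```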

Using Kafka.WeightedRoundRobinAssignmentStrategy :

var strategies = { subscriptions : [ 'kafka-test-topic' ], metadata : { weight : 4 }, strategy : new Kafka.WeightedRoundRobinAssignmentStrategy(), handler : dataHandler };
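A weight such as metadata.weight: 4 means that member should receive proportionally more partitions. One well-known way to realise this is smooth weighted round robin, sketched below; it is an illustrative assumption and may not match the algorithm wrr-pool actually uses. weightedAssign is hypothetical:

```javascript
// Hypothetical smooth weighted round-robin assignment (nginx-style),
// shown for intuition only; wrr-pool may pick members differently.
// members: array of { id, weight }; partitions: array of partition ids.
function weightedAssign(members, partitions) {
  const total = members.reduce((sum, m) => sum + m.weight, 0);
  const current = members.map(() => 0);
  const assignment = {};
  members.forEach((m) => { assignment[m.id] = []; });

  for (const p of partitions) {
    // Every member gains its weight; the highest running total wins this partition.
    let best = 0;
    for (let i = 0; i < members.length; i++) {
      current[i] += members[i].weight;
      if (current[i] > current[best]) best = i;
    }
    // The winner pays back the total weight, which spreads its turns out evenly.
    current[best] -= total;
    assignment[members[best].id].push(p);
  }
  return assignment;
}

console.log(weightedAssign(
  [{ id: 'consumer_1', weight: 4 }, { id: 'consumer_2', weight: 1 }],
  [0, 1, 2, 3, 4]
));
```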

Using Kafka.ConsistentAssignmentStrategy :

var strategies = { subscriptions : [ 'kafka-test-topic' ], metadata : { id : process.argv[ 2 ] || 'consumer_1' , weight : 50 }, strategy : new Kafka.ConsistentAssignmentStrategy(), handler : dataHandler };

Note that each consumer in a group should have its own unique metadata.id, and that id should stay the same across restarts.
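The benefit of a consistent hash ring is that when a consumer joins or leaves, only the partitions that hashed to that consumer move. A minimal sketch, assuming MD5-based positions on the ring and weight virtual nodes per consumer; buildRing and ownerOf are hypothetical and are not the actual code of ConsistentAssignmentStrategy:

```javascript
const crypto = require('crypto');

// 32-bit position on the ring derived from an MD5 digest.
function hash(str) {
  return crypto.createHash('md5').update(str).digest().readUInt32BE(0);
}

// members: array of { id, weight }. Each member gets `weight` virtual nodes.
function buildRing(members) {
  const ring = [];
  for (const m of members) {
    for (let v = 0; v < m.weight; v++) {
      ring.push({ point: hash(m.id + '#' + v), id: m.id });
    }
  }
  return ring.sort((a, b) => a.point - b.point);
}

// A partition belongs to the first virtual node at or after its hash, wrapping around.
function ownerOf(ring, topic, partition) {
  const point = hash(topic + ':' + partition);
  const node = ring.find((n) => n.point >= point) || ring[0];
  return node.id;
}

const ring = buildRing([
  { id: 'consumer_1', weight: 50 },
  { id: 'consumer_2', weight: 50 },
]);
for (let p = 0; p < 3; p++) {
  console.log(p, ownerOf(ring, 'kafka-test-topic', p));
}
```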

You can also write your own assignment strategy by inheriting from Kafka.DefaultAssignmentStrategy and overriding the assignment method.

GroupConsumer options

groupId - group ID for committing and fetching offsets, defaults to 'no-kafka-group-v0.9'
maxWaitTime - maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the fetch request is issued, defaults to 100ms
idleTimeout - timeout between fetch calls, defaults to 1000ms
minBytes - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
maxBytes - maximum size of messages in a fetch response
clientId - ID of this client, defaults to 'no-kafka-client'
connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
reconnectionDelay - controls the optionally progressive delay between reconnection attempts in case of network error:
    min - minimum delay, used as the increment value for subsequent attempts, defaults to 1000ms
    max - maximum delay value, defaults to 1000ms
sessionTimeout - session timeout in ms, min 6000, max 30000, defaults to 15000
heartbeatTimeout - delay between heartbeat requests in ms, defaults to 1000
retentionTime - offset retention time in ms, defaults to 1 day (24 * 3600 * 1000)
startingOffset - starting position (time) when there is no committed offset, defaults to Kafka.LATEST_OFFSET
recoveryOffset - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
handlerConcurrency - concurrency level for the consumer handler function, defaults to 10
connectionTimeout - timeout for establishing a connection to Kafka in milliseconds, defaults to 3000ms
socketTimeout - timeout for the Kafka connection socket in milliseconds, defaults to 0 (disabled)

GroupAdmin (consumer groups API)

Offers methods:

listGroups - list existing consumer groups
describeGroup - describe an existing group by its id
fetchConsumerLag - fetch consumer lag for topics/partitions

listGroups, describeGroup:

var admin = new Kafka.GroupAdmin();

return admin.init().then(function () {
    return admin.listGroups().then(function (groups) {
        // inspect the list of groups here
        return admin.describeGroup('no-kafka-admin-test-group').then(function (group) {
            // inspect the group description here
        });
    });
});

fetchConsumerLag:

var admin = new Kafka.GroupAdmin();

return admin.init().then(function () {
    return admin.fetchConsumerLag('no-kafka-admin-test-group', [{
        topicName: 'kafka-test-topic',
        partitions: [0, 1, 2]
    }]).then(function (consumerLag) {
        // inspect consumerLag here
    });
});

Note that the group consumer has to commit offsets first in order for consumer lag to be available. Otherwise the offset will be set to -1.
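Consumer lag for a partition is the distance between the partition's latest (high watermark) offset and the group's committed offset. A sketch of that arithmetic, assuming a committed offset of -1 means nothing has been committed yet; the field names and the lagFor helper are hypothetical and do not describe the shape fetchConsumerLag actually returns:

```javascript
// Hypothetical lag calculation for one partition.
// highwaterOffset: offset of the next message to be written to the partition.
// committedOffset: offset committed by the group, or -1 if nothing was committed.
function lagFor({ highwaterOffset, committedOffset }) {
  if (committedOffset === -1) {
    return null; // no commit yet, so lag cannot be computed
  }
  // Clamp at 0 in case the committed offset is ahead of a stale high watermark.
  return Math.max(0, highwaterOffset - committedOffset);
}

console.log(lagFor({ highwaterOffset: 120, committedOffset: 100 }));
console.log(lagFor({ highwaterOffset: 120, committedOffset: -1 }));
```

Whether the result is off by one depends on whether the committed offset names the last processed message or the next one to read.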

Compression

no-kafka supports both SNAPPY and Gzip compression. To use SNAPPY you must install the snappy NPM module in your project.

Enable compression in Producer:

var Kafka = require ( 'no-kafka' ); var producer = new Kafka.Producer({ clientId : 'producer' , codec : Kafka.COMPRESSION_SNAPPY });

Alternatively, just send some messages with a specified compression codec (this overrides the codec set in the constructor):

return producer.send({ topic : 'kafka-test-topic' , partition : 0 , message : { value : 'p00' } }, { codec : Kafka.COMPRESSION_SNAPPY })

By default no-kafka will use asynchronous compression and decompression. Disable async compression/decompression (and use sync) with the asyncCompression option (synchronous Gzip is not available in node < 0.11):

Producer:

var producer = new Kafka.Producer({ clientId : 'producer' , asyncCompression : false , codec : Kafka.COMPRESSION_SNAPPY });

Consumer:

var consumer = new Kafka.SimpleConsumer({ idleTimeout : 100 , clientId : 'simple-consumer' , asyncCompression : true });
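The trade-off between the two modes mirrors Node's own zlib API: the synchronous call blocks the event loop while it compresses, whereas the callback version runs on the libuv thread pool. A standalone sketch using only Node's built-in zlib module (no-kafka is not involved here):

```javascript
const zlib = require('zlib');

const payload = Buffer.from('Hello! '.repeat(1000));

// Synchronous: blocks the event loop until compression finishes.
const syncCompressed = zlib.gzipSync(payload);
console.log('sync gzip bytes:', syncCompressed.length);

// Asynchronous: compression runs off the main thread; the result arrives in a callback.
zlib.gzip(payload, (err, asyncCompressed) => {
  if (err) throw err;
  console.log('async gzip bytes:', asyncCompressed.length);
  // Either variant decompresses back to the original payload.
  console.log(zlib.gunzipSync(asyncCompressed).equals(payload));
});
```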

Connection

Initial Brokers

no-kafka will connect to the hosts specified in the connectionString constructor option. If it is omitted, it will use the KAFKA_URL environment variable or fall back to the default kafka://127.0.0.1:9092. For better availability, always specify several initial brokers: 10.0.1.1:9092,10.0.1.2:9092,10.0.1.3:9092. The kafka:// prefix is optional.
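How such a connection string breaks down into individual brokers can be sketched as follows, assuming entries are separated by commas, may carry an optional kafka:// prefix, and default to port 9092 when no port is given (that default is an assumption). parseBrokers is a hypothetical helper, not no-kafka's parser:

```javascript
// Hypothetical parser for a comma-delimited broker list with optional kafka:// prefixes.
function parseBrokers(connectionString) {
  // Follow the documented fallback order: explicit option, KAFKA_URL, then the default.
  const source = connectionString || process.env.KAFKA_URL || 'kafka://127.0.0.1:9092';
  return source
    .split(',')
    .map((entry) => entry.trim().replace(/^kafka:\/\//, ''))
    .filter((entry) => entry.length > 0)
    .map((entry) => {
      const [host, port] = entry.split(':');
      // Assumption: fall back to the standard Kafka port when none is given.
      return { host, port: port === undefined ? 9092 : parseInt(port, 10) };
    });
}

console.log(parseBrokers('10.0.1.1:9092,kafka://10.0.1.2:9092, 10.0.1.3:9092'));
```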

Disconnect / Timeout Handling

All network errors are handled by the library: the producer will retry sending failed messages for the configured number of times, and the simple consumer and group consumer will try to reconnect to the failed host, update metadata as needed, and so on.

SSL

To connect to Kafka with an SSL endpoint enabled, specify the SSL certificate and key options, either as file paths to load the cert/key from or directly as strings:

Loading certificate and key from file:

var producer = new Kafka.Producer({ connectionString : 'kafka://127.0.0.1:9093' , ssl : { cert : '/path/to/client.crt' , key : '/path/to/client.key' } });

Specifying certificate and key directly as strings:

var producer = new Kafka.Producer({
    connectionString: 'kafka://127.0.0.1:9093',
    ssl: {
        cert: '-----BEGIN CERTIFICATE-----\nMIIChTCCAe4C...............',
        key: '-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBA.......'
    }
});

Other Node.js SSL options are also available, such as rejectUnauthorized, secureProtocol, ciphers, etc. See the Node.js tls.connect() and tls.createSecureContext() documentation for more details.

It is also possible to use KAFKA_CLIENT_CERT and KAFKA_CLIENT_CERT_KEY environment variables to specify SSL certificate and key:

KAFKA_URL=kafka://127.0.0.1:9093 KAFKA_CLIENT_CERT=./test/ssl/client.crt KAFKA_CLIENT_CERT_KEY=./test/ssl/client.key node producer.js

Or as text strings:

KAFKA_URL=kafka://127.0.0.1:9093 KAFKA_CLIENT_CERT=`cat ./test/ssl/client.crt` KAFKA_CLIENT_CERT_KEY=`cat ./test/ssl/client.key` node producer.js
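Because the certificate settings accept either a file path or the PEM text itself, something has to tell the two apart. A minimal sketch, assuming anything starting with a -----BEGIN marker is PEM content and everything else is a path; loadPem is a hypothetical helper, not no-kafka's code:

```javascript
const fs = require('fs');

// Hypothetical helper: returns PEM text whether given inline PEM or a file path.
function loadPem(value) {
  if (!value) return undefined;
  if (value.trim().startsWith('-----BEGIN')) {
    return value; // already PEM text, e.g. from `cat client.crt`
  }
  return fs.readFileSync(value, 'utf8'); // otherwise treat it as a path to a PEM file
}

const sslOptions = {
  cert: loadPem(process.env.KAFKA_CLIENT_CERT),
  key: loadPem(process.env.KAFKA_CLIENT_CERT_KEY),
};
console.log('cert loaded:', Boolean(sslOptions.cert), 'key loaded:', Boolean(sslOptions.key));
```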

Using a self-signed certificate:

var producer = new Kafka.Producer({ connectionString: 'kafka://127.0.0.1:9093', ssl: { ca: '/path/to/my-cert.crt' } });

It is also possible to use the KAFKA_CLIENT_CA environment variable to specify a self-signed SSL certificate:

KAFKA_URL=kafka://127.0.0.1:9093 KAFKA_CLIENT_CA=./test/ssl/my-cert.crt node producer.js

Remapping Broker Addresses

Sometimes the advertised listener addresses of a Kafka cluster are not reachable from the client, for example when a Kafka farm is behind NAT or other network infrastructure. In this scenario it is possible to pass a brokerRedirection option to the Producer, SimpleConsumer or GroupConsumer.

The value of brokerRedirection can be either:

A function returning an object with host (string) and port (integer), such as:

brokerRedirection: function (host, port) {
    return {
        host: host + '.somesuffix.com',
        port: port + 100
    };
}

A simple map of connection strings to new connection strings, such as:

brokerRedirection: {
    'some-host:9092': 'actual-host:9092',
    'kafka://another-host:9092': 'another-host:9093',
    'third-host:9092': 'kafka://third-host:9000'
}

A common scenario for this kind of remapping is when a Kafka cluster exists within a Docker application, and the internally advertised names needed for container to container communication do not correspond to the actual external ports or addresses when connecting externally via other tools.
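How either form of brokerRedirection could be applied to an advertised broker address is sketched below. Ignoring kafka:// prefixes on both sides of the map and the redirect/normalize helpers are assumptions for illustration, not no-kafka's internal logic:

```javascript
// Hypothetical illustration of applying a brokerRedirection setting.
// Strips an optional kafka:// prefix so map keys and values compare cleanly.
function normalize(address) {
  return address.replace(/^kafka:\/\//, '');
}

function redirect(host, port, brokerRedirection) {
  if (typeof brokerRedirection === 'function') {
    // Function form: returns { host, port }.
    return brokerRedirection(host, port);
  }
  if (brokerRedirection && typeof brokerRedirection === 'object') {
    // Map form: look up "host:port", ignoring kafka:// on either side.
    const wanted = host + ':' + port;
    for (const [from, to] of Object.entries(brokerRedirection)) {
      if (normalize(from) === wanted) {
        const [newHost, newPort] = normalize(to).split(':');
        return { host: newHost, port: parseInt(newPort, 10) };
      }
    }
  }
  // No redirection configured or no match: keep the advertised address.
  return { host, port };
}

const map = {
  'some-host:9092': 'actual-host:9092',
  'kafka://another-host:9092': 'another-host:9093',
  'third-host:9092': 'kafka://third-host:9000',
};
console.log(redirect('another-host', 9092, map));
console.log(redirect('third-host', 9092, map));
console.log(redirect('example', 9092, (host, port) => ({ host: host + '.somesuffix.com', port: port + 100 })));
```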

Reconnection delay

In case of a network error that prevents further operations, no-kafka will try to reconnect to the Kafka brokers in an endless loop with an optionally progressive delay, which can be configured with the reconnectionDelay option.
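The endless reconnection loop can be sketched like this, assuming the wait grows by min after each failure and is capped at max (with the documented defaults of 1000ms for both, the delay stays constant). connectWithRetry and the connect function it receives are hypothetical stand-ins for no-kafka's internal connection logic:

```javascript
// Hypothetical reconnection loop: keeps calling connect() until it resolves,
// waiting min, 2*min, 3*min, ... ms between attempts, never more than max.
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function connectWithRetry(connect, reconnectionDelay) {
  const { min, max } = reconnectionDelay;
  let failures = 0;
  for (;;) {
    try {
      return await connect();
    } catch (err) {
      failures += 1;
      const wait = Math.min(failures * min, max);
      console.log(`connection failed (${err.message}), retrying in ${wait}ms`);
      await sleep(wait);
    }
  }
}

// Usage with a fake connect() that fails twice before succeeding.
let calls = 0;
connectWithRetry(async () => {
  calls += 1;
  if (calls < 3) throw new Error('ECONNREFUSED');
  return 'connected';
}, { min: 10, max: 50 }).then((result) => console.log(result, 'after', calls, 'attempts'));
```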

Logging

You can differentiate messages from several instances of producer/consumer by providing unique clientId in options:

var consumer1 = new Kafka.GroupConsumer({ clientId: 'group-consumer-1' });
var consumer2 = new Kafka.GroupConsumer({ clientId: 'group-consumer-2' });

=>

2016-01-12T07:41:57.884Z INFO group-consumer-1 ....
2016-01-12T07:41:57.884Z INFO group-consumer-2 ....

Change the logging level:

var consumer = new Kafka.GroupConsumer({ clientId : 'group-consumer' , logger : { logLevel : 1 } });

Send log messages to Logstash server(s) via UDP:

var consumer = new Kafka.GroupConsumer({ clientId : 'group-consumer' , logger : { logstash : { enabled : true , connectionString : '10.0.1.1:9999,10.0.1.2:9999' , app : 'myApp-kafka-consumer' } } });

You can override the function that outputs messages to stdout/stderr:

var consumer = new Kafka.GroupConsumer({ clientId : 'group-consumer' , logger : { logFunction : console .log } });

Topic Creation