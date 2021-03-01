Node.js bindings for librdkafka with Avro schema serialization.

The kafka-avro library is a wrapper that combines the node-rdkafka and avsc libraries so that you can produce and consume Kafka messages that are validated and serialized with Avro.

Install

Install the module using NPM:

npm install kafka-avro --save

Documentation

The kafka-avro library operates in the following steps:

You provide your Kafka brokers and Schema Registry (SR) URL to a new instance of kafka-avro.

You initialize kafka-avro, which tells the library to query the SR for all registered schemas, evaluate them and store them in runtime memory.

kafka-avro then exposes the getConsumer() and getProducer() methods, which both return instances of the corresponding constructors from the node-rdkafka library.

The instances of "node-rdkafka" that are returned by kafka-avro are patched so as to intercept produced and consumed messages and run them through the Avro de/serializer along with Confluent's Schema Registry Magic Byte and Schema Id.

You are highly encouraged to read the "node-rdkafka" documentation, as it explains how you can use the Producer and Consumer instances as well as check out the available configuration options of node-rdkafka.

node-rdkafka CODES

The Kafka.CODES enumeration of constant values provided by the "node-rdkafka" library is also available as a static var at:

const KafkaAvro = require('kafka-avro');
console.log(KafkaAvro.CODES);

Initialize kafka-avro

const KafkaAvro = require('kafka-avro');
const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'http://localhost:8081',
});
kafkaAvro.init()
  .then(function () {
    console.log('Ready to use');
  });

Kafka-avro options

When instantiating kafka-avro you may pass the following options:

kafkaBroker String REQUIRED The URL, or a comma-delimited list of URLs, pointing to your Kafka brokers.

schemaRegistry String REQUIRED The URL of the Schema Registry.

schemaRegistryAuth Object Basic auth object used to connect to the Confluent Cloud registry: {username: API_KEY, password: API_SECRET}. Same as the Axios basic auth Request Config parameter.

topics Array of Strings Optionally restrict kafka-avro to fetching schemas for specific topics. By default, schemas for all topics are fetched.

fetchAllVersions Boolean Set to true to fetch all versions for each topic. Use it when schemas are updated often in your environment.

fetchRefreshRate Number The polling interval (in seconds) at which schemas are fetched and updated in the background. This is useful for keeping up with schema changes in production. The default value is 0 seconds (disabled).

parseOptions Object Schema parse options to pass to avro.parse(). parseOptions.wrapUnions is set to true by default.

httpsAgent Object An initialized https Agent instance.

shouldFailWhenSchemaIsMissing Boolean Set to true to throw an error when producing a message for which no Avro schema can be found.

keySubjectStrategy String A SubjectNameStrategy for the key. It is used by the Avro serializer to determine the subject name under which the event record schemas should be registered in the Schema Registry. The default is TopicNameStrategy. Allowed values are [TopicRecordNameStrategy, TopicNameStrategy, RecordNameStrategy].

valueSubjectStrategy String A SubjectNameStrategy for the value. It is used by the Avro serializer to determine the subject name under which the event record schemas should be registered in the Schema Registry. The default is TopicNameStrategy. Allowed values are [TopicRecordNameStrategy, TopicNameStrategy, RecordNameStrategy].

isStringRegistryKey Boolean Set to true to skip requesting Avro schemas for keys. Defaults to false.
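To make the three subject name strategies more concrete, here is a minimal sketch of the naming conventions documented by Confluent. The subjectName function and the com.example.User record name are hypothetical and only illustrate the convention; this is not kafka-avro's own code, and the library's internals may differ.

```javascript
'use strict';

// Sketch of Confluent's subject naming conventions (NOT kafka-avro's code).
// `recordName` is assumed to be the fully qualified Avro record name.
function subjectName(strategy, topic, recordName, isKey) {
  switch (strategy) {
    case 'TopicNameStrategy':
      // One schema per topic: "<topic>-key" or "<topic>-value".
      return topic + (isKey ? '-key' : '-value');
    case 'RecordNameStrategy':
      // One subject per record type, shared across topics.
      return recordName;
    case 'TopicRecordNameStrategy':
      // One subject per record type within a topic.
      return topic + '-' + recordName;
    default:
      throw new Error('Unknown subject name strategy: ' + strategy);
  }
}

console.log(subjectName('TopicNameStrategy', 'test', 'com.example.User', false));
console.log(subjectName('TopicRecordNameStrategy', 'test', 'com.example.User', false));
```

Because TopicRecordNameStrategy includes the record name in the subject, several record types can be registered for the same topic without overwriting each other.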

Producer

NOTICE: You need to initialize kafka-avro before you can produce or consume messages.

By invoking the kafkaAvro.getProducer() method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved.

kafkaAvro.getProducer({
  // node-rdkafka producer options go here.
})
  .then(function (producer) {
    const topicName = 'test';
    producer.on('disconnected', function (arg) {
      console.log('producer disconnected. ' + JSON.stringify(arg));
    });
    const value = { name: 'John' };
    const key = 'key';
    // A partition of -1 lets librdkafka use its default partitioner.
    const partition = -1;
    producer.produce(topicName, partition, value, key);
  });

What kafka-avro basically does is wrap around node-rdkafka and intercept the produce method to validate and serialize the message.
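The interception idea can be sketched without a broker. Everything below is an assumption made for illustration: fakeProducer is a stand-in for a node-rdkafka Producer, interceptProduce is a hypothetical helper, and JSON stands in for Avro. It is not how kafka-avro is actually implemented.

```javascript
'use strict';

// Hypothetical stand-in for a node-rdkafka Producer; it only records calls.
const fakeProducer = {
  sent: [],
  produce(topic, partition, value, key) {
    this.sent.push({ topic, partition, value, key });
  },
};

// Replace `produce` with a wrapper that serializes the value first.
// `serializers` maps a topic name to a function(value) returning a Buffer.
function interceptProduce(producer, serializers, shouldFailWhenSchemaIsMissing) {
  const originalProduce = producer.produce.bind(producer);
  producer.produce = function (topic, partition, value, key, ...rest) {
    const serialize = serializers[topic];
    if (!serialize) {
      if (shouldFailWhenSchemaIsMissing) {
        throw new Error('No schema found for topic: ' + topic);
      }
      // Assumed fallback for this sketch: send the value as JSON bytes.
      const raw = Buffer.from(JSON.stringify(value));
      return originalProduce(topic, partition, raw, key, ...rest);
    }
    return originalProduce(topic, partition, serialize(value), key, ...rest);
  };
  return producer;
}

// JSON stands in for the Avro serializer in this sketch.
interceptProduce(fakeProducer, {
  test: (value) => Buffer.from(JSON.stringify(value)),
}, true);

fakeProducer.produce('test', -1, { name: 'John' }, 'key');
console.log(fakeProducer.sent[0].value.toString());
```

The caller keeps using produce() with a plain object, and only the wrapper knows about serialization.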

Consumer

NOTICE: You need to initialize kafka-avro before you can produce or consume messages.

By invoking the kafkaAvro.getConsumer() method, kafka-avro will instantiate a Consumer, listen on log, error and disconnect events and return it to you. Depending on the consuming pattern you follow you may or may not need to perform a connect() .

Consumer using events to consume

When consuming topics using the data event you will need to perform a connect() as per node-rdkafka documentation:

kafkaAvro.getConsumer({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
})
  .then(function (consumer) {
    // Connect and wait until the consumer is ready.
    return new Promise(function (resolve, reject) {
      consumer.on('ready', function () {
        resolve(consumer);
      });
      consumer.connect({}, function (err) {
        if (err) {
          reject(err);
          return;
        }
        resolve(consumer);
      });
    });
  })
  .then(function (consumer) {
    const topicName = 'test';
    consumer.subscribe([topicName]);
    consumer.consume();
    consumer.on('data', function (rawData) {
      console.log('data:', rawData);
    });
  });

Consumer using streams to consume

kafkaAvro.getConsumerStream({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
}, {
  'request.required.acks': 1,
}, {
  topics: 'test',
})
  .then(function (stream) {
    stream.on('error', function (err) {
      if (err) console.log(err);
      process.exit(1);
    });
    stream.on('data', function (rawData) {
      console.log('data:', rawData);
    });
    stream.consumer.on('event.error', function (err) {
      console.log(err);
    });
  });

Same deal here: kafka-avro is a thin wrapper around node-rdkafka that deserializes incoming messages before they reach your consuming method.

Consumer Data Object

kafka-avro intercepts all incoming messages and augments the object with two more properties named parsed and parsedKey, which contain the Avro-deserialized value and key. Here is a breakdown of the properties included in the message object you receive when consuming messages:

value Buffer The raw message buffer from Kafka.

size Number The size of the message.

key String|Number The partitioning key used.

topic String The topic this message comes from.

offset Number The Kafka offset.

partition Number The Kafka partition used.

parsed Object The Avro-deserialized value as a JS Object literal.

schemaId Number The Registry Value Schema id of the consumed message.

parsedKey Object The Avro-deserialized key as a JS Object literal.

schemaIdKey Number The Registry Key Schema id of the consumed message.
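As a small illustration of how these properties might be used together, the sketch below builds a summary from a consumed message. The summarizeMessage helper and the values in exampleMessage are made up for this example; a real message would come from the data event of a consumer.

```javascript
'use strict';

// Summarize a consumed message using the properties listed above.
// Falls back to the raw key when no Avro-deserialized key is present.
function summarizeMessage(message) {
  return {
    source: message.topic + '[' + message.partition + ']@' + message.offset,
    key: message.parsedKey !== undefined ? message.parsedKey : message.key,
    value: message.parsed,
    schemaId: message.schemaId,
  };
}

// Hand-built message; every value here is invented for illustration.
const exampleMessage = {
  value: Buffer.from([0, 0, 0, 0, 1]),
  size: 5,
  key: 'key',
  topic: 'test',
  offset: 42,
  partition: 0,
  parsed: { name: 'John' },
  schemaId: 1,
};

console.log(summarizeMessage(exampleMessage));
```

In a consumer, the same helper could be called from the data handler, for example consumer.on('data', function (message) { console.log(summarizeMessage(message)); }).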

The KafkaAvro instance also provides the following methods:

Support for several event types in the same topic

Kafka Avro can support several event types in the same topic. This requires using the TopicRecordNameStrategy.

const KafkaAvro = require('kafka-avro');
const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'http://localhost:8081',
  keySubjectStrategy: 'TopicRecordNameStrategy',
  valueSubjectStrategy: 'TopicRecordNameStrategy',
});
kafkaAvro.init()
  .then(function () {
    console.log('Ready to use');
  });

You can read more about this here: https://www.confluent.io/blog/put-several-event-types-kafka-topic/

Using async/await

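(async function () {
  try {
    await kafkaAvro.init();
    const producer = await kafkaAvro.getProducer({
      // node-rdkafka producer options go here.
    });
    producer.produce('test', -1, { name: 'John' }, 'key');
  } catch (err) {
    // Report the failure instead of silently swallowing it.
    console.error(err);
  }
})();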

(async function () {
  try {
    await kafkaAvro.init();
    const consumer = await kafkaAvro.getConsumer({
      // node-rdkafka consumer options go here.
    });
    consumer.on('ready', function (arg) {
      consumer.subscribe(['topic']);
      consumer.consume();
    });
    consumer.on('data', function (rawData) {
      console.log('data:', rawData.parsed);
    });
    consumer.on('disconnected', function (arg) {
      console.log('consumer disconnected. ' + JSON.stringify(arg));
    });
    consumer.connect();
  } catch (e) {
    // Report the failure instead of silently swallowing it.
    console.error(e);
  }
})();

Logging

The Kafka Avro library logs messages using the Bunyan logger. To enable logging you will have to define at least one of the needed ENV variables:

KAFKA_AVRO_LOG_LEVEL Set it to a valid Bunyan log level value to activate console logging (typically you'd need either info or debug as values).

KAFKA_AVRO_LOG_NO_COLORS Set this to any value to disable color when logging.

WARNING The logger currently does not emit any messages as expected; there is an open issue on Bunyan's repository pending a solution. So no logging for now.

NOTICE KafkaAvro.getLogger() is a static method on the KafkaAvro constructor, not the instance. Therefore there is a single logger instance for the whole runtime.

Returns {Bunyan.Logger} Bunyan logger instance.

const KafkaAvro = require('kafka-avro');
const fmt = require('bunyan-format');
const kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
  type: 'stream',
  stream: fmt({
    outputMode: 'short',
    levelInString: true,
  }),
  level: 'info',
});

Read more about the bunyan-format package.

Helper Methods

Serialize the provided value with Avro, including the magic Byte and schema id provided.

Returns {Buffer} Serialized buffer message.

type {avsc.Type} The Avro type instance.

schemaId {number} The Schema Id in the SR.

value {*} Any value to serialize.

Deserialize the provided message, expects a message that includes Magic Byte and schema id.

Returns {*} Deserialized message.

type {avsc.Type} The Avro type instance.

message {Buffer} Message in byte code.
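The framing these helpers deal with can be sketched as follows, assuming the Confluent wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro payload. The serialize and deserialize functions below are illustrative stand-ins, not the library's own helpers, and jsonType is a hypothetical substitute for an avsc type that uses JSON so the sketch runs without avsc installed.

```javascript
'use strict';

// Confluent wire format: the first byte of a framed message is always 0.
const MAGIC_BYTE = 0;

// `type` only needs toBuffer(value) and fromBuffer(buffer), as avsc types provide.
function serialize(type, schemaId, value) {
  const header = Buffer.alloc(5);
  header.writeUInt8(MAGIC_BYTE, 0);
  header.writeInt32BE(schemaId, 1); // schema id, 4 bytes, big-endian
  return Buffer.concat([header, type.toBuffer(value)]);
}

function deserialize(type, message) {
  if (message.length < 5 || message.readUInt8(0) !== MAGIC_BYTE) {
    throw new Error('Message is not framed with the magic byte');
  }
  // Skip the 5-byte header and decode the remaining payload.
  return type.fromBuffer(message.subarray(5));
}

// Hypothetical stand-in for an avsc type; it encodes with JSON, not Avro.
const jsonType = {
  toBuffer: (value) => Buffer.from(JSON.stringify(value)),
  fromBuffer: (buffer) => JSON.parse(buffer.toString()),
};

const framed = serialize(jsonType, 7, { name: 'John' });
console.log('schema id:', framed.readInt32BE(1));
console.log('value:', deserialize(jsonType, framed));
```

Keeping the schema id in the header is what lets a consumer look up the right schema before decoding the payload.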

Testing

You can use docker-compose up to bring up the whole stack before you run your integration tests with npm test. Because the integration tests run outside the containers, you will need to add the following entry to your hosts file:

127.0.0.1 kafka schema-registry

Troubleshooting OSX Catalina

You can find some instructions here

Releasing

Update the changelog below.

Ensure you are on master.

Type: grunt release

grunt release:minor for minor number jump.

grunt release:major for major number jump.

