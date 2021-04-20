// suggested Node.js version: v12.16.1
npm install kafka-streams

const { KafkaStreams } = require("kafka-streams");

const config = require("./config.json");
const factory = new KafkaStreams(config);

const kstream = factory.getKStream("input-topic");
const ktable = factory.getKTable(/* ... */);

kstream.merge(ktable).filter(/* ... */).map(/* ... */).reduce(/* ... */).to("output-topic");

CHANGES: The latest version brings a lot of changes; please check the CHANGELOG before updating.

API Overview


Prerequisites

Kafka broker should be version >= 0.11.x

Node.js should be version >= 8.x.x

A note on native mode

If you are using the native mode (config: { noptions: {} }), you will have to install node-rdkafka manually alongside kafka-streams. (This requires a Node.js version between 9 and 12; it will not work with Node.js >= 13. Last tested with 12.16.1.)

On Mac OS High Sierra / Mojave: CPPFLAGS=-I/usr/local/opt/openssl/include LDFLAGS=-L/usr/local/opt/openssl/lib yarn add --frozen-lockfile node-rdkafka@2.7.4

Otherwise: yarn add --frozen-lockfile node-rdkafka@2.7.4

(Please also note: doing this with npm does not work, as it will remove your dependencies. Install yarn first with npm i -g yarn.)
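The README identifies native mode by a `noptions` object in the config. Below is a minimal sketch of such a config, assuming it is passed to `new KafkaStreams(config)` as in the quick-start snippet. The keys inside `noptions` are standard librdkafka properties, and their values are placeholders. The `usesNativeMode` helper is hypothetical: it only restates that rule of thumb and is not part of kafka-streams.

```javascript
// Sketch of a native-mode config; values are placeholders for a local broker.
// The keys are standard librdkafka properties.
const nativeConfig = {
  noptions: {
    "metadata.broker.list": "localhost:9092", // placeholder broker address
    "group.id": "example-group",              // placeholder consumer group
    "client.id": "example-client"             // placeholder client id
  }
};

// Hypothetical helper (not part of kafka-streams): per the README, a config
// carrying a `noptions` object means the native client is used.
function usesNativeMode(config) {
  return typeof config.noptions === "object" && config.noptions !== null;
}

console.log(usesNativeMode(nativeConfig)); // true
console.log(usesNativeMode({}));           // false
```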

Aim of this Library

this is not a 1:1 port of the official Java Kafka Streams library

the goal of this project is to give Node.js developers at least the same options that Kafka Streams provides for JVM developers

stream-state processing, table representation, joins, aggregates, etc. I am aiming for the easiest API access possible; check out the word count example
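The word count example itself is written against the kafka-streams API. The plain JavaScript below is a library-independent sketch of what that pipeline computes: it splits each message into words and counts occurrences per word. The `messages` array is an assumption that stands in for record values read from an input topic.

```javascript
// Library-independent sketch of the word count computation.
// `messages` stands in for record values read from an input topic.
const messages = ["hello world", "Hello kafka streams", "world of streams"];

function wordCount(lines) {
  const counts = new Map();
  for (const line of lines) {
    // lower-case, split on runs of whitespace, drop empty tokens
    for (const word of line.toLowerCase().split(/\s+/).filter(Boolean)) {
      counts.set(word, (counts.get(word) || 0) + 1);
    }
  }
  return counts;
}

const counts = wordCount(messages);
console.log(counts.get("hello"));   // 2
console.log(counts.get("streams")); // 2
console.log(counts.get("kafka"));   // 1
```

In the streaming version, the same counts are updated incrementally as each message arrives, rather than computed once over a finished array.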

Description

kafka-streams 🐙 equivalent for Node.js ✨🐢🚀✨ built on super fast 🔥 observables using most.js :metal:

ships with sinek 🙏 for backpressure

comes with a JS and a native Kafka client; the native client adds more performance and SSL, SASL and Kerberos features

the lib also comes with a few window operations that are closer to Apache Flink, yet they still feel natural in this API :squirrel:

overwritable local-storage solution allows for any kind of datastore, e.g. RocksDB, Redis, Postgres, etc.

async (Promises) and sync stream operators, e.g. stream$.map() or stream$.asyncMap()

super easy API :goberserk:

the lib is based on sinek, which is based on kafka-node's ConsumerGroups
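The overwritable local-storage point means a processing step should depend only on a storage interface, so the backend can be swapped. Below is a minimal sketch of that idea with an in-memory backend. The class and its method names (`set`, `get`, `increment`) are illustrative assumptions and may not match the library's own KStorage class.

```javascript
// Hypothetical key-value store; the method names are illustrative and may
// not match the library's KStorage class.
class InMemoryStorage {
  constructor() {
    this.state = new Map();
  }

  // Every method returns a Promise, so a Redis- or Postgres-backed class
  // with the same shape could replace this one without changing callers.
  async set(key, value) {
    this.state.set(key, value);
    return value;
  }

  async get(key) {
    return this.state.get(key);
  }

  // Read-modify-write with no await in between, so it cannot interleave
  // with another increment inside a single Node.js process.
  async increment(key, by = 1) {
    const next = (this.state.get(key) || 0) + by;
    this.state.set(key, next);
    return next;
  }
}

// A processing step depends only on the interface, not on the backend.
async function countEvent(storage, key) {
  return storage.increment(key);
}

const storage = new InMemoryStorage();
countEvent(storage, "clicks")
  .then(() => countEvent(storage, "clicks"))
  .then(() => storage.get("clicks"))
  .then(value => console.log(value)); // 2
```

An external backend would need its own atomic increment (for example Redis INCRBY) to keep the same guarantee. This is why the progress list below mentions DBs that support atomic actions.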

Port Progress Overview

- core structure
- KStream base - stream as a changelog
- KTable base - stream as a database
- KStream & KTable cloning
- complex stream join structure
- advanced joins
- windows (for joins)
- Flink-like window operations
- word-count example
- more examples
- local-storage for ETL actions
- local-storage factory (one per action)
- KStorage example for any DB that supports atomic actions
- backing up local-storage via Kafka
- Kafka client implementation
- KTable replay to Kafka (produce)
- stream for topic message production only
- sinek implementation
- backpressure mode for KafkaClient
- auto-JSON payloads (read-map/write-map)
- auto producer partition and keyed-message handling
- documentation
- API description
- higher join & combine examples
- embed native client (librdkafka) for more performance
- SSL
- SASL
- Kerberos
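The first entries above contrast a KStream ("stream as a changelog") with a KTable ("stream as a database"). Below is a library-independent sketch of that difference. A stream view keeps every record as a separate event, while a table view folds the same records into the latest value per key. The records are made up for illustration.

```javascript
// Made-up keyed records, in arrival order.
const changelog = [
  { key: "alice", value: 1 },
  { key: "bob", value: 5 },
  { key: "alice", value: 3 }
];

// Table view: fold the changelog so later records overwrite earlier ones.
function toTable(records) {
  const table = new Map();
  for (const { key, value } of records) {
    table.set(key, value);
  }
  return table;
}

const table = toTable(changelog);
console.log(changelog.length);   // 3 events in the stream view
console.log(table.size);         // 2 keys in the table view
console.log(table.get("alice")); // 3
```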

Operator Implementations

- map
- asyncMap
- constant
- scan
- timestamp
- tap
- filter
- skipRepeats
- skipRepeatsWith
- slice
- take
- skip
- takeWhile
- skipWhile
- until
- since
- reduce
- chainReduce
- forEach (observe)
- chainForEach
- drain
- _zip
- _merge
- _join
- _combine
- _sample
- throttle
- debounce
- delay
- multicast

A description of the operators can be found here.

Missing an operator? Feel free to open an issue 👮‍♂️
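Most of these operators come from most.js and act on an unbounded stream. Below are array-based approximations of three of them, meant only to show their semantics on a finite sequence. They are not the library's implementation. In most.js, `scan` also emits its seed value first, and the sketch mirrors that.

```javascript
// Array-based approximations of a few stream operators, for illustration only.

// scan: emits the seed, then every intermediate accumulator value
function scan(fn, seed, values) {
  const out = [seed];
  let acc = seed;
  for (const v of values) {
    acc = fn(acc, v);
    out.push(acc);
  }
  return out;
}

// skipRepeats: drops a value if it is === to the value right before it
function skipRepeats(values) {
  return values.filter((v, i) => i === 0 || v !== values[i - 1]);
}

// take: keeps only the first n values
function take(n, values) {
  return values.slice(0, n);
}

console.log(scan((a, b) => a + b, 0, [1, 2, 3])); // [ 0, 1, 3, 6 ]
console.log(skipRepeats([1, 1, 2, 2, 1]));        // [ 1, 2, 1 ]
console.log(take(2, [5, 6, 7]));                  // [ 5, 6 ]
```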

Additional Operators

- mapStringToArray
- mapArrayToKV
- mapStringToKV
- mapParse
- mapStringify
- atThroughput
- mapWrapKafkaPayload
- mapToFormat
- mapFromFormat

Want more? Feel free to open an issue 👮‍♂️
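The names of the helper operators suggest simple message transformations. Below are plain functions showing what three of them appear to do. This is an assumption based only on the names: the library's exact defaults (delimiter, key and value positions) may differ. The functions here are hypothetical stand-ins, not the library's operators.

```javascript
// Hypothetical stand-ins for what the operator names suggest.

// "key value" -> ["key", "value"]
function stringToArray(str, delimiter = " ") {
  return str.split(delimiter);
}

// ["key", "value"] -> { key: "key", value: "value" }
function arrayToKV(arr, keyIndex = 0, valueIndex = 1) {
  return { key: arr[keyIndex], value: arr[valueIndex] };
}

// composition of the two steps above
function stringToKV(str, delimiter = " ", keyIndex = 0, valueIndex = 1) {
  return arrayToKV(stringToArray(str, delimiter), keyIndex, valueIndex);
}

console.log(stringToKV("user42 clicked")); // { key: 'user42', value: 'clicked' }

// mapParse / mapStringify presumably correspond to a JSON round trip
const parsed = JSON.parse('{"id":1}');
console.log(JSON.stringify(parsed)); // {"id":1}
```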

Stream Action Implementations

- countByKey
- sumByKey
- min
- max

Want more? Feel free to open an issue 👮‍♂️
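Below is a plain JavaScript sketch of what these four actions compute over a finite set of made-up keyed records. The library's versions run continuously over a stream and presumably keep their running state in the local-storage layer. This sketch leaves that part out.

```javascript
// Made-up keyed records.
const records = [
  { key: "a", value: 2 },
  { key: "b", value: 7 },
  { key: "a", value: 5 }
];

// number of records seen per key
function countByKey(items) {
  const counts = {};
  for (const { key } of items) counts[key] = (counts[key] || 0) + 1;
  return counts;
}

// sum of values per key
function sumByKey(items) {
  const sums = {};
  for (const { key, value } of items) sums[key] = (sums[key] || 0) + value;
  return sums;
}

const values = records.map(r => r.value);
console.log(countByKey(records));                      // { a: 2, b: 1 }
console.log(sumByKey(records));                        // { a: 7, b: 7 }
console.log(Math.min(...values), Math.max(...values)); // 2 7
```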

Join Operations

Operation description

KStream Status

- merge
- outerJoin
- innerJoin
- leftJoin
- branch

KTable Status

- merge
- outerJoin
- innerJoin
- leftJoin

KTable <-> KStream Status

- merge
- outerJoin
- innerJoin
- leftJoin
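The three join flavours listed above differ mainly in which keys survive. Below is a library-independent sketch on two keyed Maps, which roughly corresponds to table-to-table semantics. Stream-to-stream joins in the Java Kafka Streams library are additionally bounded by a time window, which is left out here. Filling the missing side with `null` is a choice made for this sketch.

```javascript
// Two made-up keyed tables.
const left = new Map([["a", 1], ["b", 2]]);
const right = new Map([["b", "x"], ["c", "y"]]);

// innerJoin: only keys present on both sides
function innerJoin(l, r) {
  const out = new Map();
  for (const [k, v] of l) if (r.has(k)) out.set(k, [v, r.get(k)]);
  return out;
}

// leftJoin: every left key, right side null when missing
function leftJoin(l, r) {
  const out = new Map();
  for (const [k, v] of l) out.set(k, [v, r.has(k) ? r.get(k) : null]);
  return out;
}

// outerJoin: keys from either side, missing side null
function outerJoin(l, r) {
  const out = leftJoin(l, r);
  for (const [k, v] of r) if (!l.has(k)) out.set(k, [null, v]);
  return out;
}

console.log(JSON.stringify([...innerJoin(left, right)])); // [["b",[2,"x"]]]
console.log([...leftJoin(left, right).keys()]);           // [ 'a', 'b' ]
console.log([...outerJoin(left, right).keys()]);          // [ 'a', 'b', 'c' ]
```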

Window Operations

KStream

- window
- advanced window
- rolling window
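Below is a generic sketch of the simplest windowing idea: a tumbling window of fixed size with no overlap. Each event is assigned to the window its timestamp falls into, and events are counted per window. This is not the library's `window` operator, and the timestamps (in milliseconds) are made up.

```javascript
// Made-up timestamped events (ts in milliseconds).
const events = [
  { ts: 1000, value: "a" },
  { ts: 1500, value: "b" },
  { ts: 2100, value: "c" },
  { ts: 4000, value: "d" }
];

// Count events per fixed-size, non-overlapping window keyed by window start.
function tumblingCounts(items, sizeMs) {
  const windows = new Map();
  for (const { ts } of items) {
    const start = Math.floor(ts / sizeMs) * sizeMs; // start of this event's window
    windows.set(start, (windows.get(start) || 0) + 1);
  }
  return windows;
}

console.log(JSON.stringify([...tumblingCounts(events, 1000)])); // [[1000,2],[2000,1],[4000,1]]
```

A rolling (sliding) window would instead let consecutive windows overlap, so one event can count toward several windows.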

More

Can I use this library yet?

Yes.

Are we ready for production yet?

Probably, yes. 😄

Even More

Forks or Stars give motivation :bowtie: