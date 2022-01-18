Integration of KafkaJS with NestJS to build event driven microservices.
Import and add the
KafkaModule to the imports array of the module for which you would like to use Kafka.
Register the
KafkaModule synchronous with the
register() method:
@Module({
imports: [
KafkaModule.register([
{
name: 'HERO_SERVICE',
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
Register the
KafkaModule asynchronous with the
registerAsync() method:
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot(),
KafkaModule.registerAsync(['HERO_SERVICE'], {
useFactory: async (configService: ConfigService) => {
const broker = this.configService.get('broker');
return [
{
name: 'HERO_SERVICE',
options: {
clientId: 'hero',
brokers: [broker],
},
consumer: {
groupId: 'hero-consumer'
}
}
}
];
},
inject: [ConfigService]
})
]
...
})
Full settings can be found:
|Config
|Options
|client
|https://kafka.js.org/docs/configuration
|consumer
|https://kafka.js.org/docs/consuming#options
|producer
|https://kafka.js.org/docs/producing#options
|serializer
|deserializer
|consumeFromBeginning
|true/false
Subscribing to a topic to accept messages.
export class Consumer {
constructor(
@Inject('HERO_SERVICE') private client: KafkaService
) {}
onModuleInit(): void {
this.client.subscribeToResponseOf('hero.kill.dragon', this)
}
@SubscribeTo('hero.kill.dragon')
async getWorld(data: any, key: any, offset: number, timestamp: number, partition: number, headers: IHeaders): Promise<void> {
...
}
}
Send messages back to kafka.
const TOPIC_NAME = 'hero.kill.dragon';
export class Producer {
constructor(
@Inject('HERO_SERVICE') private client: KafkaService
) {}
async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
const result = await this.client.send({
topic: TOPIC_NAME,
messages: [
{
key: '1',
value: message
}
]
});
return result;
}
}
By default messages are converted to JSON objects were possible. If you're using
AVRO you can add the
SchemaRegistry deserializer to convert the messages. This uses the KafkaJS Schema-registry module
In your
module.ts:
@Module({
imports: [
KafkaModule.register([
{
name: 'HERO_SERVICE',
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
},
deserializer: new KafkaAvroResponseDeserializer({
host: 'http://localhost:8081'
}),
serializer: new KafkaAvroRequestSerializer({
config: {
host: 'http://localhost:8081/'
},
schemas: [
{
topic: 'test.topic',
key: join(__dirname, 'key-schema.avsc'),
value: join(__dirname, 'value-schema.avsc')
}
],
}),
},
]),
]
...
})
See the e2e test for example.
