Date: 2022-01-17
More NestJS libs on alariblog.ru
This library takes care of RPC requests and messaging between microservices. It makes it easy to bind your existing controllers to RMQ routes. This version is only for NestJS.
Updated for NestJS 8!
The forTest() method emulates messages in unit or e2e tests without needing a RabbitMQ instance.
First, install the package:
npm i nestjs-rmq
Set up your connection in the root module:
import { RMQModule } from 'nestjs-rmq';
@Module({
imports: [
RMQModule.forRoot({
exchangeName: configService.get('AMQP_EXCHANGE'),
connections: [
{
login: configService.get('AMQP_LOGIN'),
password: configService.get('AMQP_PASSWORD'),
host: configService.get('AMQP_HOST'),
},
],
}),
],
})
export class AppModule {}
In forRoot() you pass connection options:
Additionally, you can use optional parameters:
{
exchangeName: 'my_exchange',
connections: [
{
login: 'admin',
password: 'admin',
host: 'localhost',
},
],
queueName: 'my-service-queue',
}
A custom logger service must implement the LoggerService interface, which is compatible with Winston and other loggers.
middleware - an array of classes that extend RMQPipeClass with one method, transform. They are triggered right after receiving a message, before pipes and the controller method. Trigger order equals array order.
errorHandler - a class that extends RMQErrorHandler, passed in module options.
true.
class LogMiddleware extends RMQPipeClass {
async transform(msg: Message): Promise<Message> {
console.log(msg);
return msg;
}
}
intercepters - an array of classes that extend RMQIntercepterClass with one method, intercept. They are triggered before replying to any message. Trigger order equals array order.
export class MyIntercepter extends RMQIntercepterClass {
async intercept(res: any, msg: Message, error: Error): Promise<any> {
// res - response body
// msg - initial message we are replying to
// error - error if exists or null
return res;
}
}
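The "trigger order is equal to array order" rule can be pictured with a small stand-alone sketch. It does not use nestjs-rmq at all: the Msg and Pipe types and the runPipes helper are hypothetical names made up for this illustration, and the library's internal loop may differ.

```typescript
// Hypothetical message shape and pipe interface, for illustration only.
interface Msg {
  content: string;
}

interface Pipe {
  transform(msg: Msg): Promise<Msg>;
}

// Apply each pipe in array order; every pipe receives the previous pipe's output.
async function runPipes(pipes: Pipe[], msg: Msg): Promise<Msg> {
  let current = msg;
  for (const pipe of pipes) {
    current = await pipe.transform(current);
  }
  return current;
}

const appendA: Pipe = { transform: async (m) => ({ content: m.content + 'A' }) };
const appendB: Pipe = { transform: async (m) => ({ content: m.content + 'B' }) };

runPipes([appendA, appendB], { content: '' }).then((out) => {
  console.log(out.content); // AB
});
```

Swapping the two entries in the array would print BA instead, which is the whole point of the ordering rule.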
Config example with middleware and intercepters:
import { RMQModule } from 'nestjs-rmq';
@Module({
imports: [
RMQModule.forRoot({
exchangeName: configService.get('AMQP_EXCHANGE'),
connections: [
{
login: configService.get('AMQP_LOGIN'),
password: configService.get('AMQP_PASSWORD'),
host: configService.get('AMQP_HOST'),
},
],
middleware: [LogMiddleware],
intercepters: [MyIntercepter],
}),
],
})
export class AppModule {}
If you want to inject a dependency, such as a configuration service, into RMQ initialization, use forRootAsync:
import { RMQModule } from 'nestjs-rmq';
import { ConfigModule } from './config/config.module';
import { ConfigService } from './config/config.service';
@Module({
imports: [
RMQModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => {
return {
exchangeName: 'test',
connections: [
{
login: 'guest',
password: 'guest',
host: configService.getHost(),
},
],
queueName: 'test',
};
},
}),
],
})
export class AppModule {}
The useFactory function returns IRMQServiceOptions.
To send a message to an RPC topic, use the send() method in your controller or service:
@Injectable()
export class ProxyUpdaterService {
constructor(private readonly rmqService: RMQService) {}
myMethod() {
this.rmqService.send<number[], number>('sum.rpc', [1, 2, 3]);
}
}
This method returns a Promise. The first type parameter is the type you send, and the second is the type you receive.
this.rmqService.send<number[], number>('sum.rpc', [1, 2, 3])
.then(reply => {
//...
})
.catch((error: RMQError) => {
//...
});
You can also pass send options:
this.rmqService.send<number[], number>('sum.rpc', [1, 2, 3], {
expiration: 1000,
priority: 1,
persistent: true,
timeout: 30000,
});
If you want to just notify services:
const a = this.rmqService.notify<string>('info.none', 'My data');
This method returns a Promise.
To listen for messages, bind your controller or service methods to subscription topics with the RMQRoute() decorator:
export class AppController {
//...
@RMQRoute('sum.rpc')
sum(numbers: number[]): number {
return numbers.reduce((a, b) => a + b, 0);
}
@RMQRoute('info.none')
info(data: string) {
console.log(data);
}
}
The return value will be sent back as a reply in the RPC topic. In the 'sum.rpc' example it will send the sum of the array values, so the sender will get 6:
this.rmqService.send('sum.rpc', [1, 2, 3]).then((reply) => {
// reply: 6
});
Each '@RMQRoute' topic will be automatically bound to the queue specified in the 'queueName' option. If you want to return an error, just throw it in your method. To set '-x-status-code', use the custom RMQError class.
@RMQRoute('my.rpc')
myMethod(numbers: number[]): number {
//...
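// Either throw an RMQError...
throw new RMQError('Error message', 2);
// ...or a plain Error (alternative; only one of these would actually run)
throw new Error('Error message');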
//...
}
With exchange type topic you can use message patterns to subscribe to messages that correspond to that pattern. You can use special symbols:
* - (star) can substitute for exactly one word.
# - (hash) can substitute for zero or more words.
For example:
*.*.rpc will match my.own.rpc or any.other.rpc and will not match this.is.cool.rpc or my.rpc.
compute.# will match compute.this.equation.rpc and will not match do.compute.anything.
To subscribe to a pattern, use it as a route:
import { RMQRoute } from 'nestjs-rmq';
@RMQRoute('*.*.rpc')
myMethod(): number {
// ...
}
Note: If two route patterns match a message topic, only the first will be used.
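The star and hash rules, and the "first match wins" note, can be sketched in plain TypeScript. This is an illustrative re-implementation of AMQP-style topic matching, not the code that nestjs-rmq or RabbitMQ actually runs; topicMatches and firstMatchingRoute are hypothetical helper names.

```typescript
// Illustrative sketch of AMQP-style topic matching; not the library's own code.
// '*' stands for exactly one word, '#' for zero or more words.
function topicMatches(pattern: string, topic: string): boolean {
  const p = pattern.split('.');
  const t = topic.split('.');

  const match = (i: number, j: number): boolean => {
    if (i === p.length) {
      // Pattern exhausted: succeed only if the topic is exhausted too.
      return j === t.length;
    }
    if (p[i] === '#') {
      // Let '#' consume 0, 1, 2, ... of the remaining words.
      for (let k = j; k <= t.length; k++) {
        if (match(i + 1, k)) {
          return true;
        }
      }
      return false;
    }
    if (j === t.length) {
      // Pattern still expects a word, but the topic has none left.
      return false;
    }
    return (p[i] === '*' || p[i] === t[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

// Hypothetical helper: when several patterns match, only the first one is used.
function firstMatchingRoute(patterns: string[], topic: string): string | undefined {
  return patterns.find((pattern) => topicMatches(pattern, topic));
}

console.log(topicMatches('*.*.rpc', 'my.own.rpc')); // true
console.log(topicMatches('*.*.rpc', 'my.rpc')); // false
console.log(firstMatchingRoute(['*.*.rpc', 'my.#'], 'my.own.rpc')); // *.*.rpc
```

Both patterns in the last call match the topic, so the result depends only on their order in the array.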
To get more information from a message (not just its content), use the @RMQMessage parameter decorator:
import { RMQRoute, Validate, RMQMessage, ExtendedMessage } from 'nestjs-rmq';
@RMQRoute('my.rpc')
myMethod(data: myClass, @RMQMessage msg: ExtendedMessage): number {
// ...
}
You can get all message properties that RMQ gets. Example:
{
"fields": {
"consumerTag": "amq.ctag-1CtiEOM8ioNFv-bzbOIrGg",
"deliveryTag": 2,
"redelivered": false,
"exchange": "test",
"routingKey": "appid.rpc"
},
"properties": {
"contentType": "undefined",
"contentEncoding": "undefined",
"headers": {},
"deliveryMode": "undefined",
"priority": "undefined",
"correlationId": "ce7df8c5-913c-2808-c6c2-e57cfaba0296",
"replyTo": "amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOTE4N2MzYWMyM2M0AAAenQAAAAAD.bDT8S9ZIl5o3TGjByqeh5g==",
"expiration": "undefined",
"messageId": "undefined",
"timestamp": "undefined",
"type": "undefined",
"userId": "undefined",
"appId": "test-service",
"clusterId": "undefined"
},
"content": "<Buffer 6e 75 6c 6c>"
}
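The content field is the raw message body. As a small illustration in plain Node.js (independent of nestjs-rmq, with decodeJsonContent being a hypothetical helper name), the four bytes shown above decode to the JSON text null:

```typescript
// Decode a raw message body as UTF-8 text and parse it as JSON.
function decodeJsonContent(content: Buffer): unknown {
  return JSON.parse(content.toString('utf8'));
}

// The bytes from the example above: 6e 75 6c 6c
const exampleBody = Buffer.from([0x6e, 0x75, 0x6c, 0x6c]);

console.log(exampleBody.toString('utf8')); // null (the four characters n, u, l, l)
console.log(decodeJsonContent(exampleBody)); // null (the parsed JSON value)
```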
To configure certificates and learn why you need them, read here.
To use an amqps connection:
RMQModule.forRoot({
exchangeName: 'test',
connections: [
{
protocol: RMQ_PROTOCOL.AMQPS, // new
login: 'admin',
password: 'admin',
host: 'localhost',
},
],
connectionOptions: {
cert: fs.readFileSync('clientcert.pem'),
key: fs.readFileSync('clientkey.pem'),
passphrase: 'MySecretPassword',
ca: [fs.readFileSync('cacert.pem')]
} // new
}),
This is a basic example that reads files, but you can load them however you want. cert, key and ca must be Buffers. Note that ca is an array. If you don't need keys, just use the RMQ_PROTOCOL.AMQPS protocol.
To use it with
pkcs12 files:
connectionOptions: {
pfx: fs.readFileSync('clientcertkey.p12'),
passphrase: 'MySecretPassword',
ca: [fs.readFileSync('cacert.pem')]
},
If you want to use your own ack/nack logic, you can enable manual acknowledgement in @RMQRoute. Then you have to manually ack/nack the message that you get with @RMQMessage.
import { RMQRoute, Validate, RMQMessage, ExtendedMessage, RMQService } from 'nestjs-rmq';
@Controller()
export class MyController {
constructor(private readonly rmqService: RMQService) {}
@RMQRoute('my.rpc', { manualAck: true })
myMethod(data: myClass, @RMQMessage msg: ExtendedMessage): number {
// Any logic goes here
this.rmqService.ack(msg);
// Any logic goes here
}
@RMQRoute('my.other-rpc', { manualAck: true })
myOtherMethod(data: myClass, @RMQMessage msg: ExtendedMessage): number {
// Any logic goes here
this.rmqService.nack(msg);
// Any logic goes here
}
}
ExtendedMessage has an additional method that collects all data from a message for debugging. It also serializes the content and hides Buffers, because they can be massive. You can then put all your debug info into an Error or log it.
import { RMQRoute, Validate, RMQMessage, ExtendedMessage, RMQService } from 'nestjs-rmq';
@Controller()
export class MyController {
constructor(private readonly rmqService: RMQService) {}
@RMQRoute('my.rpc')
myMethod(data: myClass, @RMQMessage msg: ExtendedMessage): number {
// ...
console.log(msg.getDebugString());
// ...
}
}
You will get info about the message, its fields and properties:
{
"fields": {
"consumerTag": "amq.ctag-Q-l8A4Oh76cUkIKbHWNZzA",
"deliveryTag": 4,
"redelivered": false,
"exchange": "test",
"routingKey": "debug.rpc"
},
"properties": {
"headers": {},
"correlationId": "388236ad-6f01-3de5-975d-f9665b73de33",
"replyTo": "amq.rabbitmq.reply-to.g1hkABNyYWJiaXRANzQwNDVlYWQ5ZTgwAAAG2AAAAABfmnkW.9X12ySrcM6BOXpGXKkR+Yg==",
"timestamp": 1603959908996,
"appId": "test-service"
},
"message": {
"prop1": [1],
"prop2": "Buffer - length 11"
}
}
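To show what "hides Buffers" means in the output above, here is a minimal sketch of one way to replace every Buffer in a payload with a short length description. It is not the implementation of getDebugString(); describeBuffers is a hypothetical helper written only for this illustration.

```typescript
// Hypothetical helper: returns a copy of the value in which every Buffer
// is replaced by a short string describing its length.
function describeBuffers(value: unknown): unknown {
  if (Buffer.isBuffer(value)) {
    return `Buffer - length ${value.length}`;
  }
  if (Array.isArray(value)) {
    return value.map((item) => describeBuffers(item));
  }
  if (value !== null && typeof value === 'object') {
    const result: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value)) {
      result[key] = describeBuffers(inner);
    }
    return result;
  }
  // Primitives are returned unchanged.
  return value;
}

const debugView = describeBuffers({ prop1: [1], prop2: Buffer.from('hello world') });
console.log(JSON.stringify(debugView));
// {"prop1":[1],"prop2":"Buffer - length 11"}
```

Walking the object by hand matters here: JSON.stringify would otherwise serialize a Buffer as its full byte array.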
@RMQRoute handlers accept a single parameter, msg, which is the amqp message.content parsed as JSON. You may want to add an additional custom layer to that message and change the way the handler is called. For example, you may want to structure your message with two different parts, payload (containing the actual data) and appId (containing the request applicationId), and process them explicitly in your handler.
To do that, pass a custom message factory to RMQRoute: msgFactory?: (msg: Message) => any;.
The default msgFactory:
@RMQRoute('topic', {
msgFactory: (msg: Message) => JSON.parse(msg.content.toString())
})
Custom msgFactory that returns an additional argument (the sender's appId) and changes the request:
@RMQRoute(CustomMessageFactoryContracts.topic, {
msgFactory: (msg: Message) => {
const content: CustomMessageFactoryContracts.Request = JSON.parse(msg.content.toString());
content.num = content.num * 2;
return [content, msg.properties.appId];
}
})
customMessageFactory({ num }: CustomMessageFactoryContracts.Request, appId: string): CustomMessageFactoryContracts.Response {
return { num, appId };
}
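The example above implies that the array returned by msgFactory becomes the handler's argument list. The following stand-alone sketch makes that step explicit. It is an assumption about the dispatch, not the library's code: IncomingMessage, MsgFactory and callHandler are hypothetical names, and the message type is reduced to the two fields used here.

```typescript
// Minimal stand-in for the parts of an AMQP message used here (hypothetical type).
interface IncomingMessage {
  content: Buffer;
  properties: { appId?: string };
}

type MsgFactory = (msg: IncomingMessage) => unknown[];

// Custom factory: doubles `num` and adds the sender's appId as a second argument.
const customFactory: MsgFactory = (msg) => {
  const body = JSON.parse(msg.content.toString()) as { num: number };
  return [{ num: body.num * 2 }, msg.properties.appId];
};

// Assumed dispatch step: the factory's array is spread into the handler's parameters.
function callHandler<R>(
  handler: (...args: any[]) => R,
  factory: MsgFactory,
  msg: IncomingMessage,
): R {
  return handler(...factory(msg));
}

const exampleHandler = ({ num }: { num: number }, appId: string) => ({ num, appId });

const incoming: IncomingMessage = {
  content: Buffer.from(JSON.stringify({ num: 21 })),
  properties: { appId: 'test-service' },
};

console.log(callHandler(exampleHandler, customFactory, incoming));
// { num: 42, appId: 'test-service' }
```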
NestJS-rmq uses class-validator to validate incoming data. To use it, decorate your route method with
RMQValidate:
import { RMQRoute, RMQValidate } from 'nestjs-rmq';
@RMQValidate()
@RMQRoute('my.rpc')
myMethod(data: myClass): number {
// ...
}
Where
myClass is data class with validation decorators:
import { IsString, MinLength, IsNumber } from 'class-validator';
export class myClass {
@MinLength(2)
@IsString()
name: string;
@IsNumber()
age: number;
}
If the input data is invalid, the library will send back an error without even entering your method. This saves you from manually validating data inside the route. You can check all available validators here.
NestJS-rmq uses class-transformer to transform incoming data. To use it, decorate your route method with
RMQTransform:
import { RMQRoute, RMQTransform } from 'nestjs-rmq';
@RMQTransform()
@RMQValidate()
@RMQRoute('my.rpc')
myMethod(data: myClass): number {
// ...
}
Where
myClass is data class with transformation decorators:
import { Type } from 'class-transformer';
import { IsDate } from 'class-validator';
export class myClass {
@IsDate()
@Type(() => Date)
date: Date;
}
After this you can use data.date in your controller as a Date object and not a string. You can check the class-transformer docs here. You can use transformation and validation at the same time: transformation is applied first, then validation.
To intercept any message to any route, you can use
@RMQPipe decorator:
import { RMQRoute, RMQPipe } from 'nestjs-rmq';
@RMQPipe(MyPipeClass)
@RMQRoute('my.rpc')
myMethod(numbers: number[]): number {
//...
}
where
MyPipeClass extends
RMQPipeClass with one method
transform:
class MyPipeClass extends RMQPipeClass {
async transform(msg: Message): Promise<Message> {
// do something
return msg;
}
}
If you want to use custom error handler for dealing with errors from replies, use
errorHandler in module options and pass class that extends
RMQErrorHandler:
class MyErrorHandler extends RMQErrorHandler {
public static handle(headers: IRmqErrorHeaders): Error | RMQError {
// do something
return new RMQError(
headers['-x-error'],
headers['-x-type'],
headers['-x-status-code'],
headers['-x-data'],
headers['-x-service'],
headers['-x-host']
);
}
}
RMQService provides an additional method to check whether you are still connected to RMQ. Although reconnection is automatic, it will not help if you provided wrong credentials. To check the connection for a Docker healthcheck, use:
const isConnected = this.rmqService.healthCheck();
If
isConnected equals
true, you are successfully connected.
If you want to close the connection, for example when using RMQ in testing tools, use the disconnect() method.
The library supports using the RMQ module in your test suites without a RabbitMQ instance. To use the library in tests, use the forTest method in the module.
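import { Test } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { RMQModule, RMQService, RMQTestService } from 'nestjs-rmq';
let api: INestApplication;
let rmqService: RMQTestService;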
beforeAll(async () => {
const apiModule = await Test.createTestingModule({
imports: [
RMQModule.forTest({})
],
controllers: [MicroserviceController],
}).compile();
api = apiModule.createNestApplication();
await api.init();
rmqService = apiModule.get(RMQService);
});
You can pass any options you pass in normal
forRoot (except
errorHandler).
From the module, you will get rmqService, which is similar to the normal service, with three additional methods:
triggerRoute - trigger your RMQRoute, simulating incoming message.
mockReply - mock reply if you are using
send method.
mockError - mock error if you are using
send method.
Emulates a message received by your RMQRoute.
const { result } = await rmqService.triggerRoute<Request, Response>(topic, data);
topic - topic, that you want to trigger (pattern supported).
data - data to send in your method.
If your service needs to send data to other microservice, you can emulate its reply with:
rmqService.mockReply(topic, res);
topic - all messages sent to this topic will be mocked.
res - mocked response data.
After this, all
rmqService.send(topic, { ... }) calls will return
res data.
If your service needs to send data to other microservice, you can emulate its error with:
rmqService.mockError(topic, error);
topic - all messages sent to this topic will be mocked.
error - error that
send method will throw.
After this, all
rmqService.send(topic, { ... }) calls will throw
error.
For e2e tests, install Docker on your machine and start the RabbitMQ Docker image using the docker-compose.yml in the e2e folder.
Then change the IP in the tests to localhost and run them with:
npm run test:e2e
For unit tests just run:
npm run test
The new version of nestjs-rmq contains minor breaking changes and is simple to migrate to.
@RMQController decorator is deprecated.
You will get a warning if you continue to use it, and it will be removed in future versions.
You can safely remove it from a controller or service.
msgFactory inside options will no longer work. You have to move it to @RMQRoute.
msgFactory changed its interface from
msgFactory?: (msg: Message, topic: IRouteMeta) => any[];
to
msgFactory?: (msg: Message) => any[];
because everything in IRouteMeta is already contained in Message.
msgFactory can be passed to
@RMQRoute instead of
@RMQController