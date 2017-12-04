Thin amqplib utils for RabbitMQ RPC in Node.js. Uses the replyTo and correlationId message properties, as the RabbitMQ RPC tutorial suggests.

Installation

npm i --save amqplib-rpc

Usage

request

Make an rpc request, publish a message to an rpc queue.

Creates a channel, a queue, and a correlationId, and sets properties.replyTo and properties.correlationId on the request message.

request examples, rpc client

var amqplib = require ( 'amqplib' ) var request = require ( 'amqplib-rpc' ).request amqplib.connect().then( function ( connection ) { return request(connection, 'multiply-queue' , { a : 10 , b : 20 }).then( function ( replyMessage ) { console .log(replyMessage.content.toString()) }) }) .catch(...) var amqplib = require ( 'amqplib/callback_api' ) var request = require ( 'amqplib-rpc' ).request amqplib.connect( function ( err, connection ) { if (err) throw err var content = { a : 10 , b : 20 } request(connection, 'multiply-queue' , content, function ( err, replyMessage ) { if (err) throw err console .log(replyMessage.content.toString()) }) }) var amqplib = require ( 'amqplib' ) var request = require ( 'amqplib-rpc' ).request var TimeoutError = require ( 'amqplib-rpc' ).TimeoutError amqplib.connect( function ( err, connection ) { if (err) throw err var content = { a : 10 , b : 20 } var opts = { timeout : 100 } request(connection, 'multiply-queue' , content, opts, function ( err, replyMessage ) { console .log(err) console .log(err instanceof TimeoutError) console .log(err.data) }) }) var amqplib = require ( 'amqplib' ) var request = require ( 'amqplib-rpc' ).request var ChannelCloseError = require ( 'amqplib-rpc' ).ChannelCloseError amqplib.connect( function ( err, connection ) { if (err) throw err var content = { a : 10 , b : 20 } var opts = { timeout : 100 } request(connection, 'multiply-queue' , content, opts, function ( err, replyMessage ) { console .log(err) console .log(err instanceof ChannelCloseError) console .log(err.data) }) })

reply

Reply to an rpc request, publish a message to replyTo queue.

Replies to a message using properties.replyTo and properties.correlationId.

Reply will NOT error if the "replyTo" queue does not exist; if you need that check, use checkReplyQueue (example below).

Reply will NOT ack the message. Ack/Nack must be done manually.

reply example, rpc server

var amqplib = require ( 'amqplib/callback_api' ) var reply = require ( 'amqplib-rpc' ).reply amqplib.connect( function ( err, connection ) { if (err) throw err connection.createChannel( function ( err, consumerChannel ) { if (err) throw err connection.createChannel( function ( err, publisherChannel ) { if (err) throw err consumerChannel.consume( 'multiply-queue' , messageHandler, function ( err ) { if (err) throw err }) function messageHandler ( message ) { var json = JSON .parse(message.content.toString()) var content = json.a * json.b var opts = {} reply(publisherChannel, message, content, opts) consumerChannel.ack(message) } }) }) })

checkReplyQueue

Create a channel, check if the replyTo queue exists, and close the channel. checkReplyQueue creates its own channel, because checking for a non-existent queue errors and closes the channel.

checkReplyQueue before reply example, rpc server

var amqplib = require ( 'amqplib/callback_api' ) var reply = require ( 'amqplib-rpc' ).reply var checkReplyQueue = require ( 'amqplib-rpc' ).checkReplyQueue var QueueNotFound = require ( 'amqplib-rpc' ).QueueNotFound amqplib.connect( function ( err, connection ) { if (err) throw err connection.createChannel( function ( err, consumerChannel ) { if (err) throw err connection.createChannel( function ( err, publisherChannel ) { if (err) throw err consumerChannel.consume( 'multiply-queue' , messageHandler, function ( err ) { if (err) throw err }) function messageHandler ( message ) { var json = JSON .parse(message.content.toString()) var content = json.a * json.b var opts = {} checkReplyQueue(connection, message, function ( err, exists ) { if (err) throw err if (!exists) { return } reply(publisherChannel, message, content, opts) consumerChannel.ack(message) }) } }) }) })

checkQueue

Create a channel, check if the queue exists, and close the channel. This is not RPC related but was implemented for checkReplyQueue, so I've exported it. In some cases, it may be useful to check if a queue exists before publishing to it.

checkQueue before publish example

var amqplib = require ( 'amqplib/callback_api' ) var reply = require ( 'amqplib-rpc' ).reply var checkQueue = require ( 'amqplib-rpc' ).checkQueue var QueueNotFound = require ( 'amqplib-rpc' ).QueueNotFound amqplib.connect( function ( err, connection ) { if (err) throw err connection.createChannel( function ( err, channel ) { if (err) throw err var queue = 'some-queue' checkQueue(connection, queue, function ( err, exists ) { if (err) throw err if (!exists) { return } channel.sendToQueue(queue, content) }) }) })

Follows RabbitMQ RPC tutorial

https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html

Changelog

CHANGELOG.md

License

MIT