Date: 2018-02-14

Coworkers is a RabbitMQ microservice framework

Coworkers is a new microservice framework heavily inspired by Koa, which aims to be a simple, robust, and scalable foundation for creating RabbitMQ microservices. By leveraging generators, Coworkers allows you to ditch callbacks and greatly improve error handling. Coworkers also uses amqplib, a battle-tested AMQP client, under the hood to communicate with RabbitMQ and has best practices baked in. Finally, Coworkers enables easy scalability by running each queue consumer in its own process through Node clustering (optional).

Installation

npm install --save coworkers

Note: amqplib is a peer dependency. This gives you the flexibility to use any compatible version you please. npm@^3 does not install peer dependencies automatically, so you will have to install amqplib yourself.

Usage

Quick Example

    const coworkers = require('coworkers')
    const app = coworkers()

    // shared middleware, runs for every queue
    app.use(function * (next) {
      yield next
    })
    // consumer middleware for "foo-queue"
    app.queue('foo-queue', function * () {
      this.ack = true
    })
    app.on('error', function (err) {
      console.error(err.stack)
    })
    app.connect()

Documentation

Application

The Coworkers Application class is the center of a RabbitMQ microservice. It keeps track of which queues to consume and of the generator middlewares. Coworkers' middleware system should feel familiar, as it is similar to that of many HTTP frameworks such as Ruby's Rack and Connect, and is actually powered by the internals of Koa. This middleware system uses generators to make handling asynchronous behavior a breeze. Coworkers is also powered by amqplib, a battle-tested RabbitMQ client library, and has many best practices built in.

Methods (documented below):

use

prefetch

queue

connect

close

Simple consumer example:

    const coworkers = require('coworkers')
    const app = coworkers()

    app.queue('foo-queue', function * () {
      this.ack = true
    })
    app.on('error', function (err, channel, context) {
      console.error(err)
      if (channel) {
        // nack the failed message; log if the nack itself fails
        channel.nack(context.message).catch(function (err) {
          console.error(err)
        })
      }
    })
    app.connect()

app.use - use the given middleware for all queues consumed by the app.

See "Cascading middleware" section (below) for a full example

app.prefetch - set up prefetch options for the application's consumer channel (consumerChannel). Equivalent to http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch.

app.prefetch example

    const coworkers = require('coworkers')
    const app = coworkers()

    // arguments are passed through to the consumer channel's prefetch
    app.prefetch(100, false)
    app.queue('foo-queue', function * () {
      this.ack = true
    })
    app.on('error', ...)
    app.connect()

app.queue - set up a queue consumer w/ options and middleware. Queues will be asserted and consumed with the given options in app.connect.

app.queue example

    const coworkers = require('coworkers')
    const app = coworkers()

    app.on('error', function (err) {
      console.error(err.stack)
    })

    const queueOpts = { }
    const consumeOpts = { }

    // call-signature variants (illustrative, not meant to be run together)
    app.queue('queue0', consumeOpts, function * () {})
    app.queue('queue0', queueOpts, consumeOpts, function * () {})
    app.queue('queue1', queueOpts, consumeOpts, function * () {})

app.queue example when using rabbitmq-schema

    const coworkers = require('coworkers')
    const RabbitSchema = require('rabbitmq-schema')

    const schema = new RabbitSchema({
      exchange: 'exchange0',
      type: 'direct',
      options: {},
      bindings: {
        routingPattern: 'foo.bar.key',
        destination: {
          queue: 'queue0',
          messageSchema: { },
          options: { }
        },
        args: {}
      }
    })
    const app = coworkers(schema)

    app.on('error', function (err) {
      console.error(err.stack)
    })

    const consumeOpts = { }
    app.queue('queue0', consumeOpts, function * () {})
    const queueOpts = { }
    app.queue('queue0', queueOpts, consumeOpts, function * () {})
    app.queue('queue1', consumeOpts, function * () {})

See "Cascading middleware" section (below) for a full example

Cascading Middleware

Coworkers' middleware cascades in a more traditional way, as you may be used to with similar tools; this was previously difficult to make user-friendly with Node's use of callbacks. However, with generators we can achieve "true" middleware. In contrast to Connect's implementation, which simply passes control through a series of functions until one returns, Coworkers yields "downstream", then control flows back "upstream", just like Koa.

The following example acks all foo-queue messages. First the message flows through the trace and parse-content middlewares, which mark when the request started and parse the content, and then yield control through to the foo-queue consumer middleware. When a middleware invokes yield next, the function suspends and passes control to the next middleware defined. When there are no more middlewares to execute "downstream", the stack unwinds and each middleware is resumed to perform its "upstream" behavior (post yield, in reverse order). Note: "shared middlewares" (use) always run before "consumer middlewares" (consume), regardless of attachment order.

Note: if the message reaches the end of the middlewares without an ack (and the consumer is not noAck), a special NoAckError will be thrown.

Cascading example:

    const app = require('coworkers')()

    // trace middleware: time each message
    app.use(function * (next) {
      this.id = require('crypto').randomBytes(12)
      const startTime = Date.now()
      yield next
      const elapsed = Date.now() - startTime
      console.log(`coworkers-trace:${this.id}:${elapsed}`)
    })
    // parse-content middleware
    app.use(function * (next) {
      this.message.content = JSON.parse(this.message.content)
      yield next
    })
    // foo-queue consumer middleware
    app.queue('foo-queue', function * () {
      this.ack = true
    })
    app.connect()
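To make the downstream/upstream flow concrete, here is a minimal, self-contained sketch of generator cascading. It is not Coworkers' internal code: compose and the order array are hypothetical names, and the middlewares use yield * next (plain generator delegation) so the sketch runs without a coroutine runner, whereas real Coworkers middleware uses yield next.

```javascript
// Hypothetical helper: chains generator middlewares so each one receives the
// next one's generator object as `next` (the same idea Koa v1 uses).
function compose (middlewares) {
  return function * () {
    let next = (function * () {})() // no-op at the end of the chain
    for (let i = middlewares.length - 1; i >= 0; i--) {
      next = middlewares[i].call(this, next)
    }
    yield * next
  }
}

const order = []

const run = compose([
  function * trace (next) {
    order.push('trace: downstream')
    yield * next
    order.push('trace: upstream')
  },
  function * parse (next) {
    order.push('parse: downstream')
    yield * next
    order.push('parse: upstream')
  },
  function * consumer () {
    order.push('consumer')
    this.ack = true
  }
])

const context = {}
// Drive the generator to completion; no middleware yields a value of its own.
Array.from(run.call(context))
console.log(order.join(' -> '))
```

Each middleware logs once on the way down and once on the way back up, with the consumer in the middle.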

By default, app handles all errors by logging them and nacking messages

Middleware Error Handling

A coworkers application will not start w/out an error handler. Middleware errors are emitted on the app. To set up error-handling logic such as centralized logging, you can add an "error" event listener.

Simple error handler example:

    app.on('error', function (err, context) {
      log.error(`${context.queueName} consumer error`, err)
    })

For special error-handling behavior make use of the properties available on context. Also, make sure to handle errors that can occur in the error handler (they will not be caught).

Robust error handler example:

    app.on('error', function (err, context) {
      log.error(`${context.queueName} consumer error`, err)
      // only nack w/out requeue if the queue has a dead-letter exchange
      const hasDlx = Boolean(context.queueOpts.deadLetterExchange)
      if (!context.messageAcked && hasDlx) {
        let channel = context.consumerChannel
        let message = context.message
        let requeue = false
        channel.nack(message, false, requeue)
      }
    })
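The "will not start w/out an error handler" rule can be illustrated with Node's EventEmitter. This is only a sketch of the idea: MiniApp, its connect method, and the error message are hypothetical and are not the Coworkers Application class.

```javascript
const EventEmitter = require('events')

// Hypothetical stand-in for an app; not the Coworkers Application class.
class MiniApp extends EventEmitter {
  connect () {
    // Refuse to start unless at least one "error" listener is attached.
    if (this.listenerCount('error') === 0) {
      throw new Error('an "error" event handler is required')
    }
    return 'connected'
  }
}

const app = new MiniApp()

// First attempt: no error handler yet, so startup is refused.
let startupError = null
try {
  app.connect()
} catch (err) {
  startupError = err
}

// Second attempt: with a handler attached, startup proceeds.
app.on('error', function (err, context) {
  console.error(context.queueName + ' consumer error', err)
})
const result = app.connect()
console.log(startupError.message, '/', result)
```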

app.context - the recommended namespace to extend with information that's useful throughout the lifetime of your application, as opposed to on a per-request basis.

app.context.db = db();

app.connect - connect to RabbitMQ, create channels, and consume queues

1) Creates a connection to RabbitMQ

2) Creates a consumer channel and a publisher channel

3) Begins consuming queues

Note on Clustering: If using clustering and isMaster, it will create all workers and wait for them to connect to RabbitMQ. If any of the workers fail to connect to RabbitMQ, they will cause the master's connect to error.

Successful connect examples:

    const app = require('coworkers')()
    const socketOptions = {}

    app.queue('foo-queue', function * () { })

    // promise api
    app.connect().then(...).catch(...)
    app.connect('amqp://127.0.0.1:8000').then(...).catch(...)
    app.connect('amqp://127.0.0.1:8000', socketOptions).then(...).catch(...)

    // callback api
    app.connect(callback)
    app.connect('amqp://127.0.0.1:8000', callback)
    app.connect('amqp://127.0.0.1:8000', socketOptions, callback)
    function callback (err) { }
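The call shapes above (optional url, optional socketOptions, optional trailing callback) can be handled by normalizing the arguments up front. The sketch below shows one way to do that; normalizeConnectArgs is a hypothetical helper and is not taken from the Coworkers source.

```javascript
// Hypothetical helper showing one way to support the call shapes above.
function normalizeConnectArgs (url, socketOptions, callback) {
  if (typeof url === 'function') {
    // connect(callback)
    callback = url
    url = undefined
    socketOptions = undefined
  } else if (typeof socketOptions === 'function') {
    // connect(url, callback)
    callback = socketOptions
    socketOptions = undefined
  }
  return { url: url, socketOptions: socketOptions || {}, callback: callback }
}

function cb (err) {}

const noArgs = normalizeConnectArgs()
const urlAndCallback = normalizeConnectArgs('amqp://127.0.0.1:8000', cb)
const callbackOnly = normalizeConnectArgs(cb)
console.log(noArgs, urlAndCallback, callbackOnly)
```

When no callback is present, a caller of such a helper would typically return a promise instead.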

Failed connect examples:

    const app = require('coworkers')()

    app.use(function * () { })
    app.connect(function (err) {
      // err is set when connect fails
    })

app.close - close channels and disconnect from RabbitMQ

Close examples:

    // promise api
    app.close().then(...).catch(...)
    // callback api
    app.close(callback)

Context

A Coworkers Context encapsulates a RabbitMQ consumer's message and channels into a single object. This provides easy access to methods and accessors for data frequently used w/ RabbitMQ microservice development.

A Context is created per message, and is referenced in middleware as the receiver, or the this identifier.

Context example:

    app.use(function * () {
      this                  // the Context
      this.queueName        // name of the queue the message came from
      this.message          // the incoming RabbitMQ message
      this.consumerChannel  // channel dedicated to consuming
      this.publisherChannel // channel dedicated to publishing
    })

Many of the context's accessors and methods simply delegate to their this.message, this.consumerChannel, or this.publisherChannel equivalents for convenience, and are otherwise identical. For example, this.deliveryTag and this.messageAcked delegate to the message, this.ack and this.nack delegate to consumerChannel, and this.publish(...) and this.sendToQueue(...) delegate to publisherChannel.
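As a rough illustration of this kind of delegation, the sketch below defines read-only getters on a context object that forward to fields on its message. The delegateGetter helper and the sample message are hypothetical; this is not the Coworkers Context implementation.

```javascript
// Hypothetical helper: defines a getter on `target` that forwards to a value
// computed from the target itself.
function delegateGetter (target, name, getValue) {
  Object.defineProperty(target, name, {
    enumerable: true,
    get: function () { return getValue(this) }
  })
}

// Sample message shaped like an amqplib message (fields, properties, content).
const context = {
  message: {
    fields: { deliveryTag: 7, routingKey: 'foo.bar.key' },
    properties: { headers: { 'x-trace': 'abc' } },
    content: Buffer.from('hello')
  }
}

delegateGetter(context, 'deliveryTag', function (ctx) {
  return ctx.message.fields.deliveryTag
})
delegateGetter(context, 'routingKey', function (ctx) {
  return ctx.message.fields.routingKey
})
delegateGetter(context, 'headers', function (ctx) {
  return ctx.message.properties.headers
})

console.log(context.deliveryTag, context.routingKey, context.headers)
```

Because the getters read through to the message on every access, the context never holds a stale copy of the underlying fields.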

Context Models

For the most part, context models should not need to be used. Context accessors and methods should be more convenient, and allow the message to properly flow "downstream" and "upstream".

this.app - the coworkers app

this.connection - amqplib RabbitMQ connection, see "Connection" documentation below

this.consumerChannel - amqplib RabbitMQ channel dedicated to consuming, see "Channel" documentation below

this.publisherChannel - amqplib RabbitMQ channel dedicated to publishing, see "Channel" documentation below

Context Properties

this.queueName - name of the queue from which the message originated

this.message - the incoming rabbitmq message

this.content - message content buffer, message.content accessor

this.fields - message fields, message.fields getter

this.properties - message properties, message.properties getter

this.headers - message headers, message.properties.headers getter

this.exchange - exchange which the message was published to, message.fields.exchange accessor

this.routingKey - routingKey which the message was published with, message.fields.routingKey accessor

this.deliveryTag - delivery tag of the message, message.fields.deliveryTag getter

this.consumerTag - unique identifier of consumer, message.fields.consumerTag getter

this.redelivered - whether message was redelivered, message.fields.redelivered getter

this.queueOpts - queue options used to assert the queue

this.consumeOpts - queue's consume options

this.messageAcked - boolean, whether message has been acknowledged (ack, nack, reject)

this.state - recommended namespace for passing info between middlewares

Context Ack Properties

These special ack properties should be used in place of channel calls (except in the error handler). These properties allow the message to complete its flow "downstream" and back "upstream" before it is acknowledged.

this.ack - set this property to ack the message at the end of the middlewares

this.nack - set this property to nack the message at the end of the middlewares

this.reject - set this property to reject the message at the end of the middlewares

Ack Example:

    app.use(function * () {
      this.ack = true
      // or, w/ options
      this.ack = { allUpTo: true }
    })

Nack Example:

    app.use(function * () {
      this.nack = true
      // or, w/ options
      this.nack = { requeue: false, allUpTo: false }
    })
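The idea behind these properties is that middleware only records its intent on the context, and the actual channel call happens once, after all middleware has finished. A minimal sketch of that final step follows, using a fake channel that records calls. finishMessage and fakeChannel are hypothetical names, the reject property is left out for brevity, and a plain Error stands in for NoAckError.

```javascript
// Hypothetical sketch: translate the ack properties into one channel call.
function finishMessage (context, channel) {
  if (context.ack) {
    const opts = context.ack === true ? {} : context.ack
    channel.ack(context.message, Boolean(opts.allUpTo))
  } else if (context.nack) {
    const opts = context.nack === true ? {} : context.nack
    // amqplib's nack defaults requeue to true when it is omitted
    const requeue = opts.requeue === undefined ? true : opts.requeue
    channel.nack(context.message, Boolean(opts.allUpTo), requeue)
  } else {
    throw new Error('message reached the end of the middlewares without an ack')
  }
}

// Fake channel that records calls instead of talking to RabbitMQ.
const calls = []
const fakeChannel = {
  ack: function (message, allUpTo) {
    calls.push(['ack', allUpTo])
  },
  nack: function (message, allUpTo, requeue) {
    calls.push(['nack', allUpTo, requeue])
  }
}

finishMessage({ message: {}, ack: true }, fakeChannel)
finishMessage({ message: {}, nack: { requeue: false, allUpTo: false } }, fakeChannel)
console.log(calls)
```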

Context Methods

this.publish(...) - publish a message to an exchange w/ a routing key on the publisherChannel

this.sendToQueue(...) - publish a message directly to a queue on the publisherChannel

this.request(...) - publish an rpc message and easily receive its reply; creates a new channel for publishing and consuming (uses amqplib-rpc)

this.reply(...) - reply to an rpc message on the publisherChannel (uses amqplib-rpc)

this.checkQueue(...) - check if a queue exists (creates its own channel to prevent any unexpected errors)

this.checkReplyQueue() - check if a reply-queue exists using message.properties.replyTo (creates its own channel to prevent any unexpected errors)

this.toJSON() - return json version of context (note: will not jsonify context.state, if it includes non-primitives)

Publish example:

    app.use(function * (next) {
      const content = { foo: 1 }
      const opts = {}
      this.publish('exchange-name', 'routing.key', content, opts)
    })

SendToQueue and CheckQueue example:

    app.use(function * (next) {
      const content = 'hello'
      const opts = {}
      // only publish if the target queue exists
      var exists = yield this.checkQueue('queue-name')
      if (!exists) {
        return
      }
      this.sendToQueue('queue-name', content, opts)
    })

RPC (Request, Reply, CheckReplyQueue) example:

Client.js using context.request

    app.queue('client-queue', function * () {
      const content = { a: 10, b: 20 }
      // publish the request and wait for the reply message
      const replyMsg = yield this.request('multiply-queue', content)
      console.log(replyMsg.content.toString())
      this.ack = true
    })

Server.js using context.reply

    // parse the JSON request body
    app.use(function * (next) {
      this.message.content = JSON.parse(this.message.content.toString())
      yield next
    })
    app.queue('multiply-queue', function * () {
      const exists = yield this.checkReplyQueue()
      if (!exists) {
        return
      }
      const content = this.message.content
      const a = content.a
      const b = content.b
      const c = a * b
      const opts = {}
      // send the product as text; new Buffer(c) would allocate c bytes instead
      this.reply(Buffer.from(String(c)), opts)
      this.ack = true
    })
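The content handling in the RPC example can be checked in isolation, w/out RabbitMQ. The sketch below walks the same data through the round trip using only Node's Buffer and JSON; the variable names are illustrative, and it assumes the request content arrives as a JSON-encoded Buffer.

```javascript
// Client side: the request content serialized as JSON in a Buffer.
const requestContent = Buffer.from(JSON.stringify({ a: 10, b: 20 }))

// Server side: parse the Buffer back into an object before using it.
const parsed = JSON.parse(requestContent.toString())
const product = parsed.a * parsed.b

// Buffer.from(number) throws, so convert the result to a string first.
const replyContent = Buffer.from(String(product))

console.log(replyContent.toString()) // prints "200"
```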

Channel

See the amqplib channel documentation: http://www.squaremobius.net/amqp.node/channel_api.html#channel

Connection

See the amqplib connection documentation: http://www.squaremobius.net/amqp.node/channel_api.html#connect

Clustering / Process management

By default, coworkers will use clustering to give each queue consumer its own process. Clustering is optional; you can manage coworkers processes manually (see "Manual process management" below).

Clustering is opinionated; it makes the processes work as a unit:

If a worker fails to startup and connect to rabbitmq, it will kill all the workers

If a worker dies, coworkers will attempt to respawn it w/ exponential backoff. Use the following ENV variables to adjust this behavior: COWORKERS_RESPAWN_RETRY_ATTEMPTS, COWORKERS_RESPAWN_RETRY_MIN_TIMEOUT, COWORKERS_RESPAWN_RETRY_FACTOR

If a worker dies and its replacement repeatedly fails to start and connect to RabbitMQ, it will crash the entire cluster
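To give a feel for how the three respawn settings interact, here is a sketch of a typical exponential backoff schedule, where each delay is the minimum timeout multiplied by the factor raised to the attempt number. The exact formula Coworkers uses is an assumption here, backoffDelays is a hypothetical helper, and the values passed in are illustrative rather than the library's defaults.

```javascript
// Hypothetical sketch: delay = minTimeout * factor^attempt (attempt starts at 0).
function backoffDelays (attempts, minTimeout, factor) {
  const delays = []
  for (let attempt = 0; attempt < attempts; attempt++) {
    delays.push(minTimeout * Math.pow(factor, attempt))
  }
  return delays
}

// 4 attempts, 1000 ms minimum timeout, factor of 2 (illustrative values).
const delays = backoffDelays(4, 1000, 2)
console.log(delays) // prints [ 1000, 2000, 4000, 8000 ]
```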

Clustering example:

When clustering is enabled, Coworkers will match the number of processes to the number of CPUs the server has. The example below will create four workers in total (to match the number of CPUs): two "foo-queue" consumers and two "bar-queue" consumers. If the number of queues is greater than the number of CPUs, coworkers will only create one consumer per queue. If you want to specify the number of workers per queue, you can do so using the environment variable COWORKERS_WORKERS_PER_QUEUE. If you have any problems w/ a particular worker process, you can close it by sending it a SIGINT signal; this will gracefully shut down the process and not respawn a replacement (to restart the worker after stopping it, restart your coworkers app).

    const app = require('coworkers')()

    require('os').cpus().length // 4 in this example

    app.queue('foo-queue', ...)
    app.queue('bar-queue', ...)
    app.on('error', ...)
    app.connect(function (err) {
      if (err) console.error(err.stack)
    })
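The worker-count rule described above can be sketched as a small function. This is an approximation based only on the description: workersPerQueue is a hypothetical helper, and rounding down with Math.floor is an assumption about how uneven CPU counts are handled.

```javascript
// Hypothetical sketch: spread CPUs evenly over queues, never fewer than one
// consumer per queue.
function workersPerQueue (numCpus, numQueues) {
  return Math.max(1, Math.floor(numCpus / numQueues))
}

console.log(workersPerQueue(4, 2)) // prints 2 (four cpus, two queues)
console.log(workersPerQueue(2, 5)) // prints 1 (more queues than cpus)
```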

Manual process management:

Coworkers forces you to consume only a single queue per process, so that your consumers are decoupled. If you want to manage your own processes w/out using clustering, all you have to do is specify three environment variables:

    COWORKERS_CLUSTER="false"
    COWORKERS_QUEUE="foo-queue"
    COWORKERS_QUEUE_WORKER_NUM=1

Processes will send process messages so that you can determine their state. Messages include: 'coworkers:connect', 'coworkers:connect:error', 'coworkers:close', 'coworkers:close:error'

    // app.js
    app.queue('foo-queue', ...)
    app.queue('bar-queue', ...)
    module.exports = app

    // process manager
    var app = require('./app')
    app.queueNames.forEach(function (queueName) {
      // start a process for each queue
    })
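When managing processes yourself, each consumer process needs its own copy of the three environment variables. The sketch below builds those environments for a list of queue names. workerEnv is a hypothetical helper, and the queue names are the ones used in the examples above.

```javascript
// Hypothetical helper: builds the environment for one manually managed
// consumer process. Environment values are always strings.
function workerEnv (queueName, workerNum) {
  return Object.assign({}, process.env, {
    COWORKERS_CLUSTER: 'false',
    COWORKERS_QUEUE: queueName,
    COWORKERS_QUEUE_WORKER_NUM: String(workerNum)
  })
}

const queueNames = ['foo-queue', 'bar-queue']
const envs = queueNames.map(function (queueName) {
  return workerEnv(queueName, 1)
})

// Each env could then be passed to child_process.fork(modulePath, [], { env: env }),
// and the parent could watch the child's "message" event for the
// 'coworkers:connect' and 'coworkers:close' messages listed above.
envs.forEach(function (env) {
  console.log(env.COWORKERS_QUEUE, env.COWORKERS_QUEUE_WORKER_NUM)
})
```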

RPC utils

If you need to publish rpc messages from another application, you can use amqplib-rpc. Coworkers uses amqplib-rpc under the hood for its RPC methods, so the method signatures are nearly identical.

Testing

Check out coworkers-test; it allows you to easily test a coworkers app's message-handling middlewares as a unit w/out requiring RabbitMQ.

Changelog

