Helper to easily implement a worker for RSMQ (Redis Simple Message Queue).

⚠️ Note: RSMQ uses the Redis EVAL command (Lua scripts), so the minimum Redis version is 2.6.

Install

npm install rsmq-worker

Initialize

new RSMQWorker( queuename, options );

Example:

    var RSMQWorker = require( "rsmq-worker" );
    var worker = new RSMQWorker( "myqueue" );

    worker.on( "message", function( msg, next, id ){
        // process your message
        console.log( "Message id : " + id );
        console.log( msg );
        next();
    });

    // optional error listeners
    worker.on( "error", function( err, msg ){
        console.log( "ERROR", err, msg.id );
    });
    worker.on( "exceeded", function( msg ){
        console.log( "EXCEEDED", msg.id );
    });
    worker.on( "timeout", function( msg ){
        console.log( "TIMEOUT", msg.id, msg.rc );
    });

    worker.start();

Config

queuename : ( String required ) The name of the queue to pull messages from.

options : ( Object optional ) The configuration object.

options.interval : ( Number[] optional; default = [ 0, 1, 5, 10 ] ) An Array of increasing wait times in seconds. See "Options interval" in the Details section.

options.maxReceiveCount : ( Number optional; default = 10 ) The number of times a message may be received before it is treated as exceeded.

options.invisibletime : ( Number optional; default = 30 ) The time in seconds to hide a message after it has been received.

options.defaultDelay : ( Number optional; default = 1 ) The default delay in seconds for sending new messages to the queue.

options.autostart : ( Boolean optional; default = false ) Autostart the worker on init.

options.timeout : ( Number optional; default = 3000 ) Message processing timeout in ms. You have to call the next() method of a message within this time (e.g. within 3000 ms). If set to 0, the worker waits indefinitely.

options.customExceedCheck : ( Function optional ) A custom function that receives the raw message (see Raw message format) as its argument to build a custom exceed check. If it returns true, the message will not exceed. If it returns false, the regular check for maxReceiveCount is used.

options.alwaysLogErrors : ( Boolean optional; default = false ) Log errors to the console even if an error listener has been attached.

options.rsmq : ( RedisSMQ optional; default = null ) An already existing rsmq instance to use instead of creating a new client.

options.redis : ( RedisClient optional; default = null ) An already existing redis client instance to use if no rsmq instance has been defined.

options.redisPrefix : ( String optional; default = "" ) The redis prefix/namespace for rsmq if no rsmq instance has been defined. This has to match the option ns of RSMQ.

options.host : ( String optional; default = "localhost" ) Host to connect to redis if no rsmq or redis instance has been defined.

options.port : ( Number optional; default = 6379 ) Port to connect to redis if no rsmq or redis instance has been defined.

options.options : ( Object optional; default = {} ) Options to connect to redis if no rsmq or redis instance has been defined.
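As a rough illustration of how these options fit together, the sketch below builds an options object and checks one relationship between two of them: timeout is given in milliseconds while invisibletime is given in seconds. The option values and the helper name timeoutFitsInvisibleTime are assumptions made for this example and are not part of rsmq-worker. The check rests on the assumption that a message which is still being processed should not become visible in the queue again.

```javascript
// Hypothetical option values, chosen only for illustration.
var workerOptions = {
    interval: [ 0, 1, 5, 10 ],  // seconds to wait between polls of an empty queue
    maxReceiveCount: 5,         // receives before a message counts as exceeded
    invisibletime: 60,          // seconds a received message stays hidden
    timeout: 10000,             // milliseconds allowed until next() is called
    autostart: false
};

// Hypothetical helper (not part of rsmq-worker): returns true if the processing
// timeout (ms) ends before the message becomes visible again (invisibletime, s).
// A timeout of 0 means "wait forever", so it can never fit.
function timeoutFitsInvisibleTime( options ){
    if( options.timeout === 0 ){
        return false;
    }
    return options.timeout < options.invisibletime * 1000;
}

console.log( timeoutFitsInvisibleTime( workerOptions ) );
```

With the package installed and Redis reachable, the object would be passed as the second argument: new RSMQWorker( "myqueue", workerOptions ).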

Raw message format

A message ( e.g. received by the event data or customExceedCheck ) contains the following keys:

msg.message : ( String ) The queue message content. You can use complex content by using a stringified JSON.

msg.id : ( String ) The rsmq internal message id.

msg.sent : ( Number ) Timestamp of when this message was sent / created.

msg.fr : ( Number ) Timestamp of when this message was first received.

msg.rc : ( Number ) Number of times this message was received.
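To make the keys above more concrete, here is a minimal sketch that reads them from a raw message and tries to parse msg.message as JSON. The helper summarizeRawMessage and the example message are made up for this illustration; only the key names come from the list above.

```javascript
// Hypothetical helper (not part of rsmq-worker): turns a raw message into a
// small summary and parses msg.message as JSON when possible.
function summarizeRawMessage( msg ){
    var body;
    try {
        body = JSON.parse( msg.message );
    } catch( _err ){
        // Not valid JSON: keep the plain string content.
        body = msg.message;
    }
    return {
        id: msg.id,
        receiveCount: msg.rc,
        // Time between sending and first receive, in the unit of the timestamps.
        waited: msg.fr - msg.sent,
        body: body
    };
}

// Made-up raw message with the keys listed above.
var exampleMsg = {
    message: JSON.stringify( { type: "writefile", filename: "./test.txt" } ),
    id: "exampleid0001",
    sent: 1000,
    fr: 1500,
    rc: 2
};

console.log( summarizeRawMessage( exampleMsg ) );
```

A function of this shape could also serve as a starting point for a customExceedCheck, since that option receives the same raw message.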

Methods

.start()

If you haven't set the config option autostart to true, you have to call the .start() method.

Return

( Self ): The instance itself for chaining.

.stop()

Stops the receive interval only. This does not close the connection to rsmq/redis. If you want your script to end, call .quit().

Return

( Self ): The instance itself for chaining.

.send( msg [, delay ][, cb ] )

Helper function to send a message to the configured queue.

Arguments

msg : ( String required ): The rsmq message. As a best practice, use a stringified JSON with additional data.

delay : ( Number optional; default = 0 ): The message delay to hide this message for the next x seconds.

cb : ( Function optional ): An optional callback to get a secure response for a successful send.

Return

( Self ): The instance itself for chaining.
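Since the message has to be a String, structured data is usually stringified first. The sketch below wraps that step around .send( msg [, delay ][, cb ] ). The helper sendJSON and the recording stand-in fakeWorker are assumptions for illustration, so the example runs without Redis. With a real worker, the same helper would call its .send() method.

```javascript
// Hypothetical helper (not part of rsmq-worker): stringifies a payload and
// passes it to worker.send( msg [, delay ][, cb ] ).
function sendJSON( worker, payload, delay, cb ){
    var msg = JSON.stringify( payload );
    if( typeof delay === "function" ){
        // Called as sendJSON( worker, payload, cb )
        return worker.send( msg, delay );
    }
    if( delay === undefined ){
        return worker.send( msg );
    }
    if( typeof cb === "function" ){
        return worker.send( msg, delay, cb );
    }
    return worker.send( msg, delay );
}

// Stand-in for a worker: it only records the arguments it receives and
// returns itself, as .send() is documented to do.
var fakeWorker = {
    sent: [],
    send: function(){
        this.sent.push( Array.prototype.slice.call( arguments ) );
        return this;
    }
};

// Send a payload that should stay hidden for 5 seconds.
sendJSON( fakeWorker, { type: "deletefile", filename: "./test.txt" }, 5 );
console.log( fakeWorker.sent[ 0 ] );
```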

.del( id [, cb ] )

Helper function to simply delete a message after it has been processed.

Arguments

id : ( String required ): The rsmq message id.

cb : ( Function optional ): An optional callback to get a secure response for a successful delete.

Return

( Self ): The instance itself for chaining.

.changeInterval( interval )

Change the polling interval while the worker is running.

Arguments

interval : ( Number|Array required ): The new interval.

Return

( Self ): The instance itself for chaining.

.quit()

Stop the worker and close the connection. After this it's no longer possible to reuse the worker instance. It's only intended to kill all timers and connections so your script will end.

.info( cb )

Get the current queue attributes. This is just a shortcut to rsmq.getQueueAttributes.

Arguments

cb : ( Function ): Callback with ( err, attributes ). See rsmq-docs for details.

.size( [hidden=false], cb )

Get the current queue size.

Arguments

hidden : ( Boolean optional; default = false ): The count of messages including the currently hidden/"in flight" messages.

cb : ( Function optional ): Callback with ( err, size ). The size is a Number and represents the number of messages in the queue. If hidden=true you will receive the number of currently hidden messages.

Return

( Self ): The instance itself for chaining.
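One possible use of .size() is to find out whether any work is left. The sketch below calls it once with hidden=false and once with hidden=true and reports the queue as idle only if both results are zero. The check deliberately works whether hidden=true yields the hidden messages only or a total that includes them. The helper isQueueIdle and the stand-in makeFakeWorker are assumptions for illustration and are not part of rsmq-worker.

```javascript
// Hypothetical helper (not part of rsmq-worker): reports whether the queue
// has neither waiting nor hidden messages, using two .size() calls.
function isQueueIdle( worker, cb ){
    worker.size( false, function( err, visible ){
        if( err ){
            return cb( err );
        }
        worker.size( true, function( err, hiddenOrTotal ){
            if( err ){
                return cb( err );
            }
            // Zero in both calls means nothing is waiting or in flight,
            // whichever way the hidden count is reported.
            cb( null, visible === 0 && hiddenOrTotal === 0 );
        });
    });
}

// Stand-in for a worker so the sketch runs without Redis.
function makeFakeWorker( visible, hidden ){
    return {
        size: function( includeHidden, cb ){
            cb( null, includeHidden ? hidden : visible );
            return this;
        }
    };
}

// No waiting messages, but two are still in flight.
isQueueIdle( makeFakeWorker( 0, 2 ), function( err, idle ){
    console.log( "idle:", idle );
});
```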

Events

message

Main event to catch and process a message. If you do not set a handler for this Event nothing will happen.

Example:

    worker.on( "message", function( message, next, msgid ){
        // process the message, then call next()
        next();
    });

Arguments

message : ( String ) The queue message content to process. You can use complex content by using a stringified JSON.

next : ( Function ) A function you have to call when your message has been processed. It takes one argument:

  delete : ( Boolean|Error optional; default = true )
  Error : If you pass an error, it will be emitted as an error event.
  Boolean : Pass false to prevent the worker from auto-deleting the message when processing ends. This is useful if you want a message to pop up multiple times. To implement this, see the config option options.customExceedCheck.

msgid : ( String ) The message id. This is useful if you want to delete a message manually.
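The arguments above can be combined into a small dispatcher for JSON messages. The following is only a sketch: createJsonHandler and the type field are assumptions for illustration, and a plain Node.js EventEmitter stands in for the worker so the example runs without Redis. It relies on the documented behaviour that passing an Error to next() leads to an error event.

```javascript
var EventEmitter = require( "events" ).EventEmitter;

// Hypothetical helper (not part of rsmq-worker): builds a "message" listener
// that parses the content as JSON and dispatches on its `type` field.
function createJsonHandler( handlers ){
    return function( message, next, msgid ){
        var data;
        try {
            data = JSON.parse( message );
        } catch( err ){
            // Passing an Error to next() makes the worker emit an error event.
            return next( err );
        }
        var known = data !== null && typeof data === "object" &&
            Object.prototype.hasOwnProperty.call( handlers, data.type );
        if( !known ){
            return next( new Error( "No handler for message " + msgid ) );
        }
        // The handler calls next() itself once its work is done.
        handlers[ data.type ]( data, next );
    };
}

// An EventEmitter stands in for the worker.
var fakeWorker = new EventEmitter();
var results = [];

fakeWorker.on( "message", createJsonHandler({
    greet: function( data, next ){
        results.push( "Hello " + data.name );
        next();
    }
}));

// Simulate two received messages; `next` only records how it was called.
fakeWorker.emit( "message", JSON.stringify( { type: "greet", name: "World" } ), function( arg ){
    results.push( arg === undefined ? "done" : arg );
}, "id1" );
fakeWorker.emit( "message", "not json", function( arg ){
    results.push( arg instanceof Error ? "error" : arg );
}, "id2" );

console.log( results );
```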

ready

Fired once the worker is connected to rsmq/redis and has been initialized with the given queuename.

data

The raw event when a message has been received.

Arguments

msg : ( String ) The raw rsmq message. ( See section Raw message format )

deleted

Fired after a message has been deleted.

Arguments

id : ( String ) The rsmq message id

exceeded

Fired after a message has exceeded its receive limit. The message is deleted immediately afterwards.

Arguments

msg : ( String ) The raw rsmq message. ( See section Raw message format )

timeout

Fired if a message processing exceeds the configured timeout.

Arguments

msg : ( String ) The raw rsmq message. ( See section Raw message format )

error

Fired if a message processing throws an error.

Arguments

err : ( Error|Any ) The thrown error.

msg : ( String ) The raw rsmq message. ( See section Raw message format )
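The events of this section can be observed together, for example to keep simple counters. The sketch below is an assumption-laden illustration: trackEvents is a made-up helper, and a plain Node.js EventEmitter stands in for the worker so it runs without Redis. Only the event names and their arguments are taken from the descriptions above.

```javascript
var EventEmitter = require( "events" ).EventEmitter;

// Hypothetical helper (not part of rsmq-worker): counts worker events and
// remembers the ids of messages that timed out or exceeded.
function trackEvents( worker ){
    var stats = { deleted: 0, errors: 0, timedOut: [], exceeded: [] };
    worker.on( "deleted", function( id ){
        stats.deleted++;
    });
    worker.on( "timeout", function( msg ){
        stats.timedOut.push( msg.id );
    });
    worker.on( "exceeded", function( msg ){
        stats.exceeded.push( msg.id );
    });
    worker.on( "error", function( err, msg ){
        stats.errors++;
    });
    return stats;
}

// An EventEmitter stands in for the worker; the emitted values are made up.
var fakeWorker = new EventEmitter();
var stats = trackEvents( fakeWorker );

fakeWorker.emit( "deleted", "id1" );
fakeWorker.emit( "timeout", { id: "id2", rc: 1 } );
fakeWorker.emit( "exceeded", { id: "id3", rc: 10 } );
fakeWorker.emit( "error", new Error( "boom" ), { id: "id4" } );

console.log( stats );
```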

Advanced example

This is an advanced example showing some features in action.

    var fs = require( "fs" );
    var RSMQWorker = require( "rsmq-worker" );

    // custom exceed check: the "createmessages" message never exceeds;
    // all other messages use the regular maxReceiveCount check
    var fnCheck = function( msg ){
        if( msg.message === "createmessages" ){
            return true;
        }
        return false;
    };

    var worker = new RSMQWorker( "myqueue", {
        interval: [ .1, 1 ],      // wait 100ms between every receive and step up to 1s on empty receives
        invisibletime: 2,         // hide received messages for 2 sec
        maxReceiveCount: 2,       // only receive a message 2 times until it is exceeded
        autostart: true,          // start the worker on init
        customExceedCheck: fnCheck
    });

    // listen to errors
    worker.on( "error", function( err, msg ){
        console.log( "ERROR", err, msg.id );
    });
    worker.on( "timeout", function( msg ){
        console.log( "TIMEOUT", msg.id, msg.rc );
    });

    // get the rsmq instance used by the worker
    var rsmq = worker._getRsmq();

    // forward exceeded messages to a separate queue
    worker.on( "exceeded", function( msg ){
        console.log( "EXCEEDED", msg.id );
        // NOTE: make sure this queue exists.
        // rsmq.sendMessage expects an options object with qname and message.
        rsmq.sendMessage( { qname: "YOUR_EXCEEDED_QUEUE", message: msg.message }, function( err, resp ){
            if( err ){
                console.error( "write-to-exceeded-queue", err );
            }
        });
    });

    // listen to messages
    worker.on( "message", function( message, next, id ){
        console.log( "message", message );

        if( message === "createmessages" ){
            // keep the message in the queue and create the follow-up messages
            next( false );
            worker.send( JSON.stringify( { type: "writefile", filename: "./test.txt", txt: "Foo Bar" } ) );
            worker.send( JSON.stringify( { type: "deletefile", filename: "./test.txt" } ) );
            return;
        }

        var _data = JSON.parse( message );
        switch( _data.type ){
            case "writefile":
                fs.writeFile( _data.filename, _data.txt, function( err ){
                    if( err ){
                        next( err );
                    } else {
                        next();
                    }
                });
                break;
            case "deletefile":
                fs.unlink( _data.filename, function( err ){
                    if( err ){
                        next( err );
                    } else {
                        next();
                    }
                });
                break;
        }
    });

    worker.send( "createmessages" );

Details

Options interval

The option interval can:

A.) be a Number, so the worker will poll the queue every n seconds (e.g. interval: .5 = twice a second)

B.) be an Array of Numbers. On start, interval[0] is the time to poll the queue. Every time the worker receives an empty response (queue is empty), the next interval will be used to wait for the next poll ( interval[+1] ) until the last definition interval[ n ] is reached. On every received message the wait time will be reset to interval[0].

E.g.: interval: [ .2, 1, 3 ]

1st poll -> no message -> wait .2s = 200ms

2nd poll -> no message -> wait 1s

3rd poll -> no message -> wait 3s

4th poll -> no message -> wait 3s

5th poll -> 1 message -> wait .2s

6th poll -> no message -> wait 1s

7th poll -> 1 message -> wait .2s

8th poll -> 1 message -> wait .2s

9th poll -> no message -> wait .2s

10th poll -> no message -> wait 1s ...
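The stepping through the array during consecutive empty polls (polls 1 to 4 in the listing above) can be sketched as a small function. This is an illustration of the described rule, not the worker's actual implementation; normalizeInterval and waitAfterEmptyPoll are made-up names. The reset after a received message is not modeled here.

```javascript
// Hypothetical helpers (not part of rsmq-worker) illustrating the rule above.

// Accepts a Number or an Array of Numbers and always returns an Array.
function normalizeInterval( interval ){
    return Array.isArray( interval ) ? interval.slice() : [ interval ];
}

// Wait time in seconds after the n-th consecutive empty poll (n starts at 0).
// Once the last entry is reached it is reused.
function waitAfterEmptyPoll( interval, emptyPollsSoFar ){
    var steps = normalizeInterval( interval );
    var index = Math.min( emptyPollsSoFar, steps.length - 1 );
    return steps[ index ];
}

// Four empty polls in a row with interval: [ .2, 1, 3 ]
var waits = [ 0, 1, 2, 3 ].map( function( n ){
    return waitAfterEmptyPoll( [ .2, 1, 3 ], n );
});
console.log( waits );
```

A plain Number behaves like an Array with a single entry, so waitAfterEmptyPoll( .5, n ) yields .5 for every n.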

Release History

Version | Date | Description
0.5.2 | 2016-10-24 | Optimized README and updated dependencies
0.5.1 | 2016-08-22 | Fixed reconnect error Issue#20. Thanks to mstduff; updated deps; removed generated code docs from repo
0.5.0 | 2016-07-14 | Added methods .info(cb) (Issue#17) and .size( [hidden,] cb )
0.4.3 | 2016-06-20 | Optimized event listeners Issue#15. Thanks to Kevin Turner
0.4.2 | 2016-05-06 | Added the .quit() function Issue#11. Thanks to Sam Fung
0.4.1 | 2016-04-05 | Fixed missing isNumber function
0.4.0 | 2016-03-30 | Updated dependencies (especially lodash to 4.x). Fixed a config bug caused by the array merge from extend Issue#7. Thanks to Peter Hanneman
0.3.8 | 2015-11-04 | Fixed stop behavior. Pull#5. Thanks to Exinferis
0.3.7 | 2015-09-02 | Added tests to check the behavior during errors within message processing; Added option alwaysLogErrors to prevent console logs if an error event handler was attached. Issue #3
0.3.6 | 2015-09-02 | Updated dependencies; optimized readme (thanks to Tobias Lidskog for the pull #4)
0.3.5 | 2015-04-27 | again ... fixed argument dispatch for .send()
0.3.4 | 2015-04-27 | fixed argument dispatch for .send() and added optional cb for .del()
0.3.3 | 2015-03-27 | added changeInterval to modify the interval in operation
0.3.2 | 2015-02-23 | changed default prefix/namespace
0.3.0 | 2015-02-16 | It's now possible to return an error as first argument of next. This will lead to an error emit + optimized readme
0.2.2 | 2015-01-27 | added option defaultDelay and optimized arguments of the send method; fixed travis.yml
0.2.0 | 2015-01-27 | Added timeout, better error handling and send callback
0.1.2 | 2015-01-20 | Reorganized code, added code docs and optimized readme
0.1.1 | 2015-01-17 | Added test scripts and optimized repository file list
0.1.0 | 2015-01-16 | First working and documented version
0.0.1 | 2015-01-14 | Initial commit

The MIT License (MIT)

Copyright © 2015 Mathias Peter, http://www.tcs.de

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.