Date: 2017-09-15
A helper to easily implement an RSMQ ( Redis Simple Message Queue ) worker.
⚠️ Note: RSMQ uses the Redis EVAL command (Lua scripts), so the minimum Redis version is 2.6+.
npm install rsmq-worker
new RSMQWorker( queuename, options );
Example:
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" );

worker.on( "message", function( msg, next, id ){
    // process your message
    console.log( "Message id : " + id );
    console.log( msg );
    next();
});

// optional error listeners
worker.on('error', function( err, msg ){
    console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
    console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
    console.log( "TIMEOUT", msg.id, msg.rc );
});

worker.start();
Config
queuename : ( String required ) The name of the queue to pull messages from.
options : ( Object optional ) The configuration object.
options.interval : ( Number[] optional; default = [ 0, 1, 5, 10 ] ) An Array of increasing wait times in seconds. More details in the section interval.
options.maxReceiveCount : ( Number optional; default = 10 ) Number of receives after which a message counts as exceeded.
options.invisibletime : ( Number optional; default = 30 ) Time in seconds to hide a message after it has been received.
options.defaultDelay : ( Number optional; default = 1 ) The default delay in seconds for sending new messages to the queue.
options.autostart : ( Boolean optional; default = false ) Autostart the worker on init.
options.timeout : ( Number optional; default = 3000 ) Message processing timeout in ms. You have to call the next() method of the message event within this time (e.g. within 3000ms). If set to 0 the worker waits indefinitely.
options.customExceedCheck : ( Function optional ) A custom function, called with the raw message (see Raw message format) as argument, to build a custom exceed check. If it returns true the message will not exceed. If it returns false the regular check for maxReceiveCount is used.
options.alwaysLogErrors : ( Boolean optional; default = false ) An error will be logged to the console even if an error listener has been attached.
options.rsmq : ( RedisSMQ optional; default = null ) An already existing rsmq instance to use instead of creating a new client.
options.redis : ( RedisClient optional; default = null ) An already existing redis client instance to use if no rsmq instance has been defined.
options.redisPrefix : ( String optional; default = "" ) The redis prefix/namespace for rsmq if no rsmq instance has been defined. This has to match the option ns of RSMQ.
options.host : ( String optional; default = "localhost" ) Host to connect to redis if no rsmq or redis instance has been defined.
options.port : ( Number optional; default = 6379 ) Port to connect to redis if no rsmq or redis instance has been defined.
options.options : ( Object optional; default = {} ) Options to connect to redis if no rsmq or redis instance has been defined.
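The options can be collected in a plain object before the worker is constructed. The following is a minimal sketch and not part of rsmq-worker itself: the helper name `buildWorkerOptions`, the environment variable names `REDIS_HOST` and `REDIS_PORT`, and the `keep` flag inside the message content are assumptions made for illustration only.

```javascript
// Hypothetical helper: builds the options object for `new RSMQWorker( queuename, options )`.
// REDIS_HOST / REDIS_PORT are assumed variable names; rsmq-worker does not read them itself.
function buildWorkerOptions( env ){
    return {
        host: env.REDIS_HOST || "localhost",
        port: parseInt( env.REDIS_PORT, 10 ) || 6379,
        interval: [ 0, 1, 5, 10 ],  // step up the wait time on empty receives
        maxReceiveCount: 3,         // a message exceeds after 3 receives ...
        invisibletime: 30,          // hide a received message for 30 seconds
        timeout: 5000,              // next() has to be called within 5000ms
        // ... unless this check returns true for the raw message
        customExceedCheck: function( msg ){
            try {
                // `keep` is a made-up field in the JSON content
                return JSON.parse( msg.message ).keep === true;
            } catch( _err ){
                // content is not usable JSON: fall back to the maxReceiveCount check
                return false;
            }
        }
    };
}

var options = buildWorkerOptions( process.env );
// var worker = new RSMQWorker( "myqueue", options );
```

Keeping the exceed check inside the options builder keeps all queue policy in one place; the check itself only looks at the raw message and has no side effects.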
Raw message format
A message ( e.g. received by the event data or by customExceedCheck ) contains the following keys:
msg.message : ( String ) The queue message content. You can use complex content by using a stringified JSON.
msg.id : ( String ) The rsmq internal message id.
msg.sent : ( Number ) Timestamp of when this message was sent / created.
msg.fr : ( Number ) Timestamp of when this message was first received.
msg.rc : ( Number ) Number of times this message was received.
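To make the shape of a raw message concrete, here is a small sketch. The object below is hand-written and all of its values are made up; `parsePayload` and `isRedelivery` are hypothetical helper names, not functions provided by rsmq-worker.

```javascript
// Hand-written example of a raw message; every value is invented for illustration.
var exampleMsg = {
    message: "{\"type\":\"writefile\",\"filename\":\"./test.txt\"}",
    id: "exampleid0001",
    sent: 1505462400000,
    fr: 1505462401000,
    rc: 1
};

// Hypothetical helper: returns the parsed JSON content, or null if the
// content is a plain string that is not valid JSON.
function parsePayload( msg ){
    try {
        return JSON.parse( msg.message );
    } catch( _err ){
        return null;
    }
}

// Hypothetical helper: true if the message has been received more than once.
function isRedelivery( msg ){
    return msg.rc > 1;
}

console.log( parsePayload( exampleMsg ).type ); // prints "writefile"
console.log( isRedelivery( exampleMsg ) );      // prints false
```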
.start()
If you haven't set the config option autostart to true, you have to call the .start() method.
Return
( Self ): The instance itself for chaining.
.stop()
Just stops the receive interval.
This will not cut the connection to rsmq/redis.
If you want your script to end, call .quit().
Return
( Self ): The instance itself for chaining.
.send( msg [, delay ][, cb ] )
Helper function to send a message to the configured queue.
Arguments
msg : ( String required ): The rsmq message. Best practice is a stringified JSON with additional data.
delay : ( Number optional; default = 0 ): The message delay; the message stays hidden for the next x seconds.
cb : ( Function optional ): An optional callback to confirm that the message has been sent successfully.
Return
( Self ): The instance itself for chaining.
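Since the message has to be a String, structured data is usually stringified before sending. The sketch below wraps .send() for that purpose; `sendJob` and the `{ type, data }` envelope are assumptions for this example, and `worker` stands for an already created RSMQWorker instance.

```javascript
// Hypothetical helper: wraps worker.send( msg, delay, cb ) and stringifies the content.
// The { type, data } envelope is an assumed convention, not something rsmq-worker requires.
function sendJob( worker, type, data, delay, cb ){
    var payload = JSON.stringify( { type: type, data: data } );
    // .send() returns the worker instance, so the result can be chained
    return worker.send( payload, delay, cb );
}

// Usage, assuming `worker` is an RSMQWorker instance:
// sendJob( worker, "writefile", { filename: "./test.txt", txt: "Foo Bar" }, 5, function(){
//     console.log( "job queued, visible in about 5 seconds" );
// });
```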
.del( id [, cb ] )
Helper function to delete a message after it has been processed.
Arguments
id : ( String required ): The rsmq message id.
cb : ( Function optional ): An optional callback to confirm that the message has been deleted successfully.
Return
( Self ): The instance itself for chaining.
.changeInterval( interval )
Change the interval timeouts in operation.
Arguments
interval : ( Number|Array required ): The new interval.
Return
( Self ): The instance itself for chaining.
.quit()
Stop the worker and close the connection. After this it's no longer possible to reuse the worker-instance. It's just intended to kill all timers and connections so your script will end.
.info( cb )
Get the current queue attributes.
This is just a shortcut to rsmq.getQueueAttributes.
Arguments
cb : ( Function ): Callback with ( err, attributes ). See rsmq-docs for details.
.size( [hidden=false], cb )
Get the current queue size.
Arguments
hidden : ( Boolean optional; default = false ): The count of messages including the currently hidden/"in flight" messages.
cb : ( Function optional ): Callback with ( err, size ). The size is a Number and represents the number of messages in the queue. If hidden=true you will receive the number of currently hidden messages.
Return
( Self ): The instance itself for chaining.
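.size() and .quit() can be combined to let a script end once the queue has been worked off. This is a minimal sketch under the assumption that a size of 0 with hidden=false is a good enough "queue is empty" signal for your use case; `quitWhenEmpty` is a hypothetical helper name.

```javascript
// Hypothetical helper: asks for the queue size and quits the worker if it is 0.
// Calls back with ( err, didQuit ).
function quitWhenEmpty( worker, cb ){
    worker.size( false, function( err, size ){
        if( err ){
            return cb( err );
        }
        if( size === 0 ){
            // kills all timers and connections; the worker cannot be reused afterwards
            worker.quit();
            return cb( null, true );
        }
        cb( null, false );
    });
}

// Usage, assuming `worker` is an RSMQWorker instance:
// quitWhenEmpty( worker, function( err, didQuit ){
//     console.log( didQuit ? "worker stopped" : "messages left, still running" );
// });
```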
message
Main event to catch and process a message. If you do not set a handler for this event, nothing will happen.
Example:
worker.on( "message", function( message, next, msgid ){
    // process message ...
    next();
});
Arguments
message : ( String ) The queue message content to process. You can use complex content by using a stringified JSON.
next : ( Function ) A function you have to call when your message has been processed.
    delete : ( Boolean|Error optional; default = true )
        Error: If you pass an error, it will be emitted as an error event.
        Boolean: Pass false to prevent the worker from auto-deleting the message when processing ends. This is useful if you want a message to be received multiple times. To implement this, please check the config option options.customExceedCheck.
msgid : ( String ) The message id. This is useful if you want to delete a message manually.
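The three ways of calling next() can be put side by side in one handler. This is only a sketch: the handler name `onMessage` and the `retryLater` field in the message content are made up for this example.

```javascript
// Sketch of a "message" handler showing next( err ), next( false ) and next().
// `retryLater` is an invented field in the JSON content.
function onMessage( message, next, id ){
    var data;
    try {
        data = JSON.parse( message );
    } catch( err ){
        // pass the error on; the worker emits it as an "error" event
        return next( err );
    }
    if( data && data.retryLater ){
        // do not auto-delete: the message will be received again later
        return next( false );
    }
    // processing finished: the worker deletes the message
    next();
}

// Usage, assuming `worker` is an RSMQWorker instance:
// worker.on( "message", onMessage );
```

Note that a message kept with next( false ) still counts towards maxReceiveCount unless a customExceedCheck returns true for it.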
ready
Fired once the worker is connected to rsmq/redis and has been initialized with the given queuename.
data
The raw event when a message has been received.
Arguments
msg : ( Object ) The raw rsmq message. ( See section Raw message format )
deleted
Fired after a message has been deleted.
Arguments
id : ( String ) The rsmq message id
exceeded
Fired after a message has exceeded the allowed receive count. The message is deleted immediately afterwards.
Arguments
msg : ( Object ) The raw rsmq message. ( See section Raw message format )
timeout
Fired if a message processing exceeds the configured timeout.
Arguments
msg : ( Object ) The raw rsmq message. ( See section Raw message format )
error
Fired if the processing of a message throws an error.
Arguments
err : ( Error|Any ) The thrown error
msg : ( Object ) The raw rsmq message. ( See section Raw message format )
This is an advanced example showing some features in action.
var fs = require( "fs" );
var RSMQWorker = require( "rsmq-worker" );

// custom exceed check: never exceed the message if its content is `createmessages`
var fnCheck = function( msg ){
    if( msg.message === "createmessages" ){
        return true;
    }
    return false;
};

var worker = new RSMQWorker( "myqueue", {
    interval: [ .1, 1 ],        // wait 100ms between receives and step up to 1s on empty receives
    invisibletime: 2,           // hide a received message for 2 sec
    maxReceiveCount: 2,         // only receive a message 2 times until it is deleted
    autostart: true,            // start worker on init
    customExceedCheck: fnCheck  // set the custom exceed check
});

// listen to errors
worker.on('error', function( err, msg ){
    console.log( "ERROR", err, msg.id );
});
worker.on('timeout', function( msg ){
    console.log( "TIMEOUT", msg.id, msg.rc );
});

// handle exceeded messages
// grab the internal rsmq instance
var rsmq = worker._getRsmq();
worker.on('exceeded', function( msg ){
    console.log( "EXCEEDED", msg.id );
    // NOTE: make sure this queue exists
    rsmq.sendMessage( { qname: "YOUR_EXCEEDED_QUEUE", message: msg.message }, function( err, resp ){
        if( err ){
            console.error( "write-to-exceeded-queue", err );
        }
    });
});

// listen to messages
worker.on( "message", function( message, next, id ){
    console.log( "message", message );
    if( message === "createmessages" ){
        // keep this message in the queue and create the follow-up messages
        next( false );
        worker.send( JSON.stringify( { type: "writefile", filename: "./test.txt", txt: "Foo Bar" } ) );
        worker.send( JSON.stringify( { type: "deletefile", filename: "./test.txt" } ) );
        return;
    }

    var _data = JSON.parse( message );
    switch( _data.type ){
        case "writefile":
            fs.writeFile( _data.filename, _data.txt, function( err ){
                if( err ){
                    next( err );
                }else{
                    next();
                }
            });
            break;
        case "deletefile":
            fs.unlink( _data.filename, function( err ){
                if( err ){
                    next( err );
                }else{
                    next();
                }
            });
            break;
    }
});

worker.send( "createmessages" );
interval
The option interval can:
A.) be a Number, so the worker polls the queue every n seconds (e.g. interval: .5 = twice a second)
B.) be an Array of Numbers. On start, interval[0] is the time to wait between polls. Every time the worker receives an empty response (queue is empty), the next interval (interval[+1]) is used to wait for the next poll, until the last definition interval[ n ] is reached. On every received message the wait time is reset to interval[0].
E.g.: interval: [ .2, 1, 3 ]
- 1st poll -> no message -> wait .2s = 200ms
- 2nd poll -> no message -> wait 1s
- 3rd poll -> no message -> wait 3s
- 4th poll -> no message -> wait 3s
- 5th poll -> 1 message -> wait .2s
- 6th poll -> no message -> wait 1s
- 7th poll -> 1 message -> wait .2s
- 8th poll -> 1 message -> wait .2s
- 9th poll -> no message -> wait .2s
- 10th poll -> no message -> wait 1s
- ...
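The step-up behaviour on consecutive empty polls, counted from the worker start, can be sketched as a small function. This illustrates the rule as described here and is not the library's internal code; `waitAfterEmptyPolls` is a hypothetical name.

```javascript
// Wait time in seconds before the next poll, after `emptyPolls` consecutive
// empty receives counted from the worker start. A sketch of the described rule only.
function waitAfterEmptyPolls( interval, emptyPolls ){
    if( !Array.isArray( interval ) ){
        return interval; // a plain Number: always the same wait time
    }
    // step through the array and stay on the last entry once it is reached
    var idx = Math.min( emptyPolls, interval.length ) - 1;
    return interval[ Math.max( idx, 0 ) ];
}

var waits = [ 1, 2, 3, 4 ].map( function( n ){
    return waitAfterEmptyPolls( [ .2, 1, 3 ], n );
});
console.log( waits ); // prints [ 0.2, 1, 3, 3 ]
```

The first four entries correspond to polls 1 to 4 of the example list; with a plain Number such as .5 the function returns .5 for every poll.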
| Version | Date | Description |
|---|---|---|
| 0.5.2 | 2016-10-24 | Optimized README and updated dependencies |
| 0.5.1 | 2016-08-22 | Fixed reconnect error Issue#20. Thanks to mstduff; updated deps; removed generated code docs from repo |
| 0.5.0 | 2016-07-14 | Added methods .info(cb) (Issue#17) and .size( [hidden,] cb ) |
| 0.4.3 | 2016-06-20 | Optimized event listeners Issue#15. Thanks to Kevin Turner |
| 0.4.2 | 2016-05-06 | Added the .quit() function Issue#11. Thanks to Sam Fung |
| 0.4.1 | 2016-04-05 | Fixed missing isNumber function |
| 0.4.0 | 2016-03-30 | Updated dependencies (especially lodash to 4.x). Fixed a config bug caused by the array merge from extend Issue#7. Thanks to Peter Hanneman |
| 0.3.8 | 2015-11-04 | Fixed stop behavior. Pull#5. Thanks to Exinferis |
| 0.3.7 | 2015-09-02 | Added tests to check the behavior during errors within message processing; Added option alwaysLogErrors to prevent console logs if an error event handler was attached. Issue #3 |
| 0.3.6 | 2015-09-02 | Updated dependencies; optimized readme (thanks to Tobias Lidskog for the pull #4) |
| 0.3.5 | 2015-04-27 | again ... fixed argument dispatch for .send() |
| 0.3.4 | 2015-04-27 | fixed argument dispatch for .send() and added optional cb for .del() |
| 0.3.3 | 2015-03-27 | added changeInterval to modify the interval in operation |
| 0.3.2 | 2015-02-23 | changed default prefix/namespace; |
| 0.3.0 | 2015-02-16 | It's now possible to return an error as first argument of next. This will lead to an error emit + optimized readme |
| 0.2.2 | 2015-01-27 | added option defaultDelay and optimized arguments of the send method; fixed travis.yml |
| 0.2.0 | 2015-01-27 | Added timeout, better error handling and send callback |
| 0.1.2 | 2015-01-20 | Reorganized code, added code docs and optimized readme |
| 0.1.1 | 2015-01-17 | Added test scripts and optimized repository file list |
| 0.1.0 | 2015-01-16 | First working and documented version |
| 0.0.1 | 2015-01-14 | Initial commit |
