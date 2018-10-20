Distributed timer backed by Redis.
In a clustered server environment, you'd occasionally need to process a task after a certain period of time. The setTimeout() may not be suitable because the process may die for whatever reason, the timed events would also be lost. Ideally, you'd want to store these timed events on a central storage, then have a cluster of listeners handle the due events. If you are already using Redis, then this dtimer would be a handy solution for you.
$ npm install dtimer
var Promise = require('bluebird');
var redis = Promise.promisifyAll(require('redis'));
maxEvents
The redis module MUST be promisified before instantiating clients for
puband
sub. See the example below.
join([cb]) => {Promise}
* cb {function} Optional callback.
* returns Promise if cb is not supplied.
leave([cb]) => {Promise}
* cb {function} Optional callback.
* returns Promise if cb is not supplied.
post(ev, delay [, cb]) => {Promise}
* {object} ev - Event data.
* {string} [ev.id] - User provided event ID. If not present, dtimer will automatically
assign an ID using uuid.
* {number} [ev.maxRetries] - Maximum number of retries that occur if confirm() is not
made within confTimeout [sec]. If not present, it defaults to 0 (no retry).
* {number} delay - Delay value in milliseconds.
* {function} [cb] - Callback made when the post operation is complete.
* returns Promise if cb is not supplied.
* Resolved value: evId {string} - Event ID assigned to the posted event. If ev object
already had id property, this evId is identical to ev.id always.
The
evobject may have user-defined properties as its own properties, however, the following properties are reserved and used by dtimer; 'id', 'maxRetries' and '_numRetries'. If your application needs to use these names (for application specific use), then consider putting all user-defined event object inside the
evlike this:
{
id: '25723fdd-4434-4cbd-b579-4693e221ec73',
maxRetries: 3,
// _numRetries: 0 // added by dtimer before the event was fired
data: { /*...*/ } // user-defined event object
}
peek(evId [, cb]) => {Promise}
* {string} evId - The event ID to be peeked.
* {function} [cb] - Callback made when the operation is complete.
* returns Promise if cb is not supplied.
* Resolved value: results {array} An array of results.
* results[0] {number} Time to expire in milliseconds, or null if the event does
not exit.
* results[1] {object} Event object, or null if the event does not exit.
cancel(evId [, cb]) => {Promise}
* {string} evId - The event ID to be canceled.
* {function} [cb] - Callback made when the operation is complete.
* returns Promise if cb is not supplied.
* Resolved value {number} 0: the event ID not found. 1: the event has been canceled.
confirm(evId [, cb])
* {string} evId - The event ID to be confirmed.
* {function} [cb] - Callback made when the operation is complete.
* returns Promise if cb is not supplied.
* Resolved value {number} 0: the event ID not found. 1: the event has been confirmed.
changeDelay(evId, delay, [, cb]) => {Promise}
* {string} evId - The event ID for which the delay will be changed.
* {number} delay - New delay (in milliseconds relative to the current time).
* {function} [cb] - Callback made when the operation is complete.
* returns Promise if cb is not supplied.
* Resolved value {number} 0: the event ID not found. 1: the delay has been updated.
This method is provided for diagnostic purpose only and the use of this method in production is highly discouraged unless the number of events retrieved is reasonably small. Cost of this operation is O(N), where N is the number events that would be retrieved.
upcoming([option] [, cb]) => {Promise}
* {object} option - Options
* {number} offset Offset expiration time in msec from which events are retrieved.
This defaults to the current (redis-server) time (-1).
* {number} duration Time length [msec] from offset time for which events are
retrieved. This defaults to '+inf' (-1).
* {number} limit Maximum number of events to be retrieved. This defaults to
`no limit` (-1).
* {function} [cb] - Callback made when upcoming operation is complete.
The callback function takes following args:
* {Error} err - Error object. Null is set on success.
* {object} events - List of objects that met the given criteria.
* returns Promise if cb is not supplied.
{
"25723fdd-4434-4cbd-b579-4693e221ec73": {
"expireAt": 1410502530320,
"event": {
"msg": "hello"
}
},
"24bbef35-8014-4107-803c-5ff4b858a5ad": {
"expireAt": 1410502531321,
"event": {
"msg": "hello"
}
}
}
The handler will be called with the following argument:
* ev {object} Event object.
* ev.id {string} Event ID.
* ev.maxRetries {number} Max retries specified when this event was posted.
* ev._numRetries {number} Number of retries made before this event occured.
The handler will be called with the following argument:
* err {Error} Error object.
var DTimer = require('dtimer').DTimer;
var Promise = require('bluebird');
var redis = Promise.promisifyAll(require('redis')); // module level promisification
var pub = redis.createClient();
var sub = redis.createClient();
var dt = new DTimer('ch1', pub, sub)
dt.on('event', function (ev) {
// do something with ev
// If the posted event has `maxRetries` property set to a number greater than 0,
// you must call confirm() with its event ID. If not confirmed, the event will
// fire in `confTimeout` (default 10 sec) again, until the number of retries
// becomes `maxRetries`.
dt.confirm(ev.id, function (err) {
// confirmed.
});
})
dt.on('error', function (err) {
// handle error
})
dt.join(function (err) {
if (err) {
// join failed
return;
}
// join successfully
})
dt.post({id: 'myId', maxRetries: 3, msg:'hello'}, 200, function (err) {
if (err) {
// failed to post event
return;
}
// posted the event successfully
// If you need to cancel this event, then do:
//dt.cancel('myId', function (err) {...});
})
join(), you are declaring yourself as a listener to consume due events.