Distributed timed job queue, backed by redis.

Features

Supports Redis Cluster.

Supports one or more timed-queue instances in a single Redis instance. Each timed-queue instance is segregated by its prefix option.

Supports one or more timed-queue clients for a timed-queue instance.

Demo

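    const TimedQueue = require('timed-queue')

    // One client in the "TQ1" namespace, scanning every minute.
    const timedQueue = new TimedQueue({ prefix: 'TQ1', interval: 1000 * 60 })

    // Connect to Redis nodes on ports 7000, 7001 and 7002.
    timedQueue
      .connect([7000, 7001, 7002])
      .on('error', function (error) {
        console.error(error)
      })

    const eventQueue = timedQueue.queue('event')

    // ACK each job as soon as it is delivered.
    eventQueue.on('job', function (jobObj) {
      eventQueue.ackjob(jobObj.job)()
    })

    // eventObj is an application object with `id` and `startDate` fields.
    // Schedule the job 10 minutes before the event starts.
    eventQueue.addjob(eventObj.id, new Date(eventObj.startDate).getTime() - 10 * 60 * 1000)(function (err, res) {
      console.log(err, res)
    })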

Installation

npm install timed-queue

Job

Job Class:

    function Job (queue, job, timing, active, retryCount) {
      this.queue = queue
      this.job = job
      this.timing = timing
      this.active = active
      this.retryCount = retryCount
    }

this.queue : {String} Queue name

this.job : {String} The job's name

this.timing : {Number} The time in milliseconds when the job should be activated

this.active : {Number} The actual time in milliseconds when the job was activated

this.retryCount : {Number} The number of times the job has been reactivated. A job that has been activated but has not been ACKed within the retry time will be activated again.
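As a rough illustration of how these fields relate, the sketch below computes how late a job was activated and whether it is a redelivery. The `describeJob` helper is hypothetical and not part of timed-queue, and it assumes that `retryCount` is 0 on the first activation.

```javascript
// Job constructor as documented above.
function Job (queue, job, timing, active, retryCount) {
  this.queue = queue
  this.job = job
  this.timing = timing
  this.active = active
  this.retryCount = retryCount
}

// Hypothetical helper (not part of timed-queue): summarize a delivered job.
function describeJob (job) {
  return {
    // Milliseconds between the scheduled time and the actual activation.
    delay: job.active - job.timing,
    // Assumption: retryCount is 0 on the first activation, so any value
    // above 0 means the job was delivered before without being ACKed.
    redelivered: job.retryCount > 0
  }
}

const sample = new Job('event', '52b3b5f49c2238313600015d', 1441552050409, 1441552050909, 1)
console.log(describeJob(sample))
```

A handler could use such a summary to log late activations or to treat redelivered jobs idempotently.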

API

    const TimedQueue = require('timed-queue')

new TimedQueue([options]) => timedQueue object

Return a timedQueue client. It is an EventEmitter instance.

options.prefix : {String} Redis key's prefix, or namespace. Default to "TIMEDQ"

options.count : {Number} The maximum job count for queue's getjobs method. Default to 64

options.interval : {Number} Interval time for scanning. Default to 1000 * 60 ms

options.retry : {Number} Retry time for a job. A job that has been activated but has not been ACKed within the retry time will be activated again. Default to interval / 2 ms

options.expire : {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Default to interval * 5 ms

options.accuracy : {Number} Scanning accuracy. Default to interval / 5

options.autoScan : {Boolean} The flag to enable or disable automatic scanning. Default to true. It can be set to false if automatic scanning is not desired.

const timedQueue = new TimedQueue()
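Several defaults above are derived from interval. The sketch below shows how the documented defaults fit together; `resolveOptions` and `pick` are hypothetical helpers written for illustration, not the library's own code, which may validate or round these values differently.

```javascript
// Return `value` unless it is null or undefined, in which case use `fallback`.
function pick (value, fallback) {
  return value != null ? value : fallback
}

// Hypothetical helper (not part of timed-queue): apply the documented defaults.
function resolveOptions (options) {
  options = options || {}
  const interval = pick(options.interval, 1000 * 60)
  return {
    prefix: pick(options.prefix, 'TIMEDQ'),
    count: pick(options.count, 64),
    interval: interval,
    retry: pick(options.retry, interval / 2),
    expire: pick(options.expire, interval * 5),
    accuracy: pick(options.accuracy, interval / 5),
    autoScan: options.autoScan !== false
  }
}

console.log(resolveOptions({ interval: 10000 }))
```

With a 10 second interval, the derived retry, expire and accuracy values follow directly from the formulas in the option list.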

TimedQueue Events

timedQueue.on('connect', function () {})

timedQueue.on('error', function (error) {})

timedQueue.on('close', function () {})

timedQueue.on('scanStart', function (queuesLength) {})

timedQueue.on('scanEnd', function (queuesLength, timeConsuming) {})
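The scan events can be used for simple monitoring. The following is a minimal sketch that records the arguments of the last scanEnd event. `trackScans` is a hypothetical helper, and a plain EventEmitter stands in for a timedQueue so the example runs without Redis. The unit of timeConsuming is not stated above; milliseconds is an assumption.

```javascript
const EventEmitter = require('events')

// Hypothetical helper: collect scan statistics from any emitter that fires
// the documented 'scanEnd' event.
function trackScans (emitter) {
  const stats = { scans: 0, lastQueues: 0, lastDuration: 0 }
  emitter.on('scanEnd', function (queuesLength, timeConsuming) {
    stats.scans += 1
    stats.lastQueues = queuesLength
    // Assumption: timeConsuming is a duration in milliseconds.
    stats.lastDuration = timeConsuming
  })
  return stats
}

// Stand-in emitter; in practice pass a timedQueue client instead.
const fake = new EventEmitter()
const stats = trackScans(fake)
fake.emit('scanEnd', 3, 12)
console.log(stats)
```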

TimedQueue.prototype.connect([host, options]) => this

TimedQueue.prototype.connect(redisClient) => this

Connect to Redis. The arguments are the same as those of thunk-redis's createClient. Alternatively, pass an existing thunk-redis client instance.

timedQueue.connect()

TimedQueue.prototype.scan() => this

Start scanning. Scanning starts automatically after the connect method is called, unless autoScan is set to false.

TimedQueue.prototype.stop() => this

Stop scanning.

TimedQueue.prototype.close() => this

Close the timedQueue. This also closes the timedQueue's Redis client.

TimedQueue.prototype.regulateFreq(factor) => this

It is used to regulate the automatic scanning frequency.

TimedQueue.prototype.destroyQueue(queue[, options]) => this

Remove the queue. It deletes all data in the queue from redis.

TimedQueue.prototype.queue(queue[, options]) => Queue instance

Return the Queue instance for the given name if one exists. Otherwise, create a Queue instance and return it. A Queue instance is an EventEmitter instance.

queue : {String} The queue's name

options.count : {Number} The maximum job count for queue's getjobs method. Default to timedQueue's count

options.retry : {Number} Retry time for a job. A job that has been activated and has not been ACKed within the retry time will be activated again. Default to timedQueue's retry

options.expire : {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Default to timedQueue's expire

options.accuracy : {Number} Scanning accuracy. Default to timedQueue's accuracy

    const eventQueue = timedQueue.queue('event', { retry: 1000, expire: 5000 })
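To make the interplay of retry and expire concrete, here is a small decision function for a job that has been activated but not ACKed. It is a sketch, not the library's implementation. It assumes that retry is measured from the last activation (`active`) and that expire is measured from the scheduled time (`timing`); the text above does not state either reference point.

```javascript
// Hypothetical decision function (not the library's implementation).
// Assumptions: `retry` is measured from the last activation (job.active),
// and `expire` is measured from the scheduled time (job.timing).
function unackedAction (job, now, options) {
  if (now - job.timing >= options.expire) return 'remove'
  if (now - job.active >= options.retry) return 'reactivate'
  return 'wait'
}

const options = { retry: 1000, expire: 5000 }
const job = { timing: 10000, active: 10000 }
console.log(unackedAction(job, 10500, options))
console.log(unackedAction(job, 11000, options))
console.log(unackedAction(job, 15000, options))
```

Under these assumptions, a job with retry 1000 and expire 5000 is left alone for the first second, redelivered after that, and dropped once five seconds have passed without an ACK.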

Queue Events

queue.on('job', function (job) {})

If the queue has no job listener, the queue will not be scanned.
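A common pattern is to ACK a job only after it has been processed successfully, so that a failed job is left for the retry mechanism. The sketch below illustrates this with a hypothetical `attachWorker` helper. A stub queue built on EventEmitter stands in for a real Queue so the example runs without Redis; only the job event and the ackjob thunk described in this document are used.

```javascript
const EventEmitter = require('events')

// Hypothetical helper: run `handler` for each job and ACK it only on success.
function attachWorker (queue, handler) {
  queue.on('job', function (jobObj) {
    try {
      handler(jobObj)
    } catch (err) {
      return // No ACK: the job stays active and may be activated again.
    }
    queue.ackjob(jobObj.job)(function (err) {
      if (err) console.error(err)
    })
  })
}

// Stand-in queue; in practice use timedQueue.queue(name).
const acked = []
const fakeQueue = new EventEmitter()
fakeQueue.ackjob = function (job) {
  return function (callback) {
    acked.push(job)
    if (callback) callback(null, 1)
  }
}

attachWorker(fakeQueue, function (jobObj) {
  if (jobObj.job === 'bad') throw new Error('cannot process')
})
fakeQueue.emit('job', { job: 'good' })
fakeQueue.emit('job', { job: 'bad' })
console.log(acked)
```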

Queue.prototype.init([options]) => this

options.count : {Number} The maximum job count for queue's getjobs method. Default to timedQueue's count

options.retry : {Number} Retry time for a job. A job that has been activated and has not been ACKed within the retry time will be activated again. Default to timedQueue's retry

options.expire : {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Default to timedQueue's expire

options.accuracy : {Number} Scanning accuracy. Default to timedQueue's accuracy

Queue.prototype.addjob(job, timing[, job, timing, ...]) => thunk function

Queue.prototype.addjob([job, timing, job, timing, ...]) => thunk function

Add one or more jobs to the queue. It can be used to update the job's timing.

job : {String} The job's name

timing : {Number} The time in milliseconds when the job should be activated. It should be greater than Date.now()

    eventQueue.addjob('52b3b5f49c2238313600015d', 1441552050409)(function (err, res) {
      console.log(err, res)
    })
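When several jobs are added at once, the array form takes a flat list of alternating job names and timings. The sketch below builds such a list from application objects. `toAddjobArgs`, the `{ id, startDate }` shape and the `leadMs` parameter are hypothetical and chosen for illustration; the current time is passed in so the example is deterministic.

```javascript
// Hypothetical helper (not part of timed-queue): turn a list of
// { id, startDate } objects into the flat [job, timing, ...] array form,
// scheduling each job `leadMs` milliseconds before its start date.
function toAddjobArgs (events, leadMs, now) {
  const args = []
  for (const event of events) {
    const timing = new Date(event.startDate).getTime() - leadMs
    // Skip jobs whose timing is not in the future, since timing
    // should be greater than the current time.
    if (timing > now) args.push(String(event.id), timing)
  }
  return args
}

const now = Date.UTC(2015, 8, 6, 12, 0, 0)
const args = toAddjobArgs([
  { id: 'a', startDate: '2015-09-06T13:00:00Z' },
  { id: 'b', startDate: '2015-09-06T12:05:00Z' }
], 10 * 60 * 1000, now)
console.log(args)
// With a connected queue: eventQueue.addjob(args)(callback)
```

Here job b is skipped because ten minutes before its start date is already in the past relative to `now`.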

Queue.prototype.show(job) => thunk function

Show the job info.

job : {String} The job's name

    eventQueue.show('52b3b5f49c2238313600015d')(function (err, res) {
      console.log(err, res)
    })
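The Queue methods return thunk functions, that is, functions that accept a single Node-style callback. In code that prefers Promises, a thunk can be adapted with a small wrapper. `thunkToPromise` below is a hypothetical adapter, not part of timed-queue, and `fakeShow` is a stand-in thunk so the example runs without Redis.

```javascript
// Hypothetical adapter: wrap a thunk function (a function that takes a
// single Node-style callback) in a Promise.
function thunkToPromise (thunk) {
  return new Promise(function (resolve, reject) {
    thunk(function (err, res) {
      if (err) reject(err)
      else resolve(res)
    })
  })
}

// Stand-in for a method such as eventQueue.show(job).
function fakeShow (job) {
  return function (callback) {
    callback(null, { job: job })
  }
}

const pending = thunkToPromise(fakeShow('52b3b5f49c2238313600015d'))
pending.then(function (res) {
  console.log(res)
})
```

With a real queue, the same wrapper could be applied as `thunkToPromise(eventQueue.show(job))`.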

Queue.prototype.deljob(job[, job, ...]) => thunk function

Queue.prototype.deljob([job, job, ...]) => thunk function

Delete one or more jobs.

job : {String} The job's name

    eventQueue.deljob('52b3b5f49c2238313600015d')(function (err, res) {
      console.log(err, res)
    })

Queue.prototype.getjobs([scanActive]) => thunk function

It is called by Queue.prototype.scan. It should not be called explicitly unless you know what you are doing.

Queue.prototype.ackjob(job[, job, ...]) => thunk function

Queue.prototype.ackjob([job, job, ...]) => thunk function

ACK one or more jobs.

job : {String} The job's name

    eventQueue.ackjob('52b3b5f49c2238313600015d')(function (err, res) {
      console.log(err, res)
    })

Queue.prototype.scan() => thunk function

It is called by TimedQueue.prototype.scan. It should not be called explicitly unless you know what you are doing.

Queue.prototype.len() => thunk function

Return the queue's length.

    eventQueue.len()(function (err, res) {
      console.log(err, res)
    })

Queue.prototype.showActive() => thunk function

Return the activated jobs in the queue.