Node Each is a single elegant function to iterate asynchronously over elements in sequential, parallel and concurrent modes. It is a powerful and mature library.
Main functionalities include:
const each = require('each');
each( [{id: 1}, {id: 2}, {id: 3}] )
.parallel(2)
.call( function(element, index, callback){
  console.log('element: ', element, '@', index);
  setTimeout(callback, 500);
})
.next( function(err){
  console.log(err ? err.message : 'Done');
});
each( [{id: 1}, {id: 2}, {id: 3}] )
.sync()
.call( function(element, index){
  console.log('element: ', element, '@', index);
})
.next( function(err){
  console.log(err ? err.message : 'Done');
});
Via git (or downloaded tarball):
git clone http://github.com/wdavidw/node-each.git
Then, simply copy or link the project inside a discoverable Node directory (e.g. './node_modules').
Via npm:
npm install each
Note:
The each function signature is: each(subject, [options]).
subject (array|object, required)
options (object, optional)
concurrency: may be "false" for sequential, "true" for parallel and a number for concurrent mode. For other options, see their associated functions below.
The returned object is an instance of EventEmitter.
The following functions are available:
call(function)
close()
end()
error(function)
item event will be called other than the ones already provisioned. The callback function is called with one argument, the error object. See the section dealing with errors for more information.
queue
close to exit.
next(function)
parallel(mode)
sequential, parallel or concurrent mode. See below for more details about the different modes.
promise
push(item) or push(key, value)
Add array elements or key/value pairs at the end of iteration.
repeat()
times.
sync()
times()
repeat.
unshift(items)
write(items)
push.
The following properties are available:
paused
readable
started
done
total
sequential
false or set to 1, default if no parallel mode is defined. Callbacks are chained, meaning each callback is called once the previous callback has completed (after calling the next function argument).
parallel
true. In asynchronous mode, the handler function is called at the same time for all elements, which then run in parallel.
concurrent
The last argument, callback, is a function to call once your action has completed. It may be called with an error instance to trigger the error event.
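The three modes differ only in how many handlers may be in flight at once. The following plain JavaScript sketch illustrates that idea; it is not the each package's implementation, and `iterate` with its parameters is a hypothetical helper. A limit of 1 behaves sequentially, `Infinity` behaves in parallel, and any other number is concurrent. On the first error it stops scheduling new elements but lets running handlers finish.

```javascript
// Illustrative sketch only: NOT the each package's source code.
// `iterate` is a hypothetical helper name.
function iterate(items, limit, handler, done) {
  let next = 0;       // index of the next element to start
  let running = 0;    // handlers currently in flight
  let finished = 0;   // handlers that have called back
  let failed = false; // set once an error has been reported
  if (items.length === 0) return done();
  const launch = () => {
    // Start handlers until the limit is reached or nothing is left.
    while (!failed && running < limit && next < items.length) {
      const index = next++;
      running++;
      handler(items[index], index, (err) => {
        running--;
        finished++;
        if (failed) return; // an error was already reported
        if (err) {
          failed = true;
          return done(err);
        }
        if (finished === items.length) return done();
        launch();
      });
    }
  };
  launch();
}

// Usage: at most 2 handlers run at any time.
iterate([1, 2, 3], 2, (value, index, callback) => {
  setTimeout(callback, 10);
}, (err) => console.log(err ? err.message : 'Done')); // prints "Done"
```

Passing the limit as a plain number keeps one code path for all three modes, which is why the sketch does not special-case `true` and `false`.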
An example is worth a thousand words; see the code examples below for usage.
Inside array iteration, the callback signature is function([value], [index], callback).
each( [] )
// 1 argument
.call( (callback) => {} )
// 2 arguments
.call( (value, callback) => {} )
// 3 arguments
.call( (value, index, callback) => {} )
// done
.then( () => {} )
Inside object iteration, the callback signature is function([key], [value], [counter], callback).
each( {} )
// 1 argument
.call( (callback) => {} )
// 2 arguments
.call( (value, callback) => {} )
// 3 arguments
.call( (key, value, callback) => {} )
// 4 arguments
.call( (key, value, counter, callback) => {} )
// done
.then( () => {} )
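One way to picture the optional arguments is dispatch on the handler's declared parameter count (`Function.length`). The sketch below assumes that mechanism for illustration; `invokeObjectHandler` is a hypothetical helper, not part of the each API, and the array case would be analogous with `value` and `index`.

```javascript
// Hypothetical helper: picks the arguments from the handler's declared arity.
function invokeObjectHandler(handler, key, value, counter, callback) {
  switch (handler.length) {
    case 1: return handler(callback);
    case 2: return handler(value, callback);
    case 3: return handler(key, value, callback);
    case 4: return handler(key, value, counter, callback);
    default:
      throw new Error(`Unsupported handler arity: ${handler.length}`);
  }
}

// Usage: each handler receives only the arguments it declares.
invokeObjectHandler((key, value, callback) => {
  console.log(key, value); // prints "id_1 1"
  callback();
}, 'id_1', 1, 0, () => console.log('called back')); // prints "called back"
```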
Errors are provided by calling the callback function argument in the item event with an error object as its first argument.
const assert = require('assert')
each( ['a', 'b'] )
.call( (element, next) =>
  setImmediate( () =>
    next(new Error("Catchme"))
  )
)
.next( (err) =>
  assert.equal(err.message, "Catchme")
)
It is also possible to throw an error, as long as it is thrown synchronously from within the handler function:
each( ['a', 'b'] )
.call( (element, next) => {
  throw new Error("Catchme")
  // Not ok:
  // setImmediate( () => {
  //   throw new Error("Catchme")
  // })
})
.next( (err) => {
  assert.equal(err.message, "Catchme")
})
The error will be provided to the next function handler unless an error function handler is defined before it.
each( ['a', 'b'] )
.call( (element, next) => {
  setImmediate( () => {
    next(new Error("Catchme"))
  })
})
.error( (err) => {
  assert.equal(err.message, "Catchme")
})
.next( (err) => {
  assert.equal(err, undefined)
})
In parallel and concurrent modes, the currently running callbacks are not canceled, but no new elements will be processed.
The error argument is always an instance of Error. However, it differs according to the execution mode. In sequential mode, it is always the error that was thrown inside the failed callback. In parallel and concurrent modes, more than one error may be raised asynchronously. In such a case, the error has a generic message such as Multiple errors $count, and the property .errors is an array giving access to each individual error.
each( ['a', 'b'] )
.parallel(true)
.call( (element, next) => {
  setImmediate( () => {
    next(new Error(`Error ${element}`))
  })
})
.error( (err) => {
  assert.equal(err.message, `Multiple errors 2`)
  const messages = err.errors.map( e => e.message )
  // deepEqual compares array contents; equal would compare references
  assert.deepEqual(messages, ["Error a", "Error b"])
})
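To make the shape of the aggregated error concrete, here is a minimal sketch of how several errors could be folded into one. `combineErrors` is a hypothetical helper, not the library's code. The message format follows the text above, and returning a single error unchanged is an assumption of this sketch.

```javascript
// Hypothetical helper; the message format follows the README text above.
function combineErrors(errors) {
  if (errors.length === 0) return undefined;
  // Assumption of this sketch: a lone error is passed through untouched.
  if (errors.length === 1) return errors[0];
  const combined = new Error(`Multiple errors ${errors.length}`);
  combined.errors = errors; // gives access to each individual error
  return combined;
}

const aggregated = combineErrors([new Error('Error a'), new Error('Error b')]);
console.log(aggregated.message);                      // prints "Multiple errors 2"
console.log(aggregated.errors.map((e) => e.message)); // prints [ 'Error a', 'Error b' ]
```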
Note that it is possible to know the number of successful handler functions in the next event by subtracting the number of errors from the number of executed callbacks, which is provided as the second argument.
each([1, 2, 3])
.parallel(true)
.call( (val, callback) => {
  setImmediate( () => {
    callback( val % 2 && new Error("Invalid") )
  })
})
.next( (err, count) => {
  const succeed = count - err.errors.length
  assert.equal(succeed, 1)
})
In sequential mode:
const each = require('each');
each( [{id: 1}, {id: 2}, {id: 3}] )
.call( (element, index, callback) => {
  setImmediate(callback)
})
.next( (err) =>
  console.log(err ? err.message : 'success')
)
In parallel mode:
const each = require('each')
each( [{id: 1}, {id: 2}, {id: 3}] )
.parallel( true )
.call( (element, index, callback) => {
  console.log('element: ', element, '@', index)
  setTimeout(callback, 500)
})
.next( (err) =>
  console.log(err ? err.message : 'success')
)
In concurrent mode (4 parallel executions):
const each = require('each')
each( [{id: 1}, {id: 2}, {id: 3}] )
.parallel( 4 )
.call( (element, index, callback) => {
  console.log('element: ', element, '@', index)
  setTimeout(callback, 500)
})
.next( (err) =>
  console.log(err ? err.message : 'success')
)
In sequential mode:
const each = require('each')
each( {id_1: 1, id_2: 2, id_3: 3} )
.call( (key, value, callback) => {
  console.log('key: ', key)
  console.log('value: ', value)
  setTimeout(callback, 500)
})
.next( (err) =>
  console.log(err ? err.message : 'success')
)
In concurrent mode with 2 parallel executions:
const each = require('each')
each( {id_1: 1, id_2: 2, id_3: 3} )
.parallel( 2 )
.call( (key, value, callback) => {
  console.log('key: ', key)
  console.log('value: ', value)
  setTimeout(callback, 500)
})
.next( (err) =>
  console.log(err ? err.message : 'success')
)
Use the pause and resume functions to throttle the iteration.
times and repeat
With the addition of the times and repeat functions, you may traverse an array or call a function multiple times. Note that you cannot use those two functions at the same time.
We first implemented this functionality while doing performance assessment and needing to repeat the same set of metrics multiple times. The following sample calls the handler function 3 times for each element, with the same arguments each time.
each(['a', 'b', 'c', 'd'])
.times(3)
.call( (id, callback) => {
  setImmediate( () => {
    process.stdout.write(id)
    callback()
  })
})
.next( () =>
  console.log('done')
)
The generated sequence is 'aaabbbcccddd'. In the same way, you could replace times with repeat, in which case the generated sequence would have been 'abcdabcdabcd'.
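The difference between the two orderings can be reproduced with two small pure functions. This is only a sketch of the visiting order in sequential mode; `timesOrder` and `repeatOrder` are hypothetical names, not part of the each API.

```javascript
// Hypothetical helpers that only compute the visiting order.

// times: every element is visited n times before moving to the next one.
function timesOrder(items, n) {
  return items.flatMap((item) => Array(n).fill(item));
}

// repeat: the whole list is traversed n times in a row.
function repeatOrder(items, n) {
  return Array.from({ length: n }, () => items).flat();
}

console.log(timesOrder(['a', 'b', 'c', 'd'], 3).join(''));  // prints "aaabbbcccddd"
console.log(repeatOrder(['a', 'b', 'c', 'd'], 3).join('')); // prints "abcdabcdabcd"
```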
It is also possible to use times and repeat without providing any data. Here's how:
let count = 0
each()
.times(3)
.call( callback => {
  console.log(count++)
  callback()
})
.next( () =>
  console.log('total:' + count)
)
The each package can be used as a queue. In queue mode, elements may be added before and after its initialisation until the queue is closed. Call the queue function to enter queue mode and the close function to finish it.
const each = require('each')
const queue = each().parallel(10).queue()
queue.push('hello')
setTimeout(() => {
  queue.push('world')
  queue.close()
})
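The queue behaviour (accept items until closed, then finish once everything has drained) can be sketched in plain JavaScript as follows. This is an illustration under stated assumptions, not the each package's implementation: `createQueue`, its parameters, and the choice to throw when pushing after close are all hypothetical.

```javascript
// Illustrative sketch only; `createQueue` is a hypothetical name.
function createQueue(limit, handler, done) {
  const pending = [];   // items waiting for a free slot
  let running = 0;      // handlers currently in flight
  let closed = false;   // set by close(); no more items accepted
  let finished = false; // ensures done() fires only once
  const drain = () => {
    while (running < limit && pending.length > 0) {
      const item = pending.shift();
      running++;
      handler(item, () => {
        running--;
        drain();
      });
    }
    // Finish only when closed and nothing is running or waiting.
    if (closed && !finished && running === 0 && pending.length === 0) {
      finished = true;
      done();
    }
  };
  return {
    push(item) {
      // Assumption of this sketch: pushing after close is an error.
      if (closed) throw new Error('Queue is closed');
      pending.push(item);
      drain();
    },
    close() {
      closed = true;
      drain();
    },
  };
}

// Usage: items are handled as they arrive; done runs after close().
const sketchQueue = createQueue(10, (item, callback) => {
  console.log(item);
  callback();
}, () => console.log('queue finished'));
sketchQueue.push('hello'); // prints "hello"
sketchQueue.push('world'); // prints "world"
sketchQueue.close();       // prints "queue finished"
```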
An error will be thrown with the message "Multiple call detected" if the callback argument in the item callback is called multiple times. However, if the end event has already been emitted, the only way to catch the error is by registering to the "uncaughtException" event of process.
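A guard of this kind can be pictured as a wrapper that remembers whether the callback has already run. The sketch below is a generic illustration, not the library's code; `guardCallback` is a hypothetical name and only the error message is taken from the text above.

```javascript
// Hypothetical wrapper: lets a callback run once and throws on any later call.
function guardCallback(callback) {
  let called = false;
  return (...args) => {
    if (called) throw new Error('Multiple call detected');
    called = true;
    return callback(...args);
  };
}

const guarded = guardCallback(() => console.log('item done'));
guarded(); // prints "item done"
try {
  guarded();
} catch (err) {
  console.log(err.message); // prints "Multiple call detected"
}
```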
Node Each comes with a few examples, all present in the "samples" folder. Here's how you may run each of them:
node samples/array_concurrent.js
node samples/array_parallel.js
node samples/array_sequential.js
node samples/object_concurrent.js
node samples/object_sequential.js
node samples/readable_stream.js
Tests are executed with mocha. To install it, simply run npm install; it will install mocha and its dependencies in your project's "node_modules" directory.
make test