Transform and writable streams capable of processing chunks concurrently.

Usage

transform

A concurrent transform stream

Parameters

work function a function to process a single chunk. Function signature should be process(chunk, enc, callback) . When finished processing, fire the provided callback .

a function to process a single chunk. Function signature should be . When finished processing, fire the provided . options object options to pass to the transform stream. (optional, default undefined ) options.concurrency number number of chunks to process concurrently. (optional, default 1 )

options to pass to the transform stream. (optional, default )

Examples

var parallel = require ( 'parallel-stream' ); var transform = parallel.transform( function ( chunk, enc, callback ) { processAsync(chunk) .on( 'done' , function ( processedData ) { callback( null , processedData); }); }, { objectMode : true , concurrency : 15 }); readable.pipe(transform) .on( 'data' , function ( data ) { console .log( 'got processed data: %j' , data); }) .on( 'end' , function ( ) { console .log( 'complete!' ); });

Returns object a transform stream. Do not override the ._transform function.

writable

A concurrent writable stream

Parameters

work function a function to process a single chunk. Function signature should be process(chunk, enc, callback) . When finished processing, fire the provided callback .

a function to process a single chunk. Function signature should be . When finished processing, fire the provided . flush function a function to run once all chunks have been processed, but before the stream emits a finished event. Function signature should be flush(callback) , fire the provided callback when complete. (optional, default undefined )

a function to run once all chunks have been processed, but before the stream emits a event. Function signature should be , fire the provided when complete. (optional, default ) options object options to pass to the writable stream. (optional, default undefined ) options.concurrency number number of chunks to process concurrently. (optional, default 1 )

options to pass to the writable stream. (optional, default )

Examples

var parallel = require ( 'parallel-stream' ); var writable = parallel.writable( function ( chunk, enc, callback ) { processAsync(chunk) .on( 'done' , callback); }, { objectMode : true , concurrency : 15 }); readable.pipe(writable) .on( 'finish' , function ( ) { console .log( 'complete!' ); });