Date: 2022-02-12

Parallel transform and some utilities for Node Object Stream lovers

Why

Parallel transform that accepts an async function

Fixes mafintosh/parallel-transform/issues/4 so that it works with import { pipeline } from 'stream'

TypeScript definitions, backed by a pure TypeScript reimplementation

Adds tests

Adds utility functions

Blog post

Install

npm install pipeline-pipe

Example usage:

    const { pipeline, Readable } = require('stream');
    const pipe = require('pipeline-pipe');

    // getPost, parseHTML and storeInDB stand in for your own application code.
    pipeline(
      Readable.from([1, 2, 3]),
      // Fetch posts with up to 16 calls in flight.
      pipe(async postId => {
        const json = await getPost(postId);
        return json;
      }, 16),
      pipe(json => parseHTML(json.postBody).document.title),
      pipe(title => title.includes('important') ? title : null),
      // Store titles with up to 4 calls in flight.
      pipe(async title => {
        const result = await storeInDB(title);
        console.info(result);
      }, 4),
      (err) => console.info('All done!')
    );

Types:

    import { Transform, TransformOptions } from 'stream';

    type ParallelTransformOptions =
      | number
      | TransformOptions & { maxParallel?: number, ordered?: boolean };

    export default function pipe(
      fn: (data: any) => Promise<any> | any,
      opts?: ParallelTransformOptions,
    ): Transform;

Option property   Default value   Description
maxParallel       10              Maximum number of parallel executions.
ordered           true            Whether to preserve the order of streaming chunks.
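
To make the two options concrete, here is a minimal sketch of what a concurrency limit and order preservation mean. It uses plain promises rather than streams, and mapWithLimit, delay and fasterWhenLarger are hypothetical names invented for this illustration; this is not how pipeline-pipe is implemented.

```javascript
// Hypothetical helpers for illustration only; not part of pipeline-pipe.
const delay = (ms, value) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

// Runs fn over items with at most maxParallel calls in flight at once.
async function mapWithLimit(items, fn, { maxParallel = 10, ordered = true } = {}) {
  const results = [];
  let next = 0;   // index of the next item to start
  let active = 0; // number of calls currently in flight
  let peak = 0;   // highest concurrency observed

  async function worker() {
    while (next < items.length) {
      const index = next++;
      active++;
      peak = Math.max(peak, active);
      const value = await fn(items[index]);
      active--;
      if (ordered) results[index] = value; // keep input order
      else results.push(value);            // keep completion order
    }
  }

  const workerCount = Math.min(maxParallel, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { results, peak };
}

// Larger values resolve sooner here, so completion order is 3, 2, 1.
const fasterWhenLarger = (n) => delay(40 - n * 10, n);
mapWithLimit([1, 2, 3], fasterWhenLarger, { maxParallel: 3, ordered: false })
  .then(({ results }) => console.log(results));
```

With ordered set to true the same call would yield the results in input order, at the cost of holding back results that finish early.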

A number can also be passed as opts: pipe(fn, 20) is the same as pipe(fn, { maxParallel: 20 }).
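
The shorthand can be pictured as a small normalization step. The sketch below is an assumption about how such a step might look, using the documented defaults; normalizeOptions is a hypothetical name and is not exported by pipeline-pipe.

```javascript
// Hypothetical helper illustrating the numeric shorthand; not the library's source.
function normalizeOptions(opts) {
  const defaults = { maxParallel: 10, ordered: true };
  if (typeof opts === 'number') {
    // A bare number is treated as maxParallel.
    return { ...defaults, maxParallel: opts };
  }
  // Spreading undefined is a no-op, so omitted opts yields the defaults.
  return { ...defaults, ...opts };
}

console.log(normalizeOptions(20));
console.log(normalizeOptions({ maxParallel: 20 }));
```

Both calls produce the same object, which is the equivalence described above.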

Some utility functions

pipeline is a promisified version of require('stream').pipeline. It is equivalent to:

    const { promisify } = require('util');
    const { pipeline: _pipeline } = require('stream');
    const pipeline = promisify(_pipeline);

Example:

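    const { pipeline, pipe } = require('pipeline-pipe');

    // readable and storeInDB stand in for your own source stream and storage code.
    // await is only valid inside an async function in CommonJS modules.
    (async () => {
      await pipeline(
        readable,
        pipe(chunk => chunk.replace('a', 'z')),
        pipe(chunk => storeInDB(chunk)),
      );
      console.log('All done!');
    })();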

concat groups consecutive chunks into arrays of the specified size. This is useful when you post data in batches, as the Elasticsearch Bulk API does.

Example:

    const { pipeline, Readable } = require('stream');
    const { concat, pipe } = require('pipeline-pipe');

    pipeline(
      Readable.from([1, 2, 3, 4, 5]),
      concat(2), // group chunks into arrays of 2
      pipe(console.log),
      (err) => console.info('All done!'),
    );
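
For readers curious how such batching can work, here is a minimal sketch of a batching Transform built on Node's stream module alone. concatSketch and collect are hypothetical names, and emitting a final shorter batch at end of input is an assumption of this sketch rather than documented behavior of concat.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical re-implementation for illustration; not pipeline-pipe's source.
function concatSketch(size) {
  let buffer = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      buffer.push(chunk);
      if (buffer.length >= size) {
        this.push(buffer); // emit a full batch
        buffer = [];
      }
      callback();
    },
    flush(callback) {
      // Assumption: leftover chunks are emitted as a final, shorter batch.
      if (buffer.length > 0) this.push(buffer);
      callback();
    },
  });
}

// Gathers everything a readable stream emits into an array.
async function collect(stream) {
  const items = [];
  for await (const item of stream) items.push(item);
  return items;
}

collect(Readable.from([1, 2, 3, 4, 5]).pipe(concatSketch(2)))
  .then((batches) => console.log(batches)); // resolves to [[1, 2], [3, 4], [5]]
```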

split creates a Transform that splits each incoming Array chunk into its elements and passes them one by one to subsequent streams.

    const { pipeline, Readable } = require('stream');
    const { split, pipe } = require('pipeline-pipe');

    // getPostsByPage and storeInDB stand in for your own application code.
    pipeline(
      Readable.from([1, 2, 3]),
      pipe(page => getPostsByPage(page)),
      pipe(json => json.posts),
      split(), // emit each post in the array as its own chunk
      pipe(post => storeInDB(post.title)),
      (err) => console.info('All done!')
    );
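
The idea behind splitting can be sketched with a plain object-mode Transform. splitSketch and collect are hypothetical names, and passing non-array chunks through unchanged is an assumption of this sketch, not documented behavior of split.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical re-implementation for illustration; not pipeline-pipe's source.
function splitSketch() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      if (Array.isArray(chunk)) {
        // Emit every element of the array as a separate chunk.
        for (const item of chunk) this.push(item);
      } else {
        // Assumption: non-array chunks pass through unchanged.
        this.push(chunk);
      }
      callback();
    },
  });
}

// Gathers everything a readable stream emits into an array.
async function collect(stream) {
  const items = [];
  for await (const item of stream) items.push(item);
  return items;
}

collect(Readable.from([[1, 2], [3], [4, 5]]).pipe(splitSketch()))
  .then((items) => console.log(items)); // resolves to [1, 2, 3, 4, 5]
```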

License

MIT