Date: 2022-02-12
Parallel transform and some utilities for Node Object Stream lovers
import { pipeline } from 'stream'
npm install pipeline-pipe
Example usage:
// Example: scrape HTML pages and store their titles in a DB.
// getPost, parseHTML and storeInDB are placeholders for your own functions.
const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
  Readable.from([1, 2, 3]),
  // Request HTML asynchronously, up to 16 in parallel
  pipe(async postId => {
    const json = await getPost(postId);
    return json;
  }, 16),
  // Synchronous transformation, like Array.prototype.map
  pipe(json => parseHTML(json.postBody).document.title),
  // Synchronous filtering, like Array.prototype.filter
  pipe(title => title.includes('important') ? title : null),
  // Asynchronous, up to 4 in parallel
  pipe(async title => {
    const result = await storeInDB(title);
    console.info(result);
  }, 4),
  (err) => console.info('All done!')
);
Types:
import { Transform, TransformOptions } from 'stream';

type ParallelTransformOptions =
  | number
  | TransformOptions & { maxParallel?: number, ordered?: boolean };

export default function pipe(
  fn: (data: any) => Promise<any> | any,
  opts?: ParallelTransformOptions,
): Transform;
| Option property | Default value | Description |
| --- | --- | --- |
| maxParallel | 10 | Maximum number of parallel executions. |
| ordered | true | Whether to preserve the order of streaming chunks. |
A number can be passed as opts: pipe(fn, 20) is the same as pipe(fn, {maxParallel: 20}).
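To make the two options concrete, here is a minimal sketch of what bounded parallelism with preserved order means for a plain list of inputs. It is not how pipeline-pipe is implemented; `mapWithLimit` and `slowDouble` are hypothetical names used only for illustration, and the sketch covers only the `ordered: true` case.

```javascript
// Illustrative sketch only: NOT pipeline-pipe's implementation.
// mapWithLimit is a hypothetical helper that runs at most `maxParallel`
// calls of `fn` at a time and returns results in input order.
async function mapWithLimit(items, fn, maxParallel = 10) {
  const results = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker() {
    while (next < items.length) {
      const index = next++;
      // Storing by index keeps the output order equal to the input order,
      // regardless of which call finishes first.
      results[index] = await fn(items[index]);
    }
  }

  const workerCount = Math.min(maxParallel, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Usage: later items finish sooner, yet the output order is unchanged.
let running = 0;
let peak = 0;
const slowDouble = async (n) => {
  running++;
  peak = Math.max(peak, running); // track how many calls overlap
  await new Promise((resolve) => setTimeout(resolve, 10 * (4 - n)));
  running--;
  return n * 2;
};

const done = mapWithLimit([1, 2, 3], slowDouble, 2).then((out) => {
  console.log(out, 'peak parallelism:', peak);
  return out;
});
```

With `ordered: false`, a stream would instead emit each result as soon as it is ready, which can raise throughput when the order of chunks does not matter.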
A promisified version of require('stream').pipeline. Equivalent to:
const { promisify } = require('util');
const { pipeline: _pipeline } = require('stream');
const pipeline = promisify(_pipeline);
Example:
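const { pipeline, pipe } = require('pipeline-pipe');

// `readable` is any Readable stream; `await` must run inside an async function.
(async () => {
  await pipeline(
    readable,
    pipe(chunk => chunk.replace('a', 'z')),
    pipe(chunk => storeInDB(chunk)),
  );
  console.log('All done!');
})();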
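For comparison, Node.js 15 and later ship a promise-returning `pipeline` in the built-in `stream/promises` module. The sketch below uses only Node.js core, so it runs without pipeline-pipe installed; `upperCase`, `sink` and `main` are hypothetical names, and the plain `Transform` merely stands in for a `pipe()` stage.

```javascript
// Uses only Node.js built-ins (Node.js 15+); nothing here comes from pipeline-pipe.
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Object-mode Transform standing in for a pipe() stage (hypothetical name).
const upperCase = new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toUpperCase());
  },
});

// Collects every chunk so the result can be inspected afterwards.
const collected = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    collected.push(chunk);
    callback();
  },
});

// `await` is only valid inside an async function in CommonJS modules.
async function main() {
  await pipeline(Readable.from(['a', 'b', 'c']), upperCase, sink);
  return collected;
}

const finished = main();
finished.then((out) => console.log(out));
```

A rejected promise from `pipeline` carries the first stream error, so a surrounding `try`/`catch` is enough to handle failures from any stage.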
Concatenates sequential chunks into arrays of the specified size. This is useful when you post data in batches, as the Elasticsearch Bulk API expects.
Example:
const { pipeline, Readable } = require('stream');
const { concat, pipe } = require('pipeline-pipe');

pipeline(
  Readable.from([1, 2, 3, 4, 5]),
  concat(2),
  pipe(console.log), // [ 1, 2 ]
                     // [ 3, 4 ]
                     // [ 5 ]
  (err) => console.info('All done!'),
);
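The batching behavior described above can be approximated with a plain object-mode `Transform` from Node.js core. This is a sketch under that assumption, not pipeline-pipe's source; `batch` and `sink` are hypothetical names.

```javascript
// Illustrative sketch only: NOT pipeline-pipe's source.
// batch(size) is a hypothetical stand-in that groups chunks into arrays.
const { Readable, Transform, Writable, pipeline } = require('stream');

function batch(size) {
  let buffer = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      buffer.push(chunk);
      if (buffer.length >= size) {
        const full = buffer;
        buffer = [];
        callback(null, full); // emit a full batch downstream
      } else {
        callback(); // keep collecting
      }
    },
    flush(callback) {
      // When the input ends, emit the final, possibly shorter, batch.
      if (buffer.length > 0) this.push(buffer);
      callback();
    },
  });
}

// Usage: collect the batches produced from five numbers.
const batches = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    batches.push(chunk);
    callback();
  },
});

const batchesDone = new Promise((resolve, reject) => {
  pipeline(Readable.from([1, 2, 3, 4, 5]), batch(2), sink, (err) =>
    err ? reject(err) : resolve(batches),
  );
});
```

The `flush` hook is what delivers the trailing partial batch; without it, the last chunk of an odd-length input would be lost.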
Creates a Transform that splits each incoming Array chunk into its elements and passes them one by one to subsequent streams.
const { pipeline, Readable } = require('stream');
const { split, pipe } = require('pipeline-pipe');

pipeline(
  Readable.from([1, 2, 3]),
  pipe(page => getPostsByPage(page)),
  pipe(json => json.posts),            // Returns an array of posts
  split(),                             // Emits each post in the array as its own chunk
  pipe(post => storeInDB(post.title)), // Now the argument is a single post
  (err) => console.info('All done!')
);
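The splitting step can likewise be sketched with a core object-mode `Transform`. This is an illustration rather than pipeline-pipe's implementation; `flatten` and `sink` are hypothetical names, and passing non-array chunks through unchanged is an assumption of this sketch, not documented behavior of `split()`.

```javascript
// Illustrative sketch only: NOT pipeline-pipe's source.
// flatten() is a hypothetical stand-in that emits array elements one by one.
const { Readable, Transform, Writable, pipeline } = require('stream');

function flatten() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      // Assumption of this sketch: non-array chunks pass through unchanged.
      const items = Array.isArray(chunk) ? chunk : [chunk];
      // Pushing null would end the stream, so items are assumed non-null.
      for (const item of items) this.push(item);
      callback();
    },
  });
}

// Usage: three array chunks become five individual chunks.
const flattened = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    flattened.push(chunk);
    callback();
  },
});

const flattenDone = new Promise((resolve, reject) => {
  pipeline(Readable.from([[1, 2], [3], [4, 5]]), flatten(), sink, (err) =>
    err ? reject(err) : resolve(flattened),
  );
});
```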
MIT