An async stream that can be written to and iterated over using a for-await-of loop.

The WritableConsumableStream class extends the ConsumableStream class.

See https://github.com/SocketCluster/consumable-stream for the base ConsumableStream class.

Installation

npm install writable-consumable-stream

Usage

Require

const WritableConsumableStream = require ( 'writable-consumable-stream' );

Consume a stream and write to it asynchronously:

// Example: log every packet written to the stream while a timer writes a
// new timestamp every 100 ms.
const consumableStream = new WritableConsumableStream();

/**
 * Logs each packet produced by the given async iterable.
 *
 * @param {AsyncIterable} asyncIterable - Source of packets to consume.
 * @returns {Promise<void>} Settles when the iteration ends.
 */
async function consumeAsyncIterable(asyncIterable) {
  for await (const packet of asyncIterable) {
    console.log('Packet:', packet);
  }
}

// Not awaited: the consuming loop runs while the timer below keeps writing.
consumeAsyncIterable(consumableStream);

const writeTimestamp = () => {
  consumableStream.write(`Timestamp: ${Date.now()} `);
};

setInterval(writeTimestamp, 100);

Consume a stream using a while loop:

// Example: consume the stream by driving a consumer manually instead of
// using for-await-of.
const consumableStream = new WritableConsumableStream();

/**
 * Pulls packets one at a time from a consumer created on the given stream
 * and logs each value until the consumer reports that it is done.
 *
 * @param {object} asyncIterable - Stream exposing createConsumer().
 * @returns {Promise<void>} Settles once the consumer is done.
 */
async function consumeAsyncIterable(asyncIterable) {
  const consumer = asyncIterable.createConsumer();

  // Prime the loop with the first result, then fetch the next one after
  // each packet has been logged.
  let result = await consumer.next();
  while (!result.done) {
    console.log('Packet:', result.value);
    result = await consumer.next();
  }
}

// Not awaited: the consuming loop runs while the timer below keeps writing.
consumeAsyncIterable(consumableStream);

const writeTimestamp = () => {
  consumableStream.write(`Timestamp: ${Date.now()} `);
};

setInterval(writeTimestamp, 100);

Consume a filtered stream using an async generator:

// Example: consume only the packets that pass a filter, using an async
// generator layered on top of the stream.
const consumableStream = new WritableConsumableStream();

/**
 * Re-yields only those packets of fullStream for which filterFunction
 * returns a truthy value.
 *
 * @param {AsyncIterable} fullStream - Unfiltered source of packets.
 * @param {Function} filterFunction - Predicate called with each packet.
 * @yields Each packet accepted by filterFunction.
 */
async function* createFilteredStreamGenerator(fullStream, filterFunction) {
  for await (const packet of fullStream) {
    if (filterFunction(packet)) {
      yield packet;
    }
  }
}

/**
 * Logs each packet produced by the given async iterable.
 *
 * @param {AsyncIterable} asyncIterable - Source of packets to consume.
 * @returns {Promise<void>} Settles when the iteration ends.
 */
async function consumeAsyncIterable(asyncIterable) {
  for await (const packet of asyncIterable) {
    console.log('Packet:', packet);
  }
}

/**
 * Accepts data whose final character is the digit 5.
 *
 * @param {string} data - Packet to test.
 * @returns {boolean} True when data ends with "5".
 */
function filterFn(data) {
  return /5$/.test(data);
}

const filteredStreamGenerator = createFilteredStreamGenerator(consumableStream, filterFn);

// Not awaited: the consuming loop runs while the timer below keeps writing.
consumeAsyncIterable(filteredStreamGenerator);

setInterval(() => {
  // The packet must end with the timestamp's last digit: filterFn anchors on
  // the final character, so any trailing whitespace would reject every packet.
  consumableStream.write(`Timestamp: ${Date.now()}`);
}, 100);

Consume only the next data object that is written to the stream:

// Example: wait for a single write instead of consuming the whole stream.
const consumableStream = new WritableConsumableStream();

/**
 * Awaits the value produced by one call to once() on the stream and logs it.
 *
 * @returns {Promise<void>} Settles after the value has been logged.
 */
async function logNextData() {
  const data = await consumableStream.once();
  console.log(data);
}

// Not awaited: the timer below supplies the data that once() is waiting for.
logNextData();

const writeTimestamp = () => {
  consumableStream.write(`Timestamp: ${Date.now()} `);
};

setInterval(writeTimestamp, 100);