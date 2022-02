Create an AsyncIterableIterator from anything (on any modern platform) while handling back-pressure!

yarn install -E @n1ru4l/push-pull-async-iterable-iterator

Standalone Usage

import { makePushPullAsyncIterableIterator } from "@n1ru4l/push-pull-async-iterable-iterator" ; const { pushValue, asyncIterableIterator } = makePushPullAsyncIterableIterator(); pushValue( 1 ); pushValue( 2 ); pushValue( 3 ); for await ( const value of asyncIterableIterator) { console .log(value); }

Check if something is an AsyncIterable

import { isAsyncIterable } from "@n1ru4l/push-pull-async-iterable-iterator" ; if (isAsyncIterable(something)) { for await ( const value of something) { console .log(value); } }

Note: On Safari iOS Symbol.asyncIterator is not available, therefore all async iterators used must be build using AsyncGenerators. If a AsyncIterable that is NO AsyncGenerator is passed to isAsyncIterable on the Safari iOS environment, it will return the value false .

Wrap a Sink

import { makeAsyncIterableIteratorFromSink } from "@n1ru4l/push-pull-async-iterable-iterator" ; import { createClient } from "graphql-ws/lib/use/ws" ; const client = createClient({ url: "ws://localhost:3000/graphql" }); const asyncIterableIterator = makeAsyncIterableIteratorFromSink( sink => { const dispose = client.subscribe( { query: "{ hello }" }, { next: sink.next, error: sink.error, complete: sink.complete } ); return () => dispose(); }); for await ( const value of asyncIterableIterator) { console .log(value); }

Apply an AsyncIterableIterator to a sink

import Observable from "zen-observable"; import { makePushPullAsyncIterableIterator, applyAsyncIterableIteratorToSink } from "@n1ru4l/push-pull-async-iterable-iterator"; const { asyncIterableIterator } = makePushPullAsyncIterableIterator(); const observable = new Observable(sink => { const dispose = applyAsyncIterableIteratorToSink(asyncIterableIterator, sink); // dispose will be called when the observable subscription got destroyed // the dispose call will ensure that the async iterator is completed. return () => dispose(); }); const subscription = observable.subscribe({ next: console.log, complete: () => console.log("done."), error: () => console.log("error.") }); const interval = setInterval(() => { iterator.push("hi"); }, 1000); setTimeout(() => { subscription.unsubscribe(); clearInterval(interval); }, 5000);

Put it all together

import { Observable, RequestParameters, Variables } from "relay-runtime"; import { createClient } from "graphql-ws/lib/use/ws"; import { makeAsyncIterableFromSink, applyAsyncIterableIteratorToSink } from "@n1ru4l/push-pull-async-iterable-iterator"; import { createApplyLiveQueryPatch } from "@n1ru4l/graphql-live-query-patch"; const client = createClient({ url: "ws://localhost:3000/graphql" }); export const execute = (request: RequestParameters, variables: Variables) => { if (!request.text) { throw new Error("Missing document."); } const query = request.text; return Observable.create<GraphQLResponse>(sink => { // Create our asyncIterator from a Sink const executionResultIterator = makeAsyncIterableFromSink(wsSink => { const dispose = client.subscribe({ query }, wsSink); return () => dispose(); }); const applyLiveQueryPatch = createApplyLiveQueryPatch(); // apply some middleware to our asyncIterator const compositeIterator = applyLiveQueryPatch(executionResultIterator); // Apply our async iterable to the relay sink // unfortunately relay cannot consume an async iterable right now. const dispose = applyAsyncIterableIteratorToSink(compositeIterator, sink); // dispose will be called by relay when the observable is disposed // the dispose call will ensure that the async iterator is completed. return () => dispose(); }); };

Operators

This package also ships a few utilities that make your life easier!

map

Map a source

import { map } from "@n1ru4l/push-pull-async-iterable-iterator" ; async function * source ( ) { yield 1 ; yield 2 ; yield 3 ; } const square = map((value: number ): number => value * value); for await ( const value of square(source())) { console .log(value); }

filter

Filter a source

import { filter } from "@n1ru4l/push-pull-async-iterable-iterator" ; async function * source ( ) { yield 1 ; yield 2 ; yield 3 ; } const biggerThan1 = filter((value: number ): number => value > 1 ); for await ( const value of biggerThan1(source())) { console .log(value); }

Other helpers

withHandlers

Attach a return and throw handler to a source.