Simple API for using web workers with RxJS observables
Simple fromWorker function on the main thread side
materialize and dematerialize are used as a robust transport of streaming errors
mergeMap, switchMap or exhaustMap can be used in your worker if the input stream contains multiple items that each generate their own stream of results
Transferable parts of message payloads are supported, so large binaries can be transferred efficiently without copying (see the Transferable section for usage)
switchMap operator, or parallelisation of computation with mergeMap
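RxJS's materialize turns each next, error, and complete event into a plain Notification value, and dematerialize turns those values back into events. That is what lets an error thrown inside the worker surface as an error on the main thread. The sketch below models the idea without RxJS or real workers: Envelope, materializeAll, and dematerializeAll are hypothetical names, the envelope shape is an assumption (not the library's wire format), and a JSON round-trip stands in for postMessage.

```typescript
// Hypothetical envelope: every stream event becomes plain, cloneable data.
type Envelope<T> =
  | { kind: 'N'; value: T } // next value
  | { kind: 'E'; error: { name: string; message: string } } // error, flattened to plain data
  | { kind: 'C' }; // completion

// Sender side: turn a (possibly throwing) sequence into envelopes.
function materializeAll<T>(source: () => Iterable<T>): Envelope<T>[] {
  const out: Envelope<T>[] = [];
  try {
    for (const value of source()) {
      out.push({ kind: 'N', value });
    }
    out.push({ kind: 'C' });
  } catch (err) {
    const e = err instanceof Error ? err : new Error(String(err));
    out.push({ kind: 'E', error: { name: e.name, message: e.message } });
  }
  return out;
}

// Receiver side: replay envelopes as callbacks, rebuilding the error.
function dematerializeAll<T>(
  envelopes: Envelope<T>[],
  handlers: { next: (v: T) => void; error: (e: Error) => void; complete: () => void },
): void {
  for (const env of envelopes) {
    switch (env.kind) {
      case 'N':
        handlers.next(env.value);
        break;
      case 'E': {
        const e = new Error(env.error.message);
        e.name = env.error.name;
        handlers.error(e);
        return; // an error terminates the stream
      }
      case 'C':
        handlers.complete();
        return;
    }
  }
}

// Usage: a source that emits two values and then fails.
function* failingSource(): Generator<number> {
  yield 1;
  yield 2;
  throw new RangeError('boom');
}

// The JSON round-trip stands in for postMessage: only plain data crosses.
const wire = JSON.parse(JSON.stringify(materializeAll(failingSource))) as Envelope<number>[];
const received: number[] = [];
const outcome = { error: undefined as Error | undefined, completed: false };
dematerializeAll(wire, {
  next: v => received.push(v),
  error: e => { outcome.error = e; },
  complete: () => { outcome.completed = true; },
});
console.log(received.join(','), outcome.error?.name, outcome.error?.message); // 1,2 RangeError boom
```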
Install the npm package observable-webworker:
# with npm
npm install observable-webworker
# or with yarn
yarn add observable-webworker
💡 Take note! The webworker construction syntax differs between versions of webpack:
// src/readme/hello.ts
import { fromWorker } from 'observable-webworker';
import { of } from 'rxjs';
const input$ = of('Hello from main thread');
fromWorker<string, string>(
  () => new Worker(new URL('./hello.worker', import.meta.url), { type: 'module' }),
  input$,
).subscribe(message => {
  console.log(message); // Outputs 'Hello from webworker'
});
// src/readme/hello-webpack-5.ts#L2-L12
import { fromWorker } from 'observable-webworker';
import { of } from 'rxjs';
const input$ = of('Hello from main thread');
fromWorker<string, string>(
  () => new Worker(new URL('./app.worker', import.meta.url), { type: 'module' }),
  input$,
).subscribe(message => {
  console.log(message); // Outputs 'Hello from webworker'
});
// src/readme/hello.worker.ts
import { DoWork, runWorker } from 'observable-webworker';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
export class HelloWorker implements DoWork<string, string> {
  public work(input$: Observable<string>): Observable<string> {
    return input$.pipe(
      map(message => {
        console.log(message); // outputs 'Hello from main thread'
        return `Hello from webworker`;
      }),
    );
  }
}
runWorker(HelloWorker);
Webpack 5 minifies webworkers overly aggressively, causing the @ObservableWorker() decorator pattern to no longer function. This decorator has now been deprecated, and will be removed in the next major version of this library.
To migrate from decorators, simply remove the decorator and call runWorker with your class passed as the argument (see the example above).
Make sure you remove the decorator when you add the runWorker(...) call; otherwise the worker will be run twice, with each instance acting on every message sent.
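Why does running the worker twice double the work? A simplified model, assuming each bootstrap attaches its own 'message' listener to the worker's global scope: an EventTarget stands in for that scope, and MessageLikeEvent and bootstrap are hypothetical names, not the library's code.

```typescript
// Hypothetical stand-in for a MessageEvent carrying string data.
class MessageLikeEvent extends Event {
  readonly data: string;
  constructor(data: string) {
    super('message');
    this.data = data;
  }
}

// Each bootstrap call registers its own, distinct 'message' listener.
function bootstrap(target: EventTarget, replies: string[]): void {
  target.addEventListener('message', (event: Event) => {
    const { data } = event as MessageLikeEvent;
    replies.push(`Hello from webworker (got: ${data})`);
  });
}

const scope = new EventTarget(); // models the worker's global scope
const replies: string[] = [];

bootstrap(scope, replies); // e.g. registered by the old decorator
bootstrap(scope, replies); // e.g. registered again by runWorker(...)

scope.dispatchEvent(new MessageLikeEvent('Hello from main thread'));
console.log(replies.length); // 2: one message, two replies
```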
If your input or output streams (or both!) pass large messages to or from the worker, it is highly recommended to use message types that implement the Transferable interface (e.g. ArrayBuffer, MessagePort, ImageBitmap).
Bear in mind that when you transfer a message to a webworker, the main thread relinquishes ownership of the data.
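The ownership rule can be seen without spinning up a worker: structuredClone accepts the same transfer list as postMessage, so this minimal sketch (assuming a runtime with structuredClone, such as modern browsers or Node 17+) shows the sender's ArrayBuffer being detached once it is transferred.

```typescript
const buffer = new ArrayBuffer(8 * 1024 * 1024); // 8 MiB payload
console.log(buffer.byteLength); // 8388608

// Transferring moves the underlying memory instead of copying it...
const received = structuredClone(buffer, { transfer: [buffer] });

// ...so the sender's buffer is detached and reports a length of 0.
console.log(buffer.byteLength); // 0
console.log(received.byteLength); // 8388608
```

Any further attempt to read the original buffer on the sending side (for example through a new Uint8Array view) will fail, which is why only data you no longer need should be transferred.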
To use Transferables with observable-webworker, a slightly more complex interface is provided on both sides of the main/worker thread boundary.
If the main thread is transferring Transferables to the worker, simply add a callback to the fromWorker function call to select which elements of the input stream are transferable.
// src/readme/transferable.main.ts#L7-L11
return fromWorker<ArrayBuffer, string>(
  () => new Worker(new URL('./transferable.worker', import.meta.url), { type: 'module' }),
  input$,
  // Selector: each input item is itself an ArrayBuffer, so transfer it whole
  input => [input],
);
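When the input items are objects rather than bare buffers, the selector picks out only the binary fields. A sketch under stated assumptions: HashRequest and selectInputTransferables are hypothetical names, and structuredClone is used to mimic what postMessage does with the returned transfer list.

```typescript
// Hypothetical message shape: only the binary payload is worth transferring.
interface HashRequest {
  id: string;
  payload: ArrayBuffer;
}

// Selector in the style of the fromWorker callback: return the parts of one
// input item that should be transferred rather than copied.
const selectInputTransferables = (input: HashRequest): ArrayBuffer[] => [input.payload];

// Mimic postMessage(input, transferList) on the sending side.
const request: HashRequest = { id: 'file-1', payload: new ArrayBuffer(4) };
const delivered = structuredClone(request, { transfer: selectInputTransferables(request) });

console.log(delivered.id, delivered.payload.byteLength); // file-1 4
console.log(request.payload.byteLength); // 0 (detached on the sender side)
```

The id string is still copied as usual; only the listed buffer changes owner.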
If the worker is transferring Transferables to the main thread, simply implement DoTransferableWork, which requires you to add an additional method, selectTransferables?(output: O): Transferable[], which you implement to select which elements of the output object are Transferable.
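A sketch of such a selector on the worker side, assuming an output object that carries an image buffer alongside metadata. Thumbnail, ThumbnailWorker, and SelectsTransferables are hypothetical; SelectsTransferables is a local stand-in that mirrors only the selectTransferables signature quoted above, whereas in the library this method sits on top of the usual work method.

```typescript
// Hypothetical output type: metadata plus a raw RGBA pixel buffer.
interface Thumbnail {
  width: number;
  height: number;
  pixels: ArrayBuffer;
}

// Local stand-in for the selector part of DoTransferableWork (not the real interface).
interface SelectsTransferables<O> {
  selectTransferables?(output: O): ArrayBuffer[];
}

class ThumbnailWorker implements SelectsTransferables<Thumbnail> {
  // Only the pixel buffer is transferred; width and height are copied as usual.
  public selectTransferables(output: Thumbnail): ArrayBuffer[] {
    return [output.pixels];
  }
}

// Mimic the worker posting one output with the selected transfer list.
const worker = new ThumbnailWorker();
const output: Thumbnail = { width: 2, height: 2, pixels: new ArrayBuffer(2 * 2 * 4) };
const sent = structuredClone(output, { transfer: worker.selectTransferables(output) });

console.log(sent.pixels.byteLength, output.pixels.byteLength); // 16 0
```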
Both strategies are compatible with each other. For example, if you're computing the hash of a large ArrayBuffer in a worker, you would only need to add the transferable selector callback in the main thread to mark the ArrayBuffer as transferable in the input. The library will handle the rest, and you can just use DoWork in the worker thread, as the return type string is not Transferable.
If you have a large amount of work that needs to be done, you can use the fromWorkerPool function to automatically manage a pool of workers, allowing true concurrency of work distributed evenly across all available cores.
The worker pool strategy has the following features:
Work can be provided as an Observable, Array, or Iterable
Concurrency is limited to navigator.hardwareConcurrency - 1 to keep the main core free.
For an Observable, work is considered remaining while the observable is not completed
For an Array, work remains while there are items in the array
For an Iterable, work remains while the iterator is not result.done
Results are flattened with mergeAll(), so the output from the webworker(s) is emitted as soon as it is available
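The scheduling rules above can be sketched with plain Promises standing in for workers. This is an illustrative model, not the library's implementation: runPool and defaultConcurrency are hypothetical names, and the fallback of 2 cores when the count is unknown is an assumption. Each loop pulls the next unit from one shared iterator only after its current unit finishes, stops when the iterator is done, and reports results in completion order, as mergeAll() would.

```typescript
// Leave one core for the main thread, but always allow at least one worker.
// The fallback of 2 when the core count is unknown is an assumption.
function defaultConcurrency(hardwareConcurrency: number | undefined): number {
  return Math.max(1, (hardwareConcurrency ?? 2) - 1);
}

// Hypothetical pool: `concurrency` loops share one iterator over the work.
async function runPool<I, O>(
  work: Iterable<I>,
  concurrency: number,
  workUnit: (input: I) => Promise<O>,
  onResult: (output: O) => void,
): Promise<void> {
  const iterator = work[Symbol.iterator]();
  const workerLoop = async (): Promise<void> => {
    // A loop takes a new unit only after its current one has completed,
    // and stops once the shared iterator reports done.
    for (let next = iterator.next(); !next.done; next = iterator.next()) {
      onResult(await workUnit(next.value)); // emitted in completion order
    }
  };
  await Promise.all(Array.from({ length: concurrency }, workerLoop));
}
```

In a browser the pool size might be chosen with defaultConcurrency(navigator.hardwareConcurrency). Because an Array or generator is consumed through the same iterator, both "work remains" rules for Array and Iterable reduce to the single `!next.done` check.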
In this simple example, we have a function that receives an array of files and returns an observable of the MD5 hashes of those files. For simplicity, we're passing the primitives back and forth; in practice, you will likely want to define your own interface for the messages passed to and from the worker.
// src/readme/worker-pool.main.ts
import { Observable } from 'rxjs';
import { fromWorkerPool } from 'observable-webworker';
export function computeHashes(files: File[]): Observable<string> {
  return fromWorkerPool<File, string>(
    () => new Worker(new URL('./worker-pool-hash.worker', import.meta.url), { type: 'module' }),
    files,
  );
}
// src/readme/worker-pool-hash.worker.ts
import * as md5 from 'js-md5';
import { DoWorkUnit, runWorker } from 'observable-webworker';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
export class WorkerPoolHashWorker implements DoWorkUnit<File, string> {
  public workUnit(input: File): Observable<string> {
    return this.readFileAsArrayBuffer(input).pipe(map(arrayBuffer => md5(arrayBuffer)));
  }

  private readFileAsArrayBuffer(blob: Blob): Observable<ArrayBuffer> {
    return new Observable(observer => {
      if (!(blob instanceof Blob)) {
        observer.error(new Error('`blob` must be an instance of File or Blob.'));
        return;
      }
      const reader = new FileReader();
      reader.onerror = err => observer.error(err);
      reader.onload = () => observer.next(reader.result as ArrayBuffer);
      reader.onloadend = () => observer.complete();
      reader.readAsArrayBuffer(blob);
      return () => reader.abort();
    });
  }
}
runWorker(WorkerPoolHashWorker);
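As an alternative to wiring up FileReader callbacks by hand, Blob.prototype.arrayBuffer() returns a Promise and is available in modern browsers (including inside workers) and in Node 18+. A minimal sketch of a Promise-based helper with the same validation; the function name simply mirrors the private method above and is not part of the library.

```typescript
// Promise-based variant of the FileReader helper above.
async function readFileAsArrayBuffer(blob: Blob): Promise<ArrayBuffer> {
  if (!(blob instanceof Blob)) {
    throw new Error('`blob` must be an instance of File or Blob.');
  }
  return blob.arrayBuffer();
}

readFileAsArrayBuffer(new Blob(['hello'])).then(buf => console.log(buf.byteLength)); // 5
```

The trade-off: the Observable version can call reader.abort() when the subscriber unsubscribes, whereas a pending arrayBuffer() Promise cannot be cancelled, so a large read will run to completion even if its result is no longer wanted.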
Note that the worker class here implements DoWorkUnit<File, string>. This differs from the earlier example, where we implemented DoWork, which has the slightly more complex signature of taking an observable as input and returning one.
If you use the fromWorkerPool strategy, you must implement DoWorkUnit rather than DoWork, as the pool relies on the completion of the returned observable to know that a unit of work has finished processing and the next unit of work can be transferred to the worker.
Commonly, a worker that implements DoWorkUnit only needs to return a single value, so you may instead return a Promise from the workUnit method.
// src/app/doc/async-work.worker.ts#L7-L14
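import { DoWorkUnit, runWorker } from 'observable-webworker';

// factorize(n) is defined elsewhere in the source file (not shown here)
export class FactorizationWorker implements DoWorkUnit<number, number[]> {
  // An async workUnit returns a Promise that resolves once with the result
  public async workUnit(input: number): Promise<number[]> {
    return factorize(input);
  }
}
runWorker(FactorizationWorker);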
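The FactorizationWorker snippet calls factorize, whose definition is not part of the excerpt. For a self-contained picture, here is a hypothetical trial-division implementation: it is chosen for simplicity rather than speed, only handles safe integers, and is not the helper from the library's source.

```typescript
// Hypothetical helper: prime factors of n in ascending order (empty for n < 2).
function factorize(n: number): number[] {
  if (!Number.isSafeInteger(n) || n < 2) {
    return [];
  }
  const factors: number[] = [];
  let remaining = n;
  // Only divisors up to sqrt(remaining) need checking.
  for (let divisor = 2; divisor * divisor <= remaining; divisor++) {
    while (remaining % divisor === 0) {
      factors.push(divisor);
      remaining /= divisor;
    }
  }
  if (remaining > 1) {
    factors.push(remaining); // whatever is left is itself prime
  }
  return factors;
}

console.log(factorize(360).join(',')); // 2,2,2,3,3,5
```

Trial division is CPU-bound and can take noticeable time for large inputs, which is exactly the kind of work worth moving off the main thread with a pool of such workers.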
