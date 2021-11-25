GraphQL subscriptions is a simple npm package that lets you wire up GraphQL with a pubsub system (like Redis) to implement subscriptions in GraphQL.
You can use it with any GraphQL client and server (not only Apollo).
npm install graphql-subscriptions graphql or
yarn add graphql-subscriptions graphql
This package should be used with a network transport, for example subscriptions-transport-ws.
If you are developing a project that uses this module with TypeScript:
tsconfig.json
lib definition includes
"esnext.asynciterable"
npm install @types/graphql or
yarn add @types/graphql
To begin with GraphQL subscriptions, start by defining a GraphQL
Subscription type in your schema:
type Subscription {
somethingChanged: Result
}
type Result {
id: String
}
Next, add the
Subscription type to your
schema definition:
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
Now, let's create a simple
PubSub instance - it is a simple pubsub implementation, based on
EventEmitter. Alternative
EventEmitter implementations can be passed by an options object
to the
PubSub constructor.
import { PubSub } from 'graphql-subscriptions';
export const pubsub = new PubSub();
Now, implement your Subscriptions type resolver, using the
pubsub.asyncIterator to map the event you need:
const SOMETHING_CHANGED_TOPIC = 'something_changed';
export const resolvers = {
Subscription: {
somethingChanged: {
subscribe: () => pubsub.asyncIterator(SOMETHING_CHANGED_TOPIC),
},
},
}
Subscriptions resolvers are not a function, but an object with
subscribemethod, that returns
AsyncIterable.
Now, the GraphQL engine knows that
somethingChanged is a subscription, and every time we use
pubsub.publish over this topic - it will publish it using the transport we use:
pubsub.publish(SOMETHING_CHANGED_TOPIC, { somethingChanged: { id: "123" }});
Note that the default PubSub implementation is intended for demo purposes. It only works if you have a single instance of your server and doesn't scale beyond a couple of connections. For production usage you'll want to use one of the PubSub implementations backed by an external store. (e.g. Redis)
When publishing data to subscribers, we need to make sure that each subscriber gets only the data it needs.
To do so, we can use
withFilter helper from this package, which wraps
AsyncIterator with a filter function, and lets you control each publication for each user.
withFilter API:
asyncIteratorFn: (rootValue, args, context, info) => AsyncIterator<any> : A function that returns
AsyncIterator you got from your
pubsub.asyncIterator.
filterFn: (payload, variables, context, info) => boolean | Promise<boolean> - A filter function, executed with the payload (the published value), variables, context and operation info, must return
boolean or
Promise<boolean> indicating if the payload should pass to the subscriber.
For example, if
somethingChanged would also accept a variable with the ID that is relevant, we can use the following code to filter according to it:
import { withFilter } from 'graphql-subscriptions';
const SOMETHING_CHANGED_TOPIC = 'something_changed';
export const resolvers = {
Subscription: {
somethingChanged: {
subscribe: withFilter(() => pubsub.asyncIterator(SOMETHING_CHANGED_TOPIC), (payload, variables) => {
return payload.somethingChanged.id === variables.relevantId;
}),
},
},
}
Note that when using
withFilter, you don't need to wrap your return value with a function.
You can map multiple channels into the same subscription, for example when there are multiple events that trigger the same subscription in the GraphQL engine.
const SOMETHING_UPDATED = 'something_updated';
const SOMETHING_CREATED = 'something_created';
const SOMETHING_REMOVED = 'something_removed';
export const resolvers = {
Subscription: {
somethingChanged: {
subscribe: () => pubsub.asyncIterator([ SOMETHING_UPDATED, SOMETHING_CREATED, SOMETHING_REMOVED ]),
},
},
}
You can also manipulate the published payload, by adding
resolve methods to your subscription:
const SOMETHING_UPDATED = 'something_updated';
export const resolvers = {
Subscription: {
somethingChanged: {
resolve: (payload, args, context, info) => {
// Manipulate and return the new value
return payload.somethingChanged;
},
subscribe: () => pubsub.asyncIterator(SOMETHING_UPDATED),
},
},
}
Note that
resolve methods execute after
subscribe, so if the code in
subscribe depends on a manipulated payload field, you will need to factor out the manipulation and call it from both
subscribe and
resolve.
Your database might have callback-based listeners for changes, for example something like this:
const listenToNewMessages = (callback) => {
return db.table('messages').listen(newMessage => callback(newMessage));
}
// Kick off the listener
listenToNewMessages(message => {
console.log(message);
})
The
callback function would be called every time a new message is saved in the database. Unfortunately, that doesn't play very well with async iterators out of the box because callbacks are push-based, where async iterators are pull-based.
We recommend using the
callback-to-async-iterator module to convert your callback-based listener into an async iterator:
import asyncify from 'callback-to-async-iterator';
export const resolvers = {
Subscription: {
somethingChanged: {
subscribe: () => asyncify(listenToNewMessages),
},
},
}
AsyncIterator Wrappers
The value you should return from your
subscribe resolver must be an
AsyncIterator.
You can use this value and wrap it with another
AsyncIterator to implement custom logic over your subscriptions.
For example, the following implementation manipulates the payload by adding some static fields:
import { $$asyncIterator } from 'iterall';
export const withStaticFields = (asyncIterator: AsyncIterator<any>, staticFields: Object): Function => {
return (rootValue: any, args: any, context: any, info: any): AsyncIterator<any> => {
return {
next() {
return asyncIterator.next().then(({ value, done }) => {
return {
value: {
...value,
...staticFields,
},
done,
};
});
},
return() {
return Promise.resolve({ value: undefined, done: true });
},
throw(error) {
return Promise.reject(error);
},
[$$asyncIterator]() {
return this;
},
};
};
};
You can also take a look at
withFilterfor inspiration.
For more information about
AsyncIterator:
It can be easily replaced with some other implementations of PubSubEngine abstract class. Here are a few of them:
You can also implement a
PubSub of your own, by using the exported abstract class
PubSubEngine from this package. By using
extends PubSubEngine you use the default
asyncIterator method implementation; by using
implements PubSubEngine you must implement your own
AsyncIterator.
SubscriptionManager is the previous alternative for using
graphql-js subscriptions directly, and it's now deprecated.
If you are looking for its API docs, refer to a previous commit of the repository