Getting started

Install

yarn add graphql-sse

Create a GraphQL schema

import { GraphQLSchema, GraphQLObjectType, GraphQLString } from 'graphql';

// Root query type: a single `hello` field that always resolves to 'world'.
const queryType = new GraphQLObjectType({
  name: 'Query',
  fields: {
    hello: {
      type: GraphQLString,
      resolve: () => 'world',
    },
  },
});

// Source stream for the `greetings` subscription: emits one event per
// greeting and then completes.
async function* greet() {
  const greetings = ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo'];
  for (const hi of greetings) {
    yield { greetings: hi };
  }
}

// Root subscription type exposing the `greetings` stream.
const subscriptionType = new GraphQLObjectType({
  name: 'Subscription',
  fields: {
    greetings: {
      type: GraphQLString,
      subscribe: greet,
    },
  },
});

// The schema used by the server examples that follow.
const schema = new GraphQLSchema({
  query: queryType,
  subscription: subscriptionType,
});

Start the server

import http from 'http';
import { createHandler } from 'graphql-sse';

// Request handler built from the previously created GraphQL schema.
const handler = createHandler({
  schema,
});

// Plain HTTP server: requests whose URL starts with `/graphql/stream` go to
// the GraphQL handler, everything else gets a 404.
const server = http.createServer((req, res) => {
  // `req.url` is typed `string | undefined` on `http.IncomingMessage`, so it
  // must be guarded before calling `startsWith`.
  if (req.url?.startsWith('/graphql/stream')) return handler(req, res);
  return res.writeHead(404).end();
});

server.listen(4000);
console.log('Listening to port 4000');

Browsers might complain about self-signed SSL/TLS certificates. Help can be found on StackOverflow.

# Generate a self-signed certificate and private key for localhost.
# The trailing backslash must be the last character on its line to act as a
# line continuation.
openssl req -x509 -newkey rsa:2048 -nodes -sha256 -subj '/CN=localhost' \
  -keyout localhost-privkey.pem -out localhost-cert.pem

import fs from 'fs';
import http2 from 'http2';
import { createHandler } from 'graphql-sse';

// Request handler built from the previously created GraphQL schema.
const handler = createHandler({
  schema,
});

// TLS material, read from the PEM files in the working directory.
const tlsOptions = {
  key: fs.readFileSync('localhost-privkey.pem'),
  cert: fs.readFileSync('localhost-cert.pem'),
};

// Routes `/graphql/stream` requests to the GraphQL handler and answers
// every other request with a 404.
function onRequest(req, res) {
  if (!req.url.startsWith('/graphql/stream')) {
    return res.writeHead(404).end();
  }
  return handler(req, res);
}

const server = http2.createSecureServer(tlsOptions, onRequest);

server.listen(4000);
console.log('Listening to port 4000');

import express from 'express';
import { createHandler } from 'graphql-sse';

const app = express();

// Request handler built from the previously created GraphQL schema.
const handler = createHandler({ schema });

// Mount the GraphQL handler on the streaming endpoint.
app.use('/graphql/stream', handler);

app.listen(4000);
console.log('Listening to port 4000');

import Fastify from 'fastify';
import { createHandler } from 'graphql-sse';

// Request handler built from the previously created GraphQL schema.
const handler = createHandler({ schema });

const fastify = Fastify();

// The handler is given the raw request and response objects plus the body
// that Fastify exposes on `req.body`.
fastify.all('/graphql/stream', (req, res) =>
  handler(req.raw, res.raw, req.body),
);

// Use the options-object form of `listen`; the positional `listen(port)`
// form is deprecated in newer Fastify releases.
fastify.listen({ port: 4000 });
console.log('Listening to port 4000');

Use the client

import { createClient } from 'graphql-sse';

const client = createClient({
  url: 'http://localhost:4000/graphql/stream',
});

// Query: remember the most recent value and settle with it once the
// operation completes.
async function runQuery() {
  const result = await new Promise((resolve, reject) => {
    let latest;
    const sink = {
      next: (data) => {
        latest = data;
      },
      error: reject,
      complete: () => resolve(latest),
    };
    client.subscribe({ query: '{ hello }' }, sink);
  });

  expect(result).toEqual({ hello: 'world' });
}
runQuery();

// Subscription: every event is handed to `onNext`; the promise settles when
// the stream completes or errors.
async function runSubscription() {
  const onNext = () => {};
  // `client.subscribe` returns a function; it is kept here so the
  // subscription can be cancelled by calling it.
  let unsubscribe = () => {};

  await new Promise((resolve, reject) => {
    const sink = { next: onNext, error: reject, complete: resolve };
    unsubscribe = client.subscribe(
      { query: 'subscription { greetings }' },
      sink,
    );
  });

  expect(onNext).toBeCalledTimes(5);
}
runSubscription();

Recipes

🔗 Client usage with Promise

import { createClient, SubscribePayload } from 'graphql-sse';

const client = createClient({
  url: 'http://hey.there:4000/graphql/stream',
});

/**
 * Runs a single operation and resolves with the last value emitted before
 * the operation completed. Rejects with whatever the client reports through
 * the `error` callback.
 */
async function execute<T>(payload: SubscribePayload) {
  return new Promise<T>((resolve, reject) => {
    let latest: T;
    const sink = {
      next: (data) => {
        latest = data;
      },
      error: reject,
      complete: () => {
        resolve(latest);
      },
    };
    client.subscribe<T>(payload, sink);
  });
}

// use
(async () => {
  try {
    const result = await execute({ query: '{ hello }' });
    // the operation completed; `result` is the resolved value
  } catch (err) {
    // the operation failed; handle `err` here
  }
})();

🔗 Client usage with AsyncIterator

import { createClient, SubscribePayload } from 'graphql-sse';

const client = createClient({
  url: 'http://iterators.ftw:4000/graphql/stream',
});

/**
 * Adapts `client.subscribe` to an async generator.
 *
 * Incoming results are buffered in `pending`. A consumer waiting in `next()`
 * parks a `deferred` promise which the sink callbacks settle: `false` means
 * "a value is available", `true` means "the stream completed", and a
 * rejection carries the error.
 *
 * Calling `return()` on the generator (for example by breaking out of a
 * `for await` loop) disposes the underlying subscription.
 */
function subscribe<T>(payload: SubscribePayload): AsyncGenerator<T> {
  let deferred: {
    resolve: (done: boolean) => void;
    reject: (err: unknown) => void;
  } | null = null;
  const pending: T[] = [];
  let throwMe: unknown = null,
    done = false;

  const dispose = client.subscribe<T>(payload, {
    next: (data) => {
      pending.push(data);
      deferred?.resolve(false);
    },
    error: (err) => {
      throwMe = err;
      deferred?.reject(throwMe);
    },
    complete: () => {
      done = true;
      deferred?.resolve(true);
    },
  });

  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    async next() {
      // Drain buffered results first: values that arrived before the stream
      // completed (or errored) must still reach the consumer instead of
      // being silently dropped.
      if (pending.length) return { value: pending.shift()! };
      if (throwMe) throw throwMe;
      if (done) return { done: true, value: undefined };

      // Nothing buffered and the stream is still open: wait for the next
      // sink callback.
      const completed = await new Promise<boolean>(
        (resolve, reject) => (deferred = { resolve, reject }),
      );
      return completed
        ? { done: true, value: undefined }
        : { value: pending.shift()! };
    },
    async throw(err) {
      throw err;
    },
    async return() {
      dispose();
      return { done: true, value: undefined };
    },
  };
}

(async () => {
  const subscription = subscribe({
    query: 'subscription { greetings }',
  });

  for await (const result of subscription) {
    // handle each result here
  }
})();

🔗 Client usage with Observable

import { createClient } from 'graphql-sse';
// Pick exactly ONE Observable implementation; importing several under the
// same name is a duplicate-identifier error.
import { Observable } from 'relay-runtime';
// or
// import { Observable } from '@apollo/client/core';
// or
// import { Observable } from 'rxjs';
// or
// import Observable from 'zen-observable';

const client = createClient({
  url: 'http://graphql.loves:4000/observables',
});

/**
 * Wraps `client.subscribe` in an Observable. The function returned by
 * `client.subscribe` is handed back to the Observable constructor as the
 * subscriber's return value.
 */
function toObservable(operation) {
  return new Observable((observer) =>
    client.subscribe(operation, {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    }),
  );
}

const observable = toObservable({ query: `subscription { ping }` });

const subscription = observable.subscribe({
  next: (data) => {
    // `toEqual` compares structurally; `toBe` compares by identity and can
    // never match a freshly created object literal.
    expect(data).toEqual({ data: { ping: 'pong' } });
  },
});

subscription.unsubscribe();

🔗 Client usage with Relay

import { GraphQLError } from 'graphql';
import {
  Network,
  Observable,
  RequestParameters,
  Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';

const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  // Headers are computed by this function rather than fixed up front.
  // NOTE: `getSession` is not defined in this snippet; supply your own.
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      // No trailing whitespace after the token.
      Authorization: `Bearer ${session.token}`,
    };
  },
});

/**
 * Relay network function used for both fetching and subscribing: every
 * operation is sent through the SSE client, and the Relay sink receives the
 * client's events directly.
 */
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    if (!operation.text) {
      return sink.error(new Error('Operation text cannot be empty'));
    }
    return subscriptionsClient.subscribe(
      {
        operationName: operation.name,
        query: operation.text,
        variables,
      },
      sink,
    );
  });
}

export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);

🔗 Client usage with urql

import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
// Aliased because urql exports a `createClient` too; the alias names the
// transport actually in use (Server-Sent Events, not WebSocket).
import { createClient as createSSEClient } from 'graphql-sse';

const sseClient = createSSEClient({
  url: 'http://its.urql:4000/graphql/stream',
});

const client = createClient({
  url: '/graphql/stream',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      // Forwards each subscription operation to the SSE client and adapts
      // the returned dispose function to the `{ unsubscribe }` shape.
      forwardSubscription(operation) {
        return {
          subscribe: (sink) => {
            const dispose = sseClient.subscribe(operation, sink);
            return {
              unsubscribe: dispose,
            };
          },
        };
      },
    }),
  ],
});

🔗 Client usage with Apollo

import {
  ApolloLink,
  Operation,
  FetchResult,
  Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';

/**
 * Apollo link that sends every operation through a graphql-sse client.
 */
class SSELink extends ApolloLink {
  private client: Client;

  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }

  /**
   * Subscribes to the operation through the SSE client. The query document
   * is converted to a string with `print` before being sent, and the
   * function returned by `client.subscribe` is returned from the Observable
   * subscriber.
   */
  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      return this.client.subscribe<FetchResult>(
        { ...operation, query: print(operation.query) },
        {
          next: sink.next.bind(sink),
          complete: sink.complete.bind(sink),
          error: sink.error.bind(sink),
        },
      );
    });
  }
}

const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  // Headers are computed by this function rather than fixed up front.
  // NOTE: `getSession` is not defined in this snippet; supply your own.
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      // No trailing whitespace after the token.
      Authorization: `Bearer ${session.token}`,
    };
  },
});

🔗 Client usage for HTTP/1 (aka. single connection mode)

import { createClient } from 'graphql-sse';

const url = 'http://use.single:4000/connection/graphql/stream';

// `singleConnection: true` switches the client to single connection mode.
const client = createClient({
  url,
  singleConnection: true,
});

🔗 Client usage with custom retry timeout strategy

import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';

const url = 'http://i.want.retry:4000/control/graphql/stream';

const client = createClient({
  url,
  // Custom wait strategy: first wait until the server reports healthy, then
  // add a random delay of 1 to 4 seconds.
  retryWait: async function waitForServerHealthyBeforeRetry() {
    await waitForHealthy(url);

    const delayMs = 1000 + Math.random() * 3000;
    await new Promise((resolve) => {
      setTimeout(resolve, delayMs);
    });
  },
});

🔗 Client usage in browser

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>GraphQL over Server-Sent Events</title>
    <!-- The UMD bundle exposes the library as the global `graphqlSse`. -->
    <script
      type="text/javascript"
      src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
    ></script>
  </head>
  <body>
    <script type="text/javascript">
      const client = graphqlSse.createClient({
        url: 'http://umdfor.the:4000/win/graphql/stream',
      });
    </script>
  </body>
</html>

🔗 Client usage in Node const ws = require ( 'ws' ); const fetch = require ( 'node-fetch' ); const { AbortController } = require ( 'node-abort-controller' ); const Crypto = require ( 'crypto' ); const { createClient } = require ( 'graphql-sse' ); const client = createClient({ url: 'http://no.browser:4000/graphql/stream' , fetchFn: fetch, abortControllerImpl: AbortController, generateID: () => ([ 1e7 ] + -1e3 + -4e3 + -8e3 + -1e11 ).replace( /[018]/g , ( c ) => (c ^ (Crypto.randomBytes( 1 )[ 0 ] & ( 15 >> (c / 4 )))).toString( 16 ), ), });

🔗 Server handler usage with custom authentication

import { createHandler } from 'graphql-sse';
import {
  schema,
  getOrCreateTokenFromCookies,
  customAuthenticationTokenDiscovery,
  processAuthorizationHeader,
} from './my-graphql';

const handler = createHandler({
  schema,
  // Decides, per request, which token (if any) the request is associated
  // with. The callback may also end the response itself.
  authenticate: async (req, res) => {
    // A token supplied in the `x-graphql-event-stream-token` header is
    // accepted as-is. Node may expose a repeated header as an array, so the
    // parts are joined into one string.
    // NOTE(review): presumably this header is what single connection mode
    // clients send on follow-up requests; confirm against the protocol
    // document.
    let token = req.headers['x-graphql-event-stream-token'];
    if (token) {
      return Array.isArray(token) ? token.join('') : token;
    }

    // The three assignments below read as ALTERNATIVE ways of discovering a
    // token. As written, each one overwrites the previous, so only the
    // result of `customAuthenticationTokenDiscovery` is used. Keep the one
    // that fits your setup.
    token = getOrCreateTokenFromCookies(req);
    // or
    token = processAuthorizationHeader(req.headers['authorization']);
    // or
    token = await customAuthenticationTokenDiscovery(req);

    // No token could be found: respond with 401 through the response
    // argument.
    if (!token) return res.writeHead(401, 'Unauthorized').end();

    // A POST request that accepts `text/event-stream` gets an empty string
    // instead of the token.
    // NOTE(review): presumably this identifies distinct connections mode,
    // where no token is needed; confirm against the protocol document.
    if (req.method === 'POST' && req.headers.accept === 'text/event-stream') {
      return '';
    }

    return token;
  },
});

🔗 Server handler usage with dynamic schema

import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';

const handler = createHandler({
  // The schema is chosen per request: admins get the debug schema, everyone
  // else gets the regular one.
  schema: async (req, executionArgsWithoutSchema) =>
    (await checkIsAdmin(req))
      ? getDebugSchema(req, executionArgsWithoutSchema)
      : schema,
});

🔗 Server handler usage with custom context value

import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';

const handler = createHandler({
  schema,
  // The context value is computed per request from the request and the
  // execution arguments.
  context: async (req, args) => getDynamicContext(req, args),
});

🔗 Server handler usage with custom execution arguments

import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';

const handler = createHandler({
  // Builds the execution arguments by hand: the schema is looked up for the
  // request and the query string is parsed into a document.
  // NOTE: `myValidationRules` is imported but not used by this snippet.
  onSubscribe: async (req, _res, params) => {
    const schema = await getSchema(req);
    const { operationName, query, variables } = params;

    return {
      schema,
      operationName,
      document: parse(query),
      variableValues: variables,
    };
  },
});

🔗 Server handler and client usage with persisted queries

// server

import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql-schema';

type QueryID = string;

// Pre-parsed operations, keyed by the ID that clients send.
const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema,
    document: parse('subscription Greetings { greetings }'),
  },
};

// `createHandler` is called with the options object only.
const handler = createHandler({
  // Looks up the operation by the `persistedQuery` extension and executes
  // the stored document with the variables sent by the client. Unknown IDs
  // are answered with a 404.
  onSubscribe: (req, res, params) => {
    const queryId = params.extensions?.persistedQuery;

    // The ID comes from the client: accept only string IDs that are own
    // keys of the store, so inherited properties such as `constructor`
    // are never treated as a stored query.
    const persistedQuery =
      typeof queryId === 'string' &&
      Object.prototype.hasOwnProperty.call(queriesStore, queryId)
        ? queriesStore[queryId]
        : undefined;

    if (persistedQuery) {
      return {
        ...persistedQuery,
        variableValues: params.variables,
      };
    }

    return res.writeHead(404, 'Query Not Found').end();
  },
});

// client

import { createClient } from 'graphql-sse';

const client = createClient({
  url: 'http://persisted.graphql:4000/queries',
});

(async () => {
  const onNext = () => {};

  await new Promise((resolve, reject) => {
    client.subscribe(
      {
        // The query text is left empty; the operation is identified by the
        // `persistedQuery` extension instead.
        query: '',
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

  expect(onNext).toBeCalledTimes(5);
})();

Check out the docs folder for the TypeDoc-generated documentation.

Read about the exact transport intricacies used by the library in the GraphQL over Server-Sent Events Protocol document.