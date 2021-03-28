PGWIRE

PostgreSQL client library for Node.js which exposes all features of wire protocol.

Features

Memory efficient data streaming

Logical replication using pgoutput protocol

Multi-statement queries

Query pipelining

Efficient bytea transfering

Copy from stdin and to stdout

SCRAM-SHA-256 authentication method support

Pure js without dependencies

TODO

Connection leasing from pool

Listen/notify resubscribe ?

SSL support

Benchmarks

Connecting to PostgreSQL server

import pgwire from 'pgwire' ; const client = await pgwire.connect( 'postgres://USER:PASSWORD@HOST:PORT/DATABASE?application_name=my-app' );

https://www.postgresql.org/docs/11/libpq-connect.html#id-1.7.3.8.3.6

Good practice is to get connection URI from environment variable:

import pgwire from 'pgwire' ; const client = await pgwire.connect(process.env.POSTGRES);

Set POSTGRES environment variable when run node process:

$ POSTGRES= 'postgres://USER:PASSWORD@HOST:PORT/DATABASE' node app.js

pgwire.connect() function also accepts parameters as object:

const client = await pgwire.connect({ user : 'postgres' , password : 'postgres' , hostname : '127.0.0.1' , port : 5432 , database : 'postgres' , });

Its possible to pass multiple connection URIs or objects to pgwire.connect() function. In this case actual connection parameters will be computed by merging all parameters in first-win priority. Following technique can be used to set default connection parameters:

const client = await pgwire.connect(process.env.POSTGRES, { application_name : 'my-awesome-app' , });

Don't forget to .end() connection when you don't need it anymore:

const client = await pgwire.connect(process.env.POSTGRES); try { } finally { client.end(); }

Using pgwire in web server

import express from 'express' ; import pgwire from 'pgwire' ; const app = express(); app.locals.pg = pgwire.pool(process.env.POSTGRES); app.get( '/' , function ( req, res, next ) { const { pg } = req.app.locals; pg.query({ statement : `SELECT 'hello world'` , }) .then( ( { scalar: greeting } ) => { res.end(greeting); }) .catch(next); }); app.listen( 3000 ); process.on( 'SIGINT' , _ => { app.locals.pg.clear(); app.close(); });

$ POSTGRES= 'postgres://USER:PASSWORD@HOST:PORT/DATABASE?poolMaxConnections=4&idleTimeout=900000' node app.js

When poolMaxConnections is not set then a new connection is created for each query. This option makes possible to switch to external connection pool like pgBouncer.

Querying

const { rows } = await client.query({ statement : ` SELECT i, 'Mississippi' FROM generate_series(1, 3) i ` , }); console .assert( JSON .stringify(rows) == JSON .stringify([ [ 1 , 'Mississippi' ], [ 2 , 'Mississippi' ], [ 3 , 'Mississippi' ], ]));

Function call and other single-value query result can be accessed by scalar property. It equals to the first cell of the first row.

const { scalar } = await client.query({ statement : `SELECT current_user` , }); console .assert(scalar == 'postgres' );

If query returns no rows then scalar equals to undefined .

Parametrized query

const { rows } = await client.query({ statement : ` SELECT $1 FROM generate_series(1, $2) ` , params : [{ type : 'text' , value : 'hello' , }, { type : 'int4' , value : 3 , }], }); console .assert( JSON .stringify(rows) == JSON .stringify([ [ 'hello' ], [ 'hello' ], [ 'hello' ], ]));

Multi-statement queries

Its possible to execute multiple statements within a single query.

const { results : [ { rows : categories }, { rows : products }, ], } = await client.query({ statement : ` SELECT id, name FROM category ` , }, { statement : ` SELECT id, category_id, name FROM product ` , });

Multi-statement query is wrapped into transaction implicitly. Implicit transaction does rollback automatically when error occures or does commit when all statements successfully executed. Multi-statement queries and implicit transactions are described here https://www.postgresql.org/docs/11/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT

Response top level rows and scalar properties take their values from last statement result:

const { scalar, rows } = await client.query({ statement : `SELECT 'a'` , }, { statement : `SELECT 'b', 1` , }); console .assert(scalar == 'b' ); console .assert( JSON .stringify(rows) == JSON .stringify([[ 'b' , 1 ]]) );

Reading from stream

.query() response can be consumed in two ways. First way is to await response like a promise (or call .then explicitly). In this case all rows will be loaded in memory.

Other way to consume response is to use it as Readable stream.

const response = client.query({ statement : `SELECT i FROM generate_series(1, 2000) i` , }); let sum = 0 ; for await ( const chunk of response) { if (chunk.boundary) { continue ; } const [i] = chunk; sum += i; } console .log(sum);

Copy from stdin

If statement is COPY ... FROM STDIN then stdin parameter must be set to instance of stream.Readable .

await client.query({ statement : `COPY foo FROM STDIN` , stdin : fs.createReadableStream( '/tmp/file' ), });

Copy to stdout

COPY ... TO STDOUT statement acts like SELECT but fills rows with Buffer instances instead of row tuples.

const { rows } = await client.query({ statement : ` COPY ( SELECT i, 'Mississippi' FROM generate_series(1, 3) i ) TO STDOUT ` , }); const tsv = Buffer.concat(rows).toString(); console .assert(tsv == ( '1\tMississippi

' + '2\tMississippi

' + '3\tMississippi

' ));

Dump table data to TSV file:

import { pipeline } from 'stream' ; import { promisify } from 'util' ; const ppipeline = promisify(pipeline); const copyUpstream = client.query({ statement : `COPY upstream_table TO STDOUT` , }); await ppipeline( copyUpstream, fs.createWritableStream( '/tmp/file.tsv' ), );

Copy table from one database to another:

const copyUpstream = clientA.query({ statement : `COPY upstream_table TO STDOUT` , }); await clientB.query({ statement : `COPY downstream_table FROM STDIN` , stdin : copyUpstream, });

Response stream emits boundary chunks at start and end of stream, but this chunks are ignored by piped writable stream since boundary chunks inherited from empty Buffer .

Listen and Notify

https://www.postgresql.org/docs/11/sql-notify.html

client.on( 'notification' , ({ channel, payload }) => { console .log(channel, payload); }); await client.query( `LISTEN some_channel` );

Simple and Extended query protocols

Postgres protocol has two query subprotocols - simple and extended. Simple protocol allows to send multi-statement query as single script where statements are delimited by semicolon. pgwire uses simple protocol when .query() is called with a string in first argument:

await client.query( ` CREATE TABLE foo (a int, b text); INSERT INTO foo VALUES (1, 'hello'); COPY foo FROM STDIN; SELECT * FROM foo; ` , { stdin : fs.createReadableStream( '/tmp/file1.tsv' ), stdin : [ fs.createReadableStream(...), fs.createReadableStream(...), ... ], });

Extended query protocol allows to pass parameters for each statement so it splits statements into separate chunks. Its possible to use extended query protocol by passing one or more statement objects to .query() function:

await client.query({ statement : `CREATE TABLE foo (a int, b text)` , }, { statement : `INSERT INTO foo VALUES ($1, $2)` , params : [{ type : 'int4' , value : 1 , }, { type : 'text' , value : 'hello' , }], }, { statement : 'COPY foo FROM STDIN' , stdin : fs.createReadableStream( '/tmp/file1.tsv' ), }, { statement : 'SELECT * FROM foo' , });

Low level extended protocol messages:

await client.query({ op : 'parse' , statement : `SELECT $1, $2` , statementName : 'example-statement' , paramTypes : [ 'int4' , 25 , ], }, { op : 'bind' , statementName : 'example-statement' , portal : 'example-portal' , params : [{ type : 'int4' , value : 1 , }, { type : 25 , value : 'hello' , }], }, { op : 'execute' , portal : 'example-portal' , limit : 10 , stdin : fs.createReadStream(...), });

Logical replication

Logical replication is native PostgreSQL mechanism which allows your app to subscribe on data modification events such as insert, update, delete and truncate. This mechanism can be useful in different ways - replicas synchronization, cache invalidation or history tracking. https://www.postgresql.org/docs/11/logical-replication.html

Lets prepare database for logical replication. At first we need to configure PostgreSQL server to write enough information to WAL:

ALTER SYSTEM SET wal_level = logical ;

We need to create replication slot for our app. Replication slot is PostgreSQL entity which behaves like a message queue of replication events.

SELECT pg_create_logical_replication_slot( 'my-app-slot' , 'test_decoding' );

Generate some modification events:

CREATE TABLE foo(a INT NOT NULL PRIMARY KEY , b TEXT ); INSERT INTO foo VALUES ( 1 , 'hello' ), ( 2 , 'world' ); UPDATE foo SET b = 'all' WHERE a = 1 ; DELETE FROM foo WHERE a = 2 ; TRUNCATE foo;

Now we are ready to consume replication events:

import pgwire from 'pgwire' ; const client = await pgwire.connect(process.env.POSTGRES, { replication : 'database' , }); try { const replicationStream = await client.logicalReplication({ slot : 'my-app-slot' , startLsn : '0/0' , }); process.on( 'SIGINT' , _ => replicationStream.destroy()); for await ( const { lsn, data } of replicationStream) { console .log(data.toString()); replicationStream.ack(lsn); } } finally { client.end(); }

"pgoutput" logical replication decoder

Modification events go through plugable logical decoder before events emitted to client. Purpose of logical decoder is to serialize in-memory events structures into consumable message stream. PostgreSQL has two built-in logical decoders:

test_decoding which emits human readable messages for debugging and testing,

and production usable pgoutput logical decoder which adapts modification events for replicas synchronization. pgwire implements pgoutput messages parser.

CREATE TABLE foo(a INT NOT NULL PRIMARY KEY , b TEXT ); CREATE PUBLICATION "my-app-pub" FOR TABLE foo; SELECT pg_create_logical_replication_slot( 'my-app-slot' , 'pgoutput' )