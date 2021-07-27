nsqjs

The official NodeJS client for the nsq client protocol. This implementation attempts to be fully compliant and maintain feature parity with the official Go (go-nsq) and Python (pynsq) clients.

Usage

new Reader(topic, channel, options)

The topic and channel arguments are strings and must be specified. The options argument is optional. Below are the parameters that can be specified in the options object.

maxInFlight: 1

The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections.

The frequency in seconds at which the nsqd will send heartbeats to this Reader.

The maximum amount of time (seconds) the Reader will backoff for any single backoff event.

The number of times a given message will be attempted (given to MESSAGE handler) before it will be handed to the DISCARD handler and then automatically finished. 0 means that there is no limit. If no DISCARD handler is specified and maxAttempts > 0 , then the message will be finished automatically when the number of attempts has been exhausted.

The default amount of time (milliseconds) a message requeued should be delayed by before being dispatched by nsqd.

A string or an array of strings representing the host/port pair for nsqd instances.

For example: ['localhost:4150']

A string or an array of strings representing the host/port pair of nsqlookupd instaces or the full HTTP/HTTPS URIs of the nsqlookupd instances.

For example: ['localhost:4161'] , ['http://localhost/lookup'] , ['http://localhost/path/lookup?extra_param=true']

The frequency in seconds for querying lookupd instances.

The jitter applied to the start of querying lookupd instances periodically.

The timeout in milliseconds for switching between connections when the Reader maxInFlight is less than the number of connected NSQDs.

Use TLS if nsqd has TLS support enabled.

Require verification of the TLS cert. This needs to be false if you're using a self signed cert.

Use zlib Deflate compression.

Use zlib Deflate compression level.

Use Snappy compression.

Authenticate using the provided auth secret.

The size in bytes of the buffer nsqd will use when writing to this client. -1 disables buffering. outputBufferSize >= 64

The timeout after which any data that nsqd has buffered will be flushed to this client. Value is in milliseconds. outputBufferTimeout >= 1 . A value of -1 disables timeouts.

Sets the server-side message timeout in milliseconds for messages delivered to this client.

Deliver a percentage of all messages received to this connection. 1 <= sampleRate <= 99

An identifier used to disambiguate this client.

Socket timeout after idling for the duration in second (default to 0 means disabled).

Reader events are:

Reader.READY or ready

or Reader.MESSAGE or message

or Reader.ERROR or error

or Reader.NSQD_CLOSED or nsqd_closed

Reader.MESSAGE and Reader.DISCARD both produce Message objects. Reader.NSQD_CONNECTED and Reader.NSQD_CLOSED events both provide the host and port of the nsqd to which the event pertains.

These methods are available on a Reader object:

connect()

Connect to the nsqds specified or connect to nsqds discovered via lookupd.

Disconnect from all nsqds. Does not wait for in-flight messages to complete.

Pause the Reader by stopping message flow. Does not affect in-flight messages.

Unpauses the Reader by resuming normal message flow.

true if paused, false otherwise.

Message

The following properties and methods are available on Message objects produced by a Reader instance.

timestamp

Numeric timestamp for the Message provided by nsqd.

The number of attempts that have been made to process this message.

The opaque string id for the Message provided by nsqd.

Boolean for whether or not a response has been sent.

The message payload as a Buffer object.

Parses message payload as JSON and caches the result.

Returns the amount of time until the message times out. If the hard argument is provided, then it calculates the time until the hard timeout when nsqd will requeue inspite of touch events.

: Returns the amount of time until the message times out. If the hard argument is provided, then it calculates the time until the hard timeout when nsqd will requeue inspite of touch events. finish()

Finish the message as successful.

Finish the message as successful. requeue(delay=null, backoff=true) The delay is in milliseconds. This is how long nsqd will hold on the message before attempting it again. The backoff parameter indicates that we should treat this as an error within this process and we need to backoff to recover.

Tell nsqd that you want extra time to process the message. It extends the soft timeout by the normal timeout amount.

new Writer(nsqdHost, nsqdPort, options)

Allows messages to be sent to an nsqd.

Available Writer options:

tls: false

Use TLS if nsqd has TLS support enabled.

Require verification of the TLS cert. This needs to be false if you're using a self signed cert.

Use zlib Deflate compression.

Use zlib Deflate compression level.

Use Snappy compression.

An identifier used to disambiguate this client.

Writer events are:

Writer.READY or ready

or Writer.ERROR or error

These methods are available on a Writer object:

connect()

Connect to the nsqd specified.

Disconnect from nsqd.

topic is a string. msgs is either a string, a Buffer , JSON serializable object, a list of strings / Buffers / JSON serializable objects. callback takes a single error argument.

topic is a string. msg is either a string, a Buffer , JSON serializable object. timeMs is the delay by which the message should be delivered. callback takes a single error argument.

Simple example

Start nsqd and nsqdlookupd

$ nsqlookupd & $ nsqd -lookupd-tcp-address=127.0.0.1:4160 -broadcast-address=127.0.0.1 &

const nsq = require ( 'nsqjs' ) const reader = new nsq.Reader( 'sample_topic' , 'test_channel' , { lookupdHTTPAddresses : '127.0.0.1:4161' }) reader.connect() reader.on( 'message' , msg => { console .log( 'Received message [%s]: %s' , msg.id, msg.body.toString()) msg.finish() })

Publish a message to nsqd to be consumed by the sample client:

$ curl -d "it really tied the room together" http://localhost:4151/pub?topic=sample_topic

Example with message timeouts

This script simulates a message that takes a long time to process or at least longer than the default message timeout. To ensure that the message doesn't timeout while being processed, touch events are sent to keep it alive.

const nsq = require ( 'nsqjs' ) const reader = new nsq.Reader( 'sample_topic' , 'test_channel' , { lookupdHTTPAddresses : '127.0.0.1:4161' }) reader.connect() reader.on( 'message' , msg => { console .log( 'Received message [%s]' , msg.id) const touch = () => { if (!msg.hasResponded) { console .log( 'Touch [%s]' , msg.id) msg.touch() setTimeout(touch, msg.timeUntilTimeout() - 1000 ) } } const finish = () => { console .log( 'Finished message [%s]: %s' , msg.id, msg.body.toString()) msg.finish() } console .log( 'Message timeout is %f secs.' , msg.timeUntilTimeout() / 1000 ) setTimeout(touch, msg.timeUntilTimeout() - 1000 ) setTimeout(finish, msg.timeUntilTimeout() * 2 + 1000 ) })

Enable nsqjs debugging

nsqjs uses debug to log debug output.

To see all nsqjs events:

$ DEBUG=nsqjs:* node my_nsqjs_script.js

To see all reader events:

$ DEBUG=nsqjs:reader:* node my_nsqjs_script.js

To see a specific reader's events:

$ DEBUG=nsqjs:reader: < topic > / < channel > :* node my_nsqjs_script.js

Replace <topic> and <channel>

To see all writer events:

$ DEBUG=nsqjs:writer:* node my_nsqjs_script.js

A Writer Example

The writer sends a single message and then a list of messages.

const nsq = require ( 'nsqjs' ) const w = new nsq.Writer( '127.0.0.1' , 4150 ) w.connect() w.on( 'ready' , () => { w.publish( 'sample_topic' , 'it really tied the room together' ) w.deferPublish( 'sample_topic' , [ 'This message gonna arrive 1 sec later.' ], 1000 ) w.publish( 'sample_topic' , [ 'Uh, excuse me. Mark it zero. Next frame.' , 'Smokey, this is not \'Nam. This is bowling. There are rules.' ]) w.publish( 'sample_topic' , 'Wu?' , err => { if (err) { return console .error(err.message) } console .log( 'Message sent successfully' ) w.close() }) }) w.on( 'closed' , () => { console .log( 'Writer closed' ) })

Changes