A promisified layer over the rhea AMQP client.
npm install rhea-promise
You can set the DEBUG environment variable to get debug logs.
To get debug logs from this library:
export DEBUG=rhea-promise*
To get debug logs from this library and from rhea:
export DEBUG=rhea*
If you want to exclude the more verbose namespaces from the output, prefix them with - in the DEBUG environment variable as follows:
export DEBUG=rhea*,-rhea:raw,-rhea:message,-rhea-promise:eventhandler,-rhea-promise:translate
To log to a file, set the DEBUG environment variable as shown above and then run your test script as follows:
Logging statements from your test script go to out.log and logging statements from the SDK go to debug.log.
node your-test-script.js > out.log 2>debug.log
Logging statements from your test script and the SDK go to the same file, out.log, by redirecting stdout to the file and then redirecting stderr to stdout (&1):
node your-test-script.js >out.log 2>&1
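In shells that support the &> shorthand (such as bash), logging statements from your test script and the SDK also go to the same file, out.log.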
node your-test-script.js &> out.log
In AMQP, for two peers to communicate successfully, different entities (Container, Connection, Session, Link) need to be created. There is a relationship between those entities.
When an entity encounters an error, rhea propagates the resulting events to its parent entity. If they are not handled at the Container level (the topmost parent), they are transformed into an error event. This would cause your application to crash if no listener has been added for the error event.
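The crash mentioned above follows from how Node.js treats the error event: emitting it on an EventEmitter that has no error listener throws. The sketch below uses only Node's built-in events module. The `container` object and the `emitErrorAndCheckThrow` helper are illustrative stand-ins, not rhea objects or rhea APIs.

```typescript
import { EventEmitter } from "events";

// Stand-in for the topmost entity; NOT a real rhea Container.
const container = new EventEmitter();

// Returns true if emitting "error" threw, i.e. nobody was listening.
function emitErrorAndCheckThrow(emitter: EventEmitter): boolean {
  try {
    emitter.emit("error", new Error("unhandled connection error"));
    return false;
  } catch (_err) {
    return true;
  }
}

// No listener yet: Node throws the error passed to emit().
const threwWithoutListener = emitErrorAndCheckThrow(container);

// With a listener attached, the same emit is delivered instead of thrown.
const seen: string[] = [];
container.on("error", (err: Error) => seen.push(err.message));
const threwWithListener = emitErrorAndCheckThrow(container);

console.log({ threwWithoutListener, threwWithListener, seen });
```

In a real application the throw is not wrapped in a try/catch, so it surfaces as an uncaught exception and terminates the process.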
In rhea-promise, the library creates equivalent objects (Connection, Session, Sender, Receiver) and wraps objects from rhea within them. It adds event listeners for all the possible events that can occur at any level and re-emits those events with the same arguments as one would expect from rhea. This makes it easy for consumers of rhea-promise to use the EventEmitter pattern. Users can use the different event emitter methods like .once(), .on(), .prependListener(), etc. Since rhea-promise adds those event listeners on rhea objects, the errors will never be propagated to the parent entity. This can be good as well as bad depending on what you do.
Good: *_error events and *_close events emitted on an entity will not be propagated to its parent. This ensures that errors are handled at the right level.
Bad: If you do not add listeners for *_error and *_close events at the right level, then you will never know why an entity shut down.
We believe our design enforces good practices to be followed while using the event emitter pattern.
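As a rough illustration of listening "at the right level", the sketch below models a sender and its parent session as two independent Node EventEmitters, so nothing bubbles from one to the other by construction. The objects, the `log` array and the error string are assumptions made for the example; only the `sender_error`, `sender_close` and `session_error` names follow the *_error / *_close pattern described above. This is a model of the behaviour, not rhea-promise's implementation.

```typescript
import { EventEmitter } from "events";

// Hypothetical stand-ins for a wrapped link and its parent session.
const session = new EventEmitter();
const sender = new EventEmitter();

const log: string[] = [];

// Listen on the entity where the event occurs ...
sender.on("sender_error", (reason: string) => log.push(`sender_error: ${reason}`));
sender.once("sender_close", () => log.push("sender_close"));

// ... because a listener on the parent does not see the link's events.
session.on("session_error", (reason: string) => log.push(`session_error: ${reason}`));

sender.emit("sender_error", "amqp:link:detach-forced");
sender.emit("sender_close");
sender.emit("sender_close"); // ignored: .once() already removed the listener

console.log(log);
```

If the two `sender` listeners were missing, the session listener would stay silent and the reason for the shutdown would be lost, which is the "bad" case described above.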
Please take a look at the sample.env file for examples of how to provide values for parameters such as host, username, password, port, senderAddress, receiverAddress, etc.
Sending a message via Sender.
Running the example from a terminal: > ts-node ./examples/send.ts
NOTE: If you are running the sample with a .env config file, then please run the sample from the directory that contains the .env config file.
import {
  Connection, Sender, EventContext, Message, ConnectionOptions, Delivery, SenderOptions
} from "rhea-promise";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const senderName = "sender-1";
  const senderOptions: SenderOptions = {
    name: senderName,
    target: {
      address: senderAddress
    },
    onError: (context: EventContext) => {
      const senderError = context.sender && context.sender.error;
      if (senderError) {
        console.log(">>>>> [%s] An error occurred for sender '%s': %O.",
          connection.id, senderName, senderError);
      }
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of sender '%s': %O.",
          connection.id, senderName, sessionError);
      }
    }
  };

  await connection.open();
  const sender: Sender = await connection.createSender(senderOptions);
  const message: Message = {
    body: "Hello World!!",
    message_id: "12343434343434"
  };

  // Please, note that we are not awaiting on sender.send()
  // You will notice that `delivery.settled` will be `false`.
  const delivery: Delivery = sender.send(message);
  console.log(">>>>>[%s] Delivery id: %d, settled: %s",
    connection.id,
    delivery.id,
    delivery.settled);

  await sender.close();
  await connection.close();
}

main().catch((err) => console.log(err));
Sending a message via AwaitableSender.
Running the example from a terminal: > ts-node ./examples/awaitableSend.ts
import {
  Connection, Message, ConnectionOptions, Delivery, AwaitableSenderOptions, AwaitableSender
} from "rhea-promise";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const senderName = "sender-1";
  const awaitableSenderOptions: AwaitableSenderOptions = {
    name: senderName,
    target: {
      address: senderAddress
    },
    sendTimeoutInSeconds: 10
  };

  await connection.open();
  // Create an AwaitableSender; its send() returns a promise that is awaited in the loop.
  const sender: AwaitableSender = await connection.createAwaitableSender(
    awaitableSenderOptions
  );

  for (let i = 0; i < 10; i++) {
    const message: Message = {
      body: `Hello World - ${i}`,
      message_id: i
    };
    // Note: Here we are awaiting for the send to complete.
    // You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects.
    const delivery: Delivery = await sender.send(message);
    console.log(
      "[%s] await sendMessage -> Delivery id: %d, settled: %s",
      connection.id,
      delivery.id,
      delivery.settled
    );
  }

  await sender.close();
  await connection.close();
}

main().catch((err) => console.log(err));
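To make the difference between the two send styles more concrete, here is a small model of what awaiting a send with a timeout can look like. Everything in it is hypothetical: `FakeDelivery`, `settleAfter` and `withSendTimeout` are invented for the sketch, the timeout is in milliseconds rather than seconds, and the assumption that the promise rejects when the timeout elapses is ours. It does not describe how rhea-promise implements `sendTimeoutInSeconds`.

```typescript
// Hypothetical shape of a delivery; the real Delivery type comes from rhea-promise.
interface FakeDelivery { id: number; settled: boolean; }

// Simulates a peer that settles the delivery after `ms` milliseconds.
function settleAfter(ms: number, id: number): Promise<FakeDelivery> {
  return new Promise((resolve) => setTimeout(() => resolve({ id, settled: true }), ms));
}

// Rejects if `pending` has not resolved within `timeoutMs`.
function withSendTimeout<T>(pending: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(
      () => reject(new Error(`send timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
  });
  // Whichever promise finishes first wins; the timer is cleared either way.
  return Promise.race([pending, timeout]).then(
    (value) => { clearTimeout(timer); return value; },
    (err) => { clearTimeout(timer); throw err; }
  );
}

async function demo(): Promise<string[]> {
  const results: string[] = [];
  // Settlement arrives before the timeout: the awaited value is a settled delivery.
  const delivery = await withSendTimeout(settleAfter(5, 1), 100);
  results.push(`delivery ${delivery.id} settled: ${delivery.settled}`);
  // Settlement arrives too late: the awaited promise rejects.
  try {
    await withSendTimeout(settleAfter(100, 2), 10);
  } catch (err) {
    results.push((err as Error).message);
  }
  return results;
}

demo().then((results) => console.log(results));
```

With a plain Sender the caller gets the delivery back immediately and has to watch for settlement through events; with the awaitable style the caller's control flow waits for the outcome, at the cost of handling a rejection.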
> ts-node ./examples/receive.ts.
NOTE: If you are running the sample with a .env config file, then please run the sample from the directory that contains the .env config file.
import {
  Connection, Receiver, EventContext, ConnectionOptions, ReceiverOptions, delay, ReceiverEvents
} from "rhea-promise";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const receiverAddress = process.env.RECEIVER_ADDRESS || "address";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const receiverName = "receiver-1";
  const receiverOptions: ReceiverOptions = {
    name: receiverName,
    source: {
      address: receiverAddress
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of receiver '%s': %O.",
          connection.id, receiverName, sessionError);
      }
    }
  };

  await connection.open();
  const receiver: Receiver = await connection.createReceiver(receiverOptions);
  receiver.on(ReceiverEvents.message, (context: EventContext) => {
    console.log("Received message: %O", context.message);
  });
  receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
    const receiverError = context.receiver && context.receiver.error;
    if (receiverError) {
      console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
        connection.id, receiverName, receiverError);
    }
  });

  // sleeping for 2 mins to let the receiver receive messages and then closing it.
  await delay(120000);

  await receiver.close();
  await connection.close();
}

main().catch((err) => console.log(err));
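The receive sample sleeps for a fixed two minutes before closing. One possible alternative, sketched here under stated assumptions, is to stop once a given number of messages has arrived or a timeout elapses, whichever comes first. `collectMessages` is a hypothetical helper, not part of rhea-promise. It only assumes that the receiver is an EventEmitter that emits an event named "message", and it is exercised below with a plain EventEmitter standing in for a receiver.

```typescript
import { EventEmitter } from "events";

// Resolves with the collected event payloads after `count` "message" events
// or after `timeoutMs` milliseconds, whichever happens first.
function collectMessages(
  receiver: EventEmitter,
  count: number,
  timeoutMs: number
): Promise<unknown[]> {
  return new Promise((resolve) => {
    const received: unknown[] = [];
    const finish = () => {
      clearTimeout(timer);
      receiver.removeListener("message", onMessage);
      resolve(received);
    };
    const onMessage = (context: unknown) => {
      received.push(context);
      if (received.length >= count) finish();
    };
    const timer = setTimeout(finish, timeoutMs);
    receiver.on("message", onMessage);
  });
}

// Stand-in receiver for the sketch; a real one would come from createReceiver().
const fakeReceiver = new EventEmitter();
const pending = collectMessages(fakeReceiver, 2, 1000);
fakeReceiver.emit("message", { message: { body: "one" } });
fakeReceiver.emit("message", { message: { body: "two" } });
fakeReceiver.emit("message", { message: { body: "arrives after collection stopped" } });

pending.then((contexts) => console.log("collected %d messages", contexts.length));
```

The helper removes its listener when it finishes, so any other listeners on the receiver keep working and later messages are simply not collected.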
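Clone the repo:
git clone https://github.com/amqp/rhea-promise.git
Install typescript and ts-node globally:
npm i -g typescript
npm i -g ts-node
Install the dependencies from the root of the package:
npm i
Build the project:
npm run build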
The AMQP protocol specification can be found here.