54 changes: 47 additions & 7 deletions README.md
@@ -5,23 +5,25 @@
## Table of Contents

- [RabbitMQ client for the stream protocol for Node.JS](#rabbitmq-client-for-the-stream-protocol-for-nodejs)
- [Table of Contents](#table-of-contents)
- [Overview](#overview)
- [Installing via NPM](#installing-via-npm)
- [Getting started](#getting-started)
- [Usage](#usage)
- [Connect](#connect)
- [Connect through TLS/SSL](#connect-through-tls-ssl)
- [Connect through TLS/SSL](#connect-through-tlsssl)
- [Basic Publish](#basic-publish)
- [Sub Batch Entry Publishing](#sub-batch-entry-publishing)
- [Basic Consuming](#basic-consuming)
- [Single Active Consumer](#single-active-consumer)
- [Clustering](#clustering)
- [Load Balancer](#loadbalancer)
- [Load Balancer](#load-balancer)
- [Super Stream](#super-stream)
- [Filtering](#filtering)
- [Mitigating connection issues](#mitigating-connection-issues)
- [Running Examples](#running-examples)
- [Build from source](#build-from-source)
- [MISC](#misc)
- [Super Stream](#super-stream)
- [Filtering](#filtering)

## Overview

@@ -210,9 +212,12 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message

### Clustering

Every time we create a new producer or a new consumer, a new connection is created.
In particular for the producer the connection is created on the node leader.
For more running the tests in a cluster follow the readme under the folder /cluster
Every time we create a new producer or a new consumer, a new connection object is created. The underlying TCP connections can be shared among different producers and different consumers. Note however that:

- each `Client` instance has a unique connection, which is not shared in any case.
- for producers the connection is created on the node leader.
- consumers and producers do not share connections.
For more about running the tests in a cluster, see the README in the /cluster folder.
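The sharing rules above can be summed up in a small illustrative model (plain JavaScript, not part of the library API; the `kind` and `node` labels are hypothetical names used only for this sketch):

```javascript
// Illustrative model only: encodes the connection-sharing rules listed above.
function canShareConnection(a, b) {
  // each Client instance has a unique connection, never shared
  if (a.kind === "client" || b.kind === "client") return false
  // consumers and producers do not share connections
  if (a.kind !== b.kind) return false
  // same role pointing at the same node may reuse the underlying TCP connection
  return a.node === b.node
}

console.log(canShareConnection({ kind: "producer", node: "leader-1" }, { kind: "producer", node: "leader-1" })) // true
console.log(canShareConnection({ kind: "producer", node: "leader-1" }, { kind: "consumer", node: "leader-1" })) // false
```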

### Load Balancer

@@ -320,6 +325,41 @@ await sleep(2000)
await client.close()
```

### Mitigating connection issues

The library exposes some utility functions and properties that can help in building a more robust client application. One simple use case that is addressed in one of the examples (`example/autoreconnect_example.js`) shows how to build a client application that can handle simple network issues like a temporary disconnection. In this scenario we are _not_ dealing with a complex broker-side service disruption or a cluster reorganization; in particular, we assume that the stream topology and the node host names do not change.

The approach can be simply summed up as: register a `connection_closed` listener when instantiating a `Client` object, and then call the `client.restart().then(...)` method in its body.

```typescript
const connectionClosedCallback = () => {
  logger.info(`In connection closed event...`)
  client
    .restart()
    .then(() => {
      logger.info(`Connections restarted!`)
    })
    .catch((reason) => {
      logger.warn(`Could not reconnect to Rabbit! ${reason}`)
    })
}

client = await rabbit.connect({
  //...
  listeners: { connection_closed: connectionClosedCallback },
  //...
})
```

There are various considerations to keep in mind when building a client application around these features:

- the `connection_closed` callback is registered for an event on the TCP socket used by the `Client` instance. The callback cannot be an `async` function, so we use `then(...).catch(...)` to handle the outcome of the restart attempt.
- clients, producers and consumers expose a utility function `getConnectionInfo()` that returns information about the state of the underlying TCP socket and the logical connection to the broker. In particular, the `ready` field indicates whether the handshake with the broker completed correctly.
- consider using the outbox pattern when sending messages to the broker.
- some form of deduplication should be implemented in the client application when receiving messages: using offsets when defining consumer instances does not rule out receiving a message multiple times.
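As a minimal sketch of the deduplication point above (assuming, as in the example below, that each message carries a unique `messageId` in its properties; `handleOnce` is an illustrative helper, not a library function):

```javascript
// Consumer-side deduplication: remember handled messageIds in a Set and
// skip redeliveries, which can happen after an offset-based restart.
const received = new Set()

function handleOnce(messageId, handler) {
  if (received.has(messageId)) return false // duplicate delivery: skip
  received.add(messageId)
  handler(messageId)
  return true
}

let handled = 0
handleOnce("id-1", () => handled++)
handleOnce("id-1", () => handled++) // redelivery after a restart: ignored
console.log(handled) // 1
```

A real application would bound the size of the `Set` (or persist it), since it grows with every distinct message.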

See the comments and the implementation in `example/autoreconnect_example.js` for a more in-depth view.
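The outbox bookkeeping used in that example can be reduced to a toy, in-memory sketch (no persistence; `enqueue`, `nextToSend` and `confirm` are illustrative names for this sketch only):

```javascript
// Toy outbox: messages are appended, delivered from the current offset, and
// removed only on broker confirmation, so unconfirmed messages survive a reconnect.
const outbox = { messageIds: [], messages: new Map(), offset: 0 }

function enqueue(id, payload) {
  outbox.messageIds.push(id)
  outbox.messages.set(id, payload)
}

function nextToSend() {
  const id = outbox.messageIds[outbox.offset]
  if (id === undefined) return undefined
  outbox.offset++ // sent but not yet confirmed
  return { id, payload: outbox.messages.get(id) }
}

function confirm(id) {
  outbox.messageIds = outbox.messageIds.filter((x) => x !== id)
  if (outbox.messages.delete(id)) outbox.offset--
}

enqueue("a", "payload-a")
const sent = nextToSend()
confirm(sent.id)
console.log(outbox.messageIds.length, outbox.offset) // 0 0
```

After a reconnect, delivery simply resumes from `outbox.offset`, so anything sent but never confirmed is sent again (which is why consumer-side deduplication is also needed).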

## Running Examples

The folder /example contains a project with some examples of how to use the lib. To run it, follow these steps
2 changes: 1 addition & 1 deletion example/package-lock.json


279 changes: 252 additions & 27 deletions example/src/autoreconnect_example.js
@@ -1,49 +1,274 @@
/*

Auto-reconnect example: mitigate simple network disconnection issues

In this example we assume that:
- the stream topology does not change (i.e. leader/replicas nodes do not change)
- hostnames/ip addresses do not change
- the connection_closed event is triggered on the TCP connection used by the Client instance

The example is composed of
- message generation part (mimics the behavior of a client application)
- toy outbox pattern implementation (in-memory structure, no persistence of data, etc.)
- a client instance with a registered callback on the connection_closed event
- scheduled delivery of messages through a producer
- very simple publish_confirm handling
- one consumer
- a scheduled reachability interruption (in this case obtained by launching `docker-compose restart`)
- a scheduled process that logs the state of the application (connections, message counters)
*/

const rabbit = require("rabbitmq-stream-js-client")
const { randomUUID } = require("crypto")
const { exec } = require("child_process")
const { promisify, inspect } = require("util")
const promiseExec = promisify(exec)
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))

const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"
let client = undefined
let publisher = undefined
let consumer = undefined
let consumerOffset = rabbit.Offset.first()
let streamName = `example-${randomUUID()}`
const publisherOutbox = { messageIds: [], messages: new Map(), publishingIds: new Map(), offset: 0 }
const received = new Set()
let publishingId = 1n
let restartCount = 0
let published = 0
let confirmed = 0
let callbackCalled = 0
const logger = {
  debug: (msg) => { return },
  info: (msg) => console.log(`[info]\t[${new Date().toISOString()}] ${msg}`),
  error: (msg) => console.log(`[error]\t[${new Date().toISOString()}] ${msg}`),
  warn: (msg) => console.log(`[warn]\t[${new Date().toISOString()}] ${msg}`),
}


function getNodesFromEnv() {
  const envValue = process.env.RABBIT_MQ_TEST_NODES ?? "localhost:5552"
  const nodes = envValue.split(";")
  return nodes.map((n) => {
    const [host, port] = n.split(":")
    // parseInt returns NaN (not undefined) on a missing port, so ?? would not apply the default
    return { host: host ?? "localhost", port: port ? parseInt(port) : 5552 }
  })
}

async function triggerConnectionIssue() {
const res = await promiseExec("cd .. && docker-compose restart")
return true
}

/*
very simple message generation function
*/
function generateMessage() {
const payload = Buffer.from(`${randomUUID()}`)
const messageId = `${randomUUID()}`
return { payload, messageId }
}

/*
at each iteration, a new message is put in the outbox.
This mimics a client application that generates messages to be sent.
*/
function scheduleMessageProduction() {
setInterval(() => {
const { payload, messageId } = generateMessage()
publisherOutbox.messageIds.push(messageId)
publisherOutbox.messages.set(messageId, payload)
}, 50)
}

/*
at each iteration, a new message is read from the outbox and sent using the publisher.
Note that the operation is executed only if
there is a new message to be sent and if the publisher connection is at least established.
If the publisher is not `ready`, then the message will be cached internally.
*/
async function scheduleMessageDelivery() {
  setInterval(async () => {
    // keep track of the last message sent (but not yet confirmed)
    const messageOffset = publisherOutbox.offset
    const oldestMessageId = publisherOutbox.messageIds[messageOffset]
    // is the publisher socket open?
    const { writable } = publisher?.getConnectionInfo() ?? { writable: false }
    if (publisher && writable && oldestMessageId !== undefined) {
      const message = publisherOutbox.messages.get(oldestMessageId)
      const res = await publisher.send(message, { messageProperties: { messageId: `${oldestMessageId}` } })
      published++
      publisherOutbox.offset++
      if (res.publishingId !== undefined) {
        // keep track of the messageId, mapping it to the protocol-generated publishingId
        publisherOutbox.publishingIds.set(res.publishingId, oldestMessageId)
      }
    }
  }, 10)
}

/*
at each interval, the state of the outbox, the message counters and the state of client connections will be logged.
*/
function scheduleLogInfo() {
setInterval(() => {
logger.info(`outbox queue length: ${publisherOutbox.messageIds.length} offset ${publisherOutbox.offset}`)
logger.info(`${inspect({ published, confirmed, received: received.size })}`)
logger.info(`client local port: ${inspect(client && client.getConnectionInfo().localPort)} consumer local port: ${inspect(consumer && consumer.getConnectionInfo().localPort)} publisher local port: ${inspect(publisher && publisher.getConnectionInfo().localPort)}`)
}, 3000)
}

async function main() {
const streamName = `example-${randomUUID()}`
console.log(`Creating stream ${streamName}`)

let client = undefined
/*
at each interval, trigger a connection problem.
*/
async function triggerConnectionIssues() {
return new Promise((res, rej) => {
setInterval(async () => {
logger.info("Closing!")
restartCount++
await triggerConnectionIssue()
if (restartCount >= 1000) {
try {
logger.info("Terminating...")
if (client) await client.close()
res(true)
return
}
catch (e) {
rej(e)
return
}
}
//after this message is logged, the client connections should reopen
logger.info("\nNow it should reopen!\n")
}, 60000)
})
}

/*
when setting up the publisher, we register a callback on the `publish_confirm` event that
informs us that the broker has correctly received the sent message. This triggers an update on
the outbox state (the message is considered as sent)
*/
async function setupPublisher(client) {
const publisherRef = `publisher - ${randomUUID()}`
const publisherConfig = { stream: streamName, publisherRef: publisherRef, connectionClosedListener: (err) => { return } }
/*
confirmedIds contains the list of publishingIds linked to messages correctly published in the stream
These ids are not the messageIds that have been set in the message properties
*/
const publisherConfirmCallback = (err, confirmedIds) => {
if (err) {
logger.info(`Publish confirm error ${inspect(err)} `)
return
}
confirmed = confirmed + confirmedIds.length
const confirmedMessageIds = confirmedIds.map((publishingId) => {
const messageId = publisherOutbox.publishingIds.get(publishingId)
publisherOutbox.publishingIds.delete(publishingId)
return messageId
})

const connectToRabbit = async () => {
publisherOutbox.messageIds = publisherOutbox.messageIds.filter((id) => {
return !confirmedMessageIds.includes(id)
})
confirmedMessageIds.forEach((id) => {
if (publisherOutbox.messages.delete(id)) {
publisherOutbox.offset = publisherOutbox.offset - 1
}
})
}
const publisher = await client.declarePublisher(publisherConfig)
publisher.on("publish_confirm", publisherConfirmCallback)
publisherOutbox.offset = 0
return publisher
}

/*
in the consumer we can use the `messageId` property to make sure each message is "handled" once.
*/
async function setupConsumer(client) {
const consumerConfig = { stream: streamName, offset: rabbit.Offset.timestamp(new Date()), connectionClosedListener: (err) => { return } }
const receiveCallback = (msg) => {
const msgId = msg.messageProperties.messageId
if (received.has(msgId)) {
/* On restart, the consumer automatically sets its offset to the latest handled message index.
For sanity, some sort of deduplication is still needed.
@see https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/
and Consumer.storeOffset and Consumer.queryOffset for a more complete approach
*/
logger.info(`dedup: ${msgId}`)
}
received.add(msgId)
consumerOffset = msg.offset
return
}
return client.declareConsumer(consumerConfig, receiveCallback)
}


/*
setup of a client instance, a producer and a consumer.
The core of the example is represented by the implementation of the
`connection_closed` callback, in which the `client.restart()` method is called.
This triggers the reset of all TCP sockets involved, for all producers and consumers,
as well as for the TCP socket used by the client itself.
*/
async function setup() {
try {
const connectionClosedCallback = () => {
logger.info(`In connection closed event...`)
callbackCalled++
if (callbackCalled > 10) {
process.exit(0)
}
client.restart().then(() => {
logger.info(`Connections restarted!`)
}).catch((reason) => {
logger.warn(`Could not reconnect to Rabbit! ${reason}`)
})
}
const firstNode = getNodesFromEnv()[0]
logger.info(`Now invoking rabbit.connect on ${inspect(firstNode)}`)
client = await rabbit.connect({
hostname: "localhost",
port: 5553,
hostname: firstNode.host,
port: firstNode.port,
username: rabbitUser,
password: rabbitPassword,
listeners: {
connection_closed: async () => {
await sleep(Math.random() * 3000)
connectToRabbit()
.then(() => console.log("Successfully re-connected to rabbit!"))
.catch((e) => console.error("Error while reconnecting to Rabbit!", e))
},
},
listeners: { connection_closed: connectionClosedCallback, },
vhost: "/",
heartbeat: 0,
})
}, logger)
await client.createStream({ stream: streamName, arguments: {} })
publisher = await setupPublisher(client)
consumer = await setupConsumer(client)
return { client, publisher, consumer }
}
catch (err) {
logger.warn(`Setup-wide error: ${inspect(err)}`)
}
}

await connectToRabbit()

await sleep(2000)

console.log("Closing!")
await client.close()
console.log("Now it should reopen!")

await sleep(10000)
async function main() {
//instantiate the client, the producer and the consumer
await setup()
//schedule the task that inserts new messages in the outbox
scheduleMessageProduction()
//schedule the task that attempts to send a message to the broker, taking it from the outbox
await scheduleMessageDelivery()
//schedule the task that logs connection info and message counters
scheduleLogInfo()
//schedule the task that triggers a (more or less simulated) network issue
await triggerConnectionIssues()
}

main()
.then(() => console.log("done!"))
.then(() => logger.info("done!"))
.catch((res) => {
console.log("ERROR ", res)
logger.error(`ERROR ${res}`)
process.exit(-1)
})
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))
