diff --git a/README.md b/README.md index 0103aea9..ec13b7ca 100644 --- a/README.md +++ b/README.md @@ -5,23 +5,25 @@ ## Table of Contents - [RabbitMQ client for the stream protocol for Node.JS](#rabbitmq-client-for-the-stream-protocol-for-nodejs) + - [Table of Contents](#table-of-contents) - [Overview](#overview) - [Installing via NPM](#installing-via-npm) - [Getting started](#getting-started) - [Usage](#usage) - [Connect](#connect) - - [Connect through TLS/SSL](#connect-through-tls-ssl) + - [Connect through TLS/SSL](#connect-through-tlsssl) - [Basic Publish](#basic-publish) - [Sub Batch Entry Publishing](#sub-batch-entry-publishing) - [Basic Consuming](#basic-consuming) - [Single Active Consumer](#single-active-consumer) - [Clustering](#clustering) - - [Load Balancer](#loadbalancer) + - [Load Balancer](#load-balancer) + - [Super Stream](#super-stream) + - [Filtering](#filtering) + - [Mitigating connection issues](#mitigating-connection-issues) - [Running Examples](#running-examples) - [Build from source](#build-from-source) - [MISC](#misc) - - [Super Stream](#super-stream) - - [Filtering](#filtering) ## Overview @@ -210,9 +212,12 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message ### Clustering -Every time we create a new producer or a new consumer, a new connection is created. -In particular for the producer the connection is created on the node leader. -For more running the tests in a cluster follow the readme under the folder /cluster +Every time we create a new producer or a new consumer, a new connection object is created. The underlying TCP connections can be shared among different producers and different consumers. Note however that: + +- each `Client` instance has a unique connection, which is not shared in any case. +- for producers the connection is created on the node leader. +- consumers and producers do not share connections. + For more about running the tests in a cluster follow the readme under the folder /cluster ### Load Balancer @@ -320,6 +325,41 @@ await sleep(2000) await client.close() ``` +### Mitigating connection issues + +The library exposes some utility functions and properties that can help in building a more robust client application. One simple use case that is addressed in one of the examples (`example/autoreconnect_example.js`) shows how to build a client application that can handle simple network issues like a temporary disconnection. In this scenario we are _not_ dealing with a complex broker-side service disruption or a cluster reorganization; in particular, we assume that the stream topology and the node host names do not change. + +The approach can be simply summed up as: register a `connection_closed` listener when instantiating a `Client` object, and then call the `client.restart().then(...)` method in its body. + +```typescript +const connectionClosedCallback = () => { + logger.info(`In connection closed event...`) + client + .restart() + .then(() => { + logger.info(`Connections restarted!`) + }) + .catch((reason) => { + logger.warn(`Could not reconnect to Rabbit! ${reason}`) + }) +} + +client = await rabbit.connect({ + //... + listeners: { connection_closed: connectionClosedCallback }, + //... +}) +``` + +There are various considerations to keep in mind when building a client application around these features: + +- this `connection_closed` callback is registered on the event of the TCP socket used by the `Client` object instance. This callback cannot be an `async` function, so we need to use `then(...).catch(...)` to deal with the outcome of the restart attempt. +- clients, producers and consumers expose a utility function `getConnectionInfo()` that returns information about the state of the underlying TCP socket and the logical connection to the broker. In particular, the `ready` field indicates if the handshake with the broker completed correctly. +- consider using the outbox pattern when sending messages to the broker. +- some form of deduplication should be implementend in the client application when receiving messages: the use of offsets when defining consumer instances does not avoid the possibility of receiving messages multiple times. + +See the comments and the implemenation in `example/autoreconnect_example.js` for a more in-depth view. + ## Running Examples the folder /example contains a project that shows some examples on how to use the lib, to run it follow this steps diff --git a/example/package-lock.json b/example/package-lock.json index d6f4f133..fb3437cb 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -23,7 +23,7 @@ "extraneous": true }, "..": { - "version": "0.1.1", + "version": "0.3.1", "license": "ISC", "dependencies": { "semver": "^7.5.4" diff --git a/example/src/autoreconnect_example.js b/example/src/autoreconnect_example.js index fc2a47b4..780ab2bd 100644 --- a/example/src/autoreconnect_example.js +++ b/example/src/autoreconnect_example.js @@ -1,49 +1,274 @@ +/* + + Auto-reconnect example: mitigate simple network disconnection issues + + In this example we assume that: + - the stream topology does not change (i.e. leader/replicas nodes do not change) + - hostnames/ip addresses do not change + - the connection_closed event is triggered on the TCP connection used by the Client instance + + The example is composed of + - message generation part (mimicks the behavior of a client application) + - toy outbox pattern implementation (in-memory structure, no persistence of data, etc.) + - a client instance with a registered callback on the connection_closed event + - scheduled delivery of messages through a producer + - very simple publish_confirm handling + - one consumer + - a scheduled reachability interruption (in this case obtained by launching `docker-compose restart`) + - a scheduled process that logs the state of the application (connections, message counters) +*/ + const rabbit = require("rabbitmq-stream-js-client") const { randomUUID } = require("crypto") +const { exec } = require("child_process") +const { promisify, inspect } = require("util") +const promiseExec = promisify(exec) +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) const rabbitUser = process.env.RABBITMQ_USER || "rabbit" const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" +let client = undefined +let publisher = undefined +let consumer = undefined +let consumerOffset = rabbit.Offset.first() +let streamName = `example-${randomUUID()}` +const publisherOutbox = { messageIds: [], messages: new Map(), publishingIds: new Map(), offset: 0 } +const received = new Set() +let publishingId = 1n +let restartCount = 0 +let published = 0 +let confirmed = 0 +let callbackCalled = 0 +const logger = { + debug: (msg) => { return }, + info: (msg) => console.log(`[info]\t[${new Date().toISOString()}] ${msg}`), + error: (msg) => console.log(`[error]\t[${new Date().toISOString()}] ${msg}`), + warn: (msg) => console.log(`[warn]\t[${new Date().toISOString()}] ${msg}`), +} + + +function getNodesFromEnv() { + const envValue = process.env.RABBIT_MQ_TEST_NODES ?? "localhost:5552" + const nodes = envValue.split(";") + return nodes.map((n) => { + const [host, port] = n.split(":") + return { host: host ?? "localhost", port: parseInt(port) ?? 5552 } + }) +} + +async function triggerConnectionIssue() { + const res = await promiseExec("cd .. && docker-compose restart") + return true +} + +/* + very simple message generation function +*/ +function generateMessage() { + const payload = Buffer.from(`${randomUUID()}`) + const messageId = `${randomUUID()}` + return { payload, messageId } +} + +/* + at each iteration, a new message is put in the outbox. + This mimicks a client application that generates messages to be sent. +*/ +function scheduleMessageProduction() { + setInterval(() => { + const { payload, messageId } = generateMessage() + publisherOutbox.messageIds.push(messageId) + publisherOutbox.messages.set(messageId, payload) + }, 50) +} + +/* + at each iteration, a new message is read from the outbox and sent using the publisher. + Note that the operation is executed only if + there is a new message to be sent and if the publisher connection is at least established. + If the publisher is not `ready`, then the message will be cached internally. +*/ +async function scheduleMessageDelivery() { + setInterval(async () => { + //keep track of the last message sent (but not yet confirmed) + const messageOffset = publisherOutbox.offset + const oldestMessageId = publisherOutbox.messageIds[messageOffset] + //is the publisher socket open? + const { writable } = publisher?.getConnectionInfo() ?? false + if (publisher && writable && oldestMessageId !== undefined) { + const message = publisherOutbox.messages.get(oldestMessageId) + const res = await publisher.send(message, { messageProperties: { messageId: `${oldestMessageId}` } }) + published++ + publisherOutbox.offset++ + if (res.publishingId !== undefined) { + //keep track of the messageId, by mapping it with the protocol-generated publishingId + publisherOutbox.publishingIds.set(res.publishingId, oldestMessageId) + } + } + }, 10) +} + +/* + at each interval, the state of the outbox, the message counters and the state of client connections will be logged. + */ +function scheduleLogInfo() { + setInterval(() => { + logger.info(`outbox queue length: ${publisherOutbox.messageIds.length} offset ${publisherOutbox.offset}`) + logger.info(`${inspect({ published, confirmed, received: received.size })}`) + logger.info(`client local port: ${inspect(client && client.getConnectionInfo().localPort)} consumer local port: ${inspect(consumer && consumer.getConnectionInfo().localPort)} publisher local port: ${inspect(publisher && publisher.getConnectionInfo().localPort)}`) + }, 3000) +} -async function main() { - const streamName = `example-${randomUUID()}` - console.log(`Creating stream ${streamName}`) - let client = undefined +/* + at each interval, trigger a connection problem. +*/ +async function triggerConnectionIssues() { + return new Promise((res, rej) => { + setInterval(async () => { + logger.info("Closing!") + restartCount++ + await triggerConnectionIssue() + if (restartCount >= 1000) { + try { + logger.info("Terminating...") + if (client) await client.close() + res(true) + return + } + catch (e) { + rej(e) + return + } + } + //after this message is logged, the client connections should reopen + logger.info("\nNow it should reopen!\n") + }, 60000) + }) +} + +/* + when setting up the publisher, we register a callback on the `publish_confirm` event that + informs us that the broker has correctly received the sent message. This triggers an update on + the outbox state (the message is considered as sent) +*/ +async function setupPublisher(client) { + const publisherRef = `publisher - ${randomUUID()}` + const publisherConfig = { stream: streamName, publisherRef: publisherRef, connectionClosedListener: (err) => { return } } + /* + confirmedIds contains the list of publishingIds linked to messages correctly published in the stream + These ids are not the messageIds that have been set in the message properties + */ + const publisherConfirmCallback = (err, confirmedIds) => { + if (err) { + logger.info(`Publish confirm error ${inspect(err)} `) + return + } + confirmed = confirmed + confirmedIds.length + confirmedMessageIds = confirmedIds.map((publishingId) => { + const messageId = publisherOutbox.publishingIds.get(publishingId) + publisherOutbox.publishingIds.delete(publishingId) + return messageId + }) - const connectToRabbit = async () => { + publisherOutbox.messageIds = publisherOutbox.messageIds.filter((id) => { + return !confirmedMessageIds.includes(id) + }) + confirmedMessageIds.forEach((id) => { + if (publisherOutbox.messages.delete(id)) { + publisherOutbox.offset = publisherOutbox.offset - 1 + } + }) + } + const publisher = await client.declarePublisher(publisherConfig) + publisher.on("publish_confirm", publisherConfirmCallback) + publisherOutbox.offset = 0 + return publisher +} + +/* + in the consumer we can use the `messageId` property to make sure each message is "handled" once. + */ +async function setupConsumer(client) { + const consumerConfig = { stream: streamName, offset: rabbit.Offset.timestamp(new Date()), connectionClosedListener: (err) => { return } } + const receiveCallback = (msg) => { + const msgId = msg.messageProperties.messageId + if (received.has(msgId)) { + /*On restart, the consumer sets automatically its offset as the latest handled message index. + For sanity, some sort of deduplication is still needed. + @see https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/ + and Consumer.storeOffset and Consumer.queryOffset for a more complete approach + */ + logger.info(`dedup: ${msgId}`) + } + received.add(msgId) + consumerOffset = msg.offset + return + } + return client.declareConsumer(consumerConfig, receiveCallback) +} + + +/* + setup of a client instance, a producer and a consumer. + The core of the example is represented by the implementation of the + `connection_closed` callback, in which the `client.restart()` method is called. + This triggers the reset of all TCP sockets involved, for all producers and consumers, + as well as for the TCP socket used by the client itself. +*/ +async function setup() { + try { + const connectionClosedCallback = () => { + logger.info(`In connection closed event...`) + callbackCalled++ + if (callbackCalled > 10) { + process.exit(0) + } + client.restart().then(() => { + logger.info(`Connections restarted!`) + }).catch((reason) => { + logger.warn(`Could not reconnect to Rabbit! ${reason}`) + }) + } + const firstNode = getNodesFromEnv()[0] + logger.info(`Now invoking rabbit.connect on ${inspect(firstNode)}`) client = await rabbit.connect({ - hostname: "localhost", - port: 5553, + hostname: firstNode.host, + port: firstNode.port, username: rabbitUser, password: rabbitPassword, - listeners: { - connection_closed: async () => { - await sleep(Math.random() * 3000) - connectToRabbit() - .then(() => console.log("Successfully re-connected to rabbit!")) - .catch((e) => console.error("Error while reconnecting to Rabbit!", e)) - }, - }, + listeners: { connection_closed: connectionClosedCallback, }, vhost: "/", heartbeat: 0, - }) + }, logger) + await client.createStream({ stream: streamName, arguments: {} }) + publisher = await setupPublisher(client) + consumer = await setupConsumer(client) + return { client, publisher, consumer } } + catch (err) { + logger.warn(`Setup-wide error: ${inspect(err)}`) + } +} - await connectToRabbit() - - await sleep(2000) - - console.log("Closing!") - await client.close() - console.log("Now it should reopen!") - await sleep(10000) +async function main() { + //instantiate the client, the producer and the consumer + await setup() + //schedule the task that inserts new messages in the outbox + scheduleMessageProduction() + //schedule the task that attempts to send a message to the broker, taking it from the outbox + await scheduleMessageDelivery() + //schedule the task that logs connection info and message counters + scheduleLogInfo() + //schedule the task that triggers a (more or less simulated) network issue + await triggerConnectionIssues() } main() - .then(() => console.log("done!")) + .then(() => logger.info("done!")) .catch((res) => { - console.log("ERROR ", res) + logger.info("ERROR ", res) process.exit(-1) }) -const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) + diff --git a/src/client.ts b/src/client.ts index edf1354d..73da63da 100644 --- a/src/client.ts +++ b/src/client.ts @@ -47,19 +47,26 @@ export type ConnectionClosedListener = (hadError: boolean) => void export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean } +type ConsumerMappedValue = { connection: Connection; consumer: StreamConsumer; params: DeclareConsumerParams } +type PublisherMappedValue = { + connection: Connection + publisher: StreamPublisher + params: DeclarePublisherParams + filter: FilterFunc | undefined +} export class Client { - private id: string = randomUUID() + public readonly id: string = randomUUID() private publisherId = 0 private consumerId = 0 - private consumers = new Map() - private publishers = new Map() + private consumers = new Map() + private publishers = new Map() private compressions = new Map() - private readonly connection: Connection + private connection: Connection - private constructor(private readonly logger: Logger, private readonly params: ClientParams, connection?: Connection) { + private constructor(private readonly logger: Logger, private readonly params: ClientParams) { this.compressions.set(CompressionType.None, NoneCompression.create()) this.compressions.set(CompressionType.Gzip, GzipCompression.create()) - this.connection = connection ?? this.getLocatorConnection() + this.connection = this.getLocatorConnection() this.connection.incrRefCount() } @@ -127,38 +134,24 @@ export class Client { } public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise { - const { stream, publisherRef } = params const publisherId = this.incPublisherId() - const connection = await this.getConnection(params.stream, true, params.connectionClosedListener) - const res = await connection.sendAndWait( - new DeclarePublisherRequest({ stream, publisherRef, publisherId }) - ) - if (!res.ok) { - await connection.close() - throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) - } - if (filter && !connection.isFilteringEnabled) { - throw new Error(`Broker does not support message filtering.`) - } - const publisher = new StreamPublisher( - { - connection: connection, - stream: params.stream, - publisherId: publisherId, - publisherRef: params.publisherRef, - boot: params.boot, - maxFrameSize: this.maxFrameSize, - maxChunkLength: params.maxChunkLength, - logger: this.logger, - }, - filter - ) - this.publishers.set(publisherId, { publisher: publisher, connection: connection }) + await this.declarePublisherOnConnection(params, publisherId, connection, filter) + const streamPublisherParams = { + connection: connection, + stream: params.stream, + publisherId: publisherId, + publisherRef: params.publisherRef, + boot: params.boot, + maxFrameSize: this.maxFrameSize, + maxChunkLength: params.maxChunkLength, + logger: this.logger, + } + const publisher = new StreamPublisher(streamPublisherParams, filter) + this.publishers.set(publisherId, { publisher, connection, params, filter }) this.logger.info( `New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}` ) - return publisher } @@ -176,7 +169,7 @@ export class Client { public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise { const consumerId = this.incConsumerId() - const properties: Record = {} + const connection = await this.getConnection(params.stream, false, params.connectionClosedListener) if (params.filter && !connection.isFilteringEnabled) { @@ -184,41 +177,12 @@ export class Client { } const consumer = new StreamConsumer( - addOffsetFilterToHandle(handle, params.offset), - { - connection, - stream: params.stream, - consumerId, - consumerRef: params.consumerRef, - offset: params.offset, - }, + handle, + { connection, stream: params.stream, consumerId, consumerRef: params.consumerRef, offset: params.offset }, params.filter ) - this.consumers.set(consumerId, consumer) - - if (params.singleActive && !params.consumerRef) { - throw new Error("consumerRef is mandatory when declaring a single active consumer") - } - if (params.singleActive) { - properties["single-active-consumer"] = "true" - properties["name"] = params.consumerRef! - } - if (params.filter) { - for (let i = 0; i < params.filter.values.length; i++) { - properties[`filter.${i}`] = params.filter.values[i] - } - properties["match-unfiltered"] = `${params.filter.matchUnfiltered}` - } - - const res = await this.connection.sendAndWait( - new SubscribeRequest({ ...params, subscriptionId: consumerId, credit: 10, properties: properties }) - ) - - if (!res.ok) { - this.consumers.delete(consumerId) - throw new Error(`Declare Consumer command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) - } - + this.consumers.set(consumerId, { connection, consumer, params }) + await this.declareConsumerOnConnection(params, consumerId, this.connection) this.logger.info( `New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}` ) @@ -226,7 +190,8 @@ export class Client { } public async closeConsumer(consumerId: number) { - const consumer = this.consumers.get(consumerId) + const consumer = this.consumers.get(consumerId)?.consumer + if (!consumer) { this.logger.error("Consumer does not exist") throw new Error(`Consumer with id: ${consumerId} does not exist`) @@ -264,13 +229,13 @@ export class Client { } private async closeAllConsumers(manuallyClose: boolean) { - await Promise.all([...this.consumers.values()].map((c) => c.close(manuallyClose))) - this.consumers = new Map() + await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(manuallyClose))) + this.consumers = new Map() } private async closeAllPublishers(manuallyClose: boolean) { await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose))) - this.publishers = new Map() + this.publishers = new Map() } public consumerCounts() { @@ -386,6 +351,38 @@ export class Client { return res } + public async restart() { + this.logger.info(`Restarting client connection ${this.connection.id}`) + const uniqueConnectionIds = new Set() + uniqueConnectionIds.add(this.connection.id) + + await new Promise(async (res) => { + setTimeout(() => { + res(true) + }, 5000) + }) + await this.connection.restart() + + for (const { consumer, connection, params } of this.consumers.values()) { + if (!uniqueConnectionIds.has(connection.id)) { + this.logger.info(`Restarting consumer connection ${connection.id}`) + await connection.restart() + } + uniqueConnectionIds.add(connection.id) + const consumerParams = { ...params, offset: consumer.localOffset } + await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, this.connection) + } + + for (const { publisher, connection, params, filter } of this.publishers.values()) { + if (!uniqueConnectionIds.has(connection.id)) { + this.logger.info(`Restarting publisher connection ${connection.id}`) + await connection.restart() + } + uniqueConnectionIds.add(connection.id) + await this.declarePublisherOnConnection(params, publisher.publisherId, connection, filter) + } + } + public get maxFrameSize() { return this.connection.maxFrameSize ?? DEFAULT_FRAME_MAX } @@ -416,6 +413,50 @@ export class Client { return res.streams } + private async declarePublisherOnConnection( + params: DeclarePublisherParams, + publisherId: number, + connection: Connection, + filter?: FilterFunc + ) { + const res = await connection.sendAndWait( + new DeclarePublisherRequest({ stream: params.stream, publisherRef: params.publisherRef, publisherId }) + ) + if (!res.ok) { + await connection.close() + throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) + } + if (filter && !connection.isFilteringEnabled) { + throw new Error(`Broker does not support message filtering.`) + } + } + + private async declareConsumerOnConnection(params: DeclareConsumerParams, consumerId: number, connection: Connection) { + const properties: Record = {} + if (params.singleActive && !params.consumerRef) { + throw new Error("consumerRef is mandatory when declaring a single active consumer") + } + if (params.singleActive) { + properties["single-active-consumer"] = "true" + properties["name"] = params.consumerRef! + } + if (params.filter) { + for (let i = 0; i < params.filter.values.length; i++) { + properties[`filter.${i}`] = params.filter.values[i] + } + properties["match-unfiltered"] = `${params.filter.matchUnfiltered}` + } + + const res = await connection.sendAndWait( + new SubscribeRequest({ ...params, subscriptionId: consumerId, credit: 10, properties: properties }) + ) + + if (!res.ok) { + this.consumers.delete(consumerId) + throw new Error(`Declare Consumer command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) + } + } + private askForCredit(params: CreditRequestParams): Promise { return this.send(new CreditRequest({ ...params })) } @@ -434,7 +475,7 @@ export class Client { private getDeliverV1Callback() { return async (response: DeliverResponse) => { - const consumer = this.consumers.get(response.subscriptionId) + const consumer = this.consumers.get(response.subscriptionId)?.consumer if (!consumer) { this.logger.error(`On deliverV1 no consumer found`) return @@ -448,7 +489,7 @@ export class Client { private getDeliverV2Callback() { return async (response: DeliverResponseV2) => { - const consumer = this.consumers.get(response.subscriptionId) + const consumer = this.consumers.get(response.subscriptionId)?.consumer if (!consumer) { this.logger.error(`On deliverV2 no consumer found`) return @@ -466,7 +507,7 @@ export class Client { private getConsumerUpdateCallback() { return async (response: ConsumerUpdateQuery) => { - const consumer = this.consumers.get(response.subscriptionId) + const consumer = this.consumers.get(response.subscriptionId)?.consumer if (!consumer) { this.logger.error(`On consumer_update_query no consumer found`) return @@ -679,19 +720,6 @@ export function connect(params: ClientParams, logger?: Logger): Promise return Client.connect(params, logger) } -const addOffsetFilterToHandle = (handle: ConsumerFunc, offset: Offset): ConsumerFunc => { - if (offset.type === "numeric") { - const handlerWithFilter = (message: Message) => { - if (message.offset !== undefined && message.offset < offset.value!) { - return - } - handle(message) - } - return handlerWithFilter - } - return handle -} - const chooseNode = (metadata: { leader?: Broker; replicas?: Broker[] }, leader: boolean): Broker | undefined => { if (leader) { return metadata.leader diff --git a/src/connection.ts b/src/connection.ts index ec9eb2f8..0ce3f500 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -57,6 +57,7 @@ export type ConnectionInfo = { host: string port: number id: string + ready: boolean readable?: boolean writable?: boolean localPort?: number @@ -87,19 +88,15 @@ export class Connection { private refs: number = 0 private filteringEnabled: boolean = false public userManuallyClose: boolean = false + private setupCompleted: boolean = false + public readonly id = randomUUID() constructor(private readonly params: ConnectionParams, private readonly logger: Logger) { this.hostname = params.hostname this.leader = params.leader ?? false this.streamName = params.streamName if (params.frameMax) this.frameMax = params.frameMax - if (params.ssl) { - this.socket = tls.connect(params.port, params.hostname, { ...params.ssl, rejectUnauthorized: false }) - } else { - this.socket = new Socket() - this.socket.connect(this.params.port, this.params.hostname) - } - if (params.socketTimeout) this.socket.setTimeout(params.socketTimeout) + this.socket = this.createSocket() this.heartbeat = new Heartbeat(this, this.logger) this.compressions.set(CompressionType.None, NoneCompression.create()) this.compressions.set(CompressionType.Gzip, GzipCompression.create()) @@ -107,23 +104,25 @@ export class Connection { this.bufferSizeSettings = params.bufferSizeSettings || {} this.connectionId = randomUUID() this.connectionClosedListener = params.listeners?.connection_closed + this.logSocket("new") } - public static connect(params: ConnectionParams, logger: Logger): Promise { - return new Connection(params, logger).start() + private createSocket() { + const socket = this.params.ssl + ? tls.connect(this.params.port, this.params.hostname, { + ...this.params.ssl, + rejectUnauthorized: false, + }) + : new Socket().connect(this.params.port, this.params.hostname) + if (this.params.socketTimeout) socket.setTimeout(this.params.socketTimeout) + return socket } - public static create(params: ConnectionParams, logger: Logger): Connection { - return new Connection(params, logger) - } - - public start(): Promise { - this.registerListeners(this.params.listeners) - + private registerSocketListeners(): Promise { return new Promise((res, rej) => { this.socket.on("error", (err) => { this.logger.warn( - `Error on client ${this.params.hostname}:${this.params.port} vhost:${this.params.vhost} err: ${err}` + `Error on connection ${this.id} ${this.params.hostname}:${this.params.port} vhost:${this.params.vhost} err: ${err}` ) return rej(err) }) @@ -134,8 +133,9 @@ export class Connection { await this.auth({ username: this.params.username, password: this.params.password }) const { heartbeat } = await this.tune(this.params.heartbeat ?? 0) await this.open({ virtualHost: this.params.vhost }) - this.heartbeat.start(heartbeat) + if (!this.heartbeat.started) this.heartbeat.start(heartbeat) await this.exchangeCommandVersions() + this.setupCompleted = true return res(this) }) this.socket.on("drain", () => this.logger.warn(`Draining ${this.params.hostname}:${this.params.port}`)) @@ -148,12 +148,41 @@ export class Connection { this.received(data) }) this.socket.on("close", (had_error) => { - this.logger.info(`Close event on socket, close cloud had_error? ${had_error}`) + this.setupCompleted = false + this.logger.info(`Close event on socket for connection ${this.id}, close cloud had_error? ${had_error}`) if (this.connectionClosedListener && !this.userManuallyClose) this.connectionClosedListener(had_error) }) }) } + private unregisterSocketListeners() { + this.socket.removeAllListeners("connect") + this.socket.removeAllListeners("drain") + this.socket.removeAllListeners("timeout") + this.socket.removeAllListeners("data") + this.socket.removeAllListeners("close") + } + + public async restart() { + this.unregisterSocketListeners() + this.socket = this.createSocket() + await this.registerSocketListeners() + this.logSocket("restarted") + } + + public static connect(params: ConnectionParams, logger: Logger): Promise { + return new Connection(params, logger).start() + } + + public static create(params: ConnectionParams, logger: Logger): Connection { + return new Connection(params, logger) + } + + public start(): Promise { + this.registerListeners(this.params.listeners) + return this.registerSocketListeners() + } + public on(event: "metadata_update", listener: MetadataUpdateListener): void public on(event: "publish_confirm", listener: PublishConfirmListener): void public on(event: "publish_error", listener: PublishErrorListener): void @@ -200,6 +229,18 @@ export class Connection { } } + private logSocket(prefix: string = "") { + this.logger.info( + `${prefix} socket for connection ${this.id}: ${inspect([ + this.socket.readable, + this.socket.writable, + this.socket.localAddress, + this.socket.localPort, + this.socket.readyState, + ])}` + ) + } + private registerListeners(listeners?: ConnectionListenersParams) { if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update) if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm) @@ -234,9 +275,7 @@ export class Connection { new ExchangeCommandVersionsRequest(versions) ) this.serverDeclaredVersions.push(...response.serverDeclaredVersions) - - checkServerDeclaredVersions(this.serverVersions, this.logger, this.peerProperties.version) - return response + return checkServerDeclaredVersions(this.serverVersions, this.logger, this.peerProperties.version) } public sendAndWait(cmd: Request): Promise { @@ -287,6 +326,7 @@ export class Connection { readable: this.socket.readable, writable: this.socket.writable, localPort: this.socket.localPort, + ready: this.ready, } } @@ -358,6 +398,10 @@ export class Connection { return this.filteringEnabled } + public get ready() { + return this.setupCompleted + } + private async auth(params: { username: string; password: string }) { this.logger.debug(`Start authentication process ...`) this.logger.debug(`Start SASL handshake ...`) @@ -411,7 +455,7 @@ export class Connection { } public async close(params: ClosingParams = { closingCode: 0, closingReason: "" }): Promise { - this.logger.info(`Closing client...`) + this.logger.info(`Closing connection...`) this.logger.info(`Stopping heartbeat...`) this.heartbeat.stop() this.logger.debug(`Close...`) diff --git a/src/consumer.ts b/src/consumer.ts index d43a4bc1..7974ab38 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -21,9 +21,11 @@ export class StreamConsumer implements Consumer { public consumerId: number public consumerRef?: string public offset: Offset + private clientLocalOffset: Offset + readonly handle: ConsumerFunc constructor( - readonly handle: ConsumerFunc, + handle: ConsumerFunc, params: { connection: Connection stream: string @@ -38,7 +40,9 @@ export class StreamConsumer implements Consumer { this.consumerId = params.consumerId this.consumerRef = params.consumerRef this.offset = params.offset + this.clientLocalOffset = this.offset.clone() this.connection.incrRefCount() + this.handle = this.wrapHandle(handle, params.offset) } async close(manuallyClose: boolean): Promise { @@ -59,7 +63,38 @@ export class StreamConsumer implements Consumer { } public getConnectionInfo(): ConnectionInfo { - const { host, port, id, readable, localPort } = this.connection.getConnectionInfo() - return { host, port, id, readable, localPort } + const { host, port, id, readable, localPort, ready } = this.connection.getConnectionInfo() + return { host, port, id, readable, localPort, ready } + } + + public get localOffset() { + return this.clientLocalOffset.clone() + } + + private wrapHandle(handle: ConsumerFunc, offset: Offset) { + const updateLocalOffsetHandle = this.updateLocalOffsetHandle(handle) + return this.addOffsetFilterToHandle(updateLocalOffsetHandle, offset) + } + + private updateLocalOffsetHandle(handle: ConsumerFunc) { + const wrapped = (message: Message) => { + const result = handle(message) + if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset) + return result + } + return wrapped + } + + private addOffsetFilterToHandle(handle: ConsumerFunc, offset: Offset) { + if (offset.type === "numeric") { + const handlerWithFilter = (message: Message) => { + if (message.offset !== undefined && message.offset < offset.value!) { + return + } + handle(message) + } + return handlerWithFilter + } + return handle } } diff --git a/src/heartbeat.ts b/src/heartbeat.ts index a0a5708c..a2906f3f 100644 --- a/src/heartbeat.ts +++ b/src/heartbeat.ts @@ -32,6 +32,11 @@ export class Heartbeat { clearTimeout(this.timeout) } this.interval = 0 + this.heartBeatStarted = false + } + + public get started() { + return this.heartBeatStarted } reportLastMessageReceived() { diff --git a/src/publisher.ts b/src/publisher.ts index 587f3585..a9672494 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -14,9 +14,7 @@ import { ConnectionPool } from "./connection_pool" import { PublishRequestV2 } from "./requests/publish_request_v2" export type MessageApplicationProperties = Record - export type MessageAnnotations = Record - export type MessageAnnotationsValue = string | number | AmqpByte export class AmqpByte { @@ -75,8 +73,8 @@ export interface MessageOptions { } export interface Publisher { - send(message: Buffer, opts?: MessageOptions): Promise - basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise + send(message: Buffer, opts?: MessageOptions): Promise + basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise flush(): Promise sendSubEntries(messages: Message[], compressionType?: CompressionType): Promise on(event: "metadata_update", listener: MetadataUpdateListener): void @@ -90,6 +88,7 @@ export interface Publisher { export type FilterFunc = (msg: Message) => string | undefined type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void +export type SendResult = { sent: boolean; publishingId: bigint } export class StreamPublisher implements Publisher { private connection: Connection private stream: string @@ -131,23 +130,22 @@ export class StreamPublisher implements Publisher { this.connection.incrRefCount() } - async send(message: Buffer, opts: MessageOptions = {}) { + async send(message: Buffer, opts: MessageOptions = {}): Promise { if (this.boot && this.publishingId === -1n) { this.publishingId = await this.getLastPublishingId() } this.publishingId = this.publishingId + 1n - return this.basicSend(this.publishingId, message, opts) + return await this.basicSend(this.publishingId, message, opts) } - basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}) { + async basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}): Promise { const msg = { publishingId: publishingId, message: { content: content, ...opts } } - return this.enqueue(msg) + return await this.enqueue(msg) } async flush() { - await this.sendBuffer() - return true + return await this.sendBuffer() } async sendSubEntries(messages: Message[], compressionType: CompressionType = CompressionType.None) { @@ -163,8 +161,8 @@ export class StreamPublisher implements Publisher { } public getConnectionInfo(): ConnectionInfo { - const { host, port, id, writable, localPort } = this.connection.getConnectionInfo() - return { host, port, id, writable, localPort } + const { host, port, id, writable, localPort, ready } = this.connection.getConnectionInfo() + return { host, port, id, writable, localPort, ready } } public on(event: "metadata_update", listener: MetadataUpdateListener): void @@ -217,14 +215,12 @@ export class StreamPublisher implements Publisher { } this.checkMessageSize(publishRequestMessage) const sendCycleNeeded = this.add(publishRequestMessage) - let sent = false + const result = { sent: false, publishingId: publishRequestMessage.publishingId } if (sendCycleNeeded) { - await this.sendBuffer() - sent = true + result.sent = await this.sendBuffer() } this.scheduleIfNeeded() - - return sent + return result } private checkMessageSize(publishRequestMessage: PublishRequestMessage) { @@ -237,6 +233,9 @@ export class StreamPublisher implements Publisher { } private async sendBuffer() { + if (!this.connection.ready) { + return false + } const chunk = this.popChunk() if (chunk.length > 0) { this.filter @@ -252,7 +251,9 @@ export class StreamPublisher implements Publisher { messages: chunk, }) ) + return true } + return false } private scheduleIfNeeded() { diff --git a/src/requests/subscribe_request.ts b/src/requests/subscribe_request.ts index 418958e9..2129b618 100755 --- a/src/requests/subscribe_request.ts +++ b/src/requests/subscribe_request.ts @@ -40,6 +40,10 @@ export class Offset { static timestamp(date: Date) { return new Offset("timestamp", BigInt(date.getTime())) } + + public clone() { + return new Offset(this.type, this.value) + } } export class SubscribeRequest extends AbstractRequest { diff --git a/src/super_stream_publisher.ts b/src/super_stream_publisher.ts index 7d83a279..8ff2baa0 100644 --- a/src/super_stream_publisher.ts +++ b/src/super_stream_publisher.ts @@ -1,7 +1,7 @@ import { Client, RoutingStrategy } from "./client" import { CompressionType } from "./compression" import { murmur32 } from "./hash/murmur32" -import { Message, MessageOptions, Publisher } from "./publisher" +import { Message, MessageOptions, Publisher, SendResult } from "./publisher" import { bigIntMax } from "./util" export type MessageKeyExtractorFunction = (content: string, opts: MessageOptions) => string | undefined @@ -47,13 +47,13 @@ export class SuperStreamPublisher { this.publishers = new Map() } - public async send(message: Buffer, opts: MessageOptions): Promise { + public async send(message: Buffer, opts: MessageOptions): Promise { const partition = await this.routeMessage(message, opts) const publisher = await this.getPublisher(partition) return publisher.send(message, opts) } - public async basicSend(publishingId: bigint, message: Buffer, opts: MessageOptions): Promise { + public async basicSend(publishingId: bigint, message: Buffer, opts: MessageOptions): Promise { const partition = await this.routeMessage(message, opts) const publisher = await this.getPublisher(partition) return publisher.basicSend(publishingId, message, opts) diff --git a/test/e2e/client_restart.test.ts b/test/e2e/client_restart.test.ts new file mode 100644 index 00000000..18d68ee0 --- /dev/null +++ b/test/e2e/client_restart.test.ts @@ -0,0 +1,114 @@ +import { expect } from "chai" +import { Client, Offset } from "../../src" +import { createClient, createStreamName } from "../support/fake_data" +import { Rabbit } from "../support/rabbit" +import { eventually, username, password } from "../support/util" +import { Consumer } from "../../src/consumer" +import { Message, Publisher } from "../../src/publisher" +import { randomUUID } from "crypto" + +describe("restart connections", () => { + const rabbit = new Rabbit(username, password) + let streamName: string + let client: Client + + beforeEach(async () => { + client = await createClient(username, password) + streamName = createStreamName() + await rabbit.createStream(streamName) + }) + + afterEach(async () => { + try { + await client.close() + await rabbit.deleteStream(streamName) + await rabbit.closeAllConnections() + await rabbit.deleteAllQueues({ match: /my-stream-/ }) + } catch (e) {} + }) + + it("client only, no producer or consumer, socket connection is reestablished", async () => { + const oldConnectionInfo = client.getConnectionInfo() + + await client.restart() + + await eventually(async () => { + const connectionInfo = client.getConnectionInfo() + expect(connectionInfo.ready).eql(true) + expect(connectionInfo.localPort).not.undefined + expect(oldConnectionInfo.localPort).not.eql(connectionInfo.localPort) + }) + }).timeout(10000) + + it("client with publishers and consumers, socket connections are reestablished", async () => { + const clientOldConnectionInfo = client.getConnectionInfo() + const streamNames = [streamName, createStreamName(), createStreamName()] + const publishers = new Map() + const localPublisherPorts = new Map() + const localConsumerPorts = new Map() + const consumers = new Map() + const dummyMsgHandler = (_msg: Message) => { + return + } + for (const stream of streamNames) { + await rabbit.createStream(stream) + const publisher = await client.declarePublisher({ stream: stream }) + const consumer1 = await client.declareConsumer({ stream: stream, offset: Offset.first() }, dummyMsgHandler) + const consumer2 = await client.declareConsumer({ stream: stream, offset: Offset.first() }, dummyMsgHandler) + publishers.set(publisher.publisherId, publisher) + consumers.set(consumer1.consumerId, consumer1) + consumers.set(consumer2.consumerId, consumer2) + localPublisherPorts.set(publisher.publisherId, publisher.getConnectionInfo().localPort!) + localConsumerPorts.set(consumer1.consumerId, consumer1.getConnectionInfo().localPort!) + localConsumerPorts.set(consumer2.consumerId, consumer2.getConnectionInfo().localPort!) + } + + await client.restart() + + await eventually(async () => { + const clientConnectionInfo = client.getConnectionInfo() + expect(clientConnectionInfo.localPort).is.not.undefined + expect(clientConnectionInfo.localPort).not.eql(clientOldConnectionInfo.localPort) + expect(clientConnectionInfo.ready).eql(true) + for (const consumerId of consumers.keys()) { + const consumer = consumers.get(consumerId) + const consumerConnectionInfo = consumer!.getConnectionInfo() + expect(consumerConnectionInfo.ready).eql(true) + expect(consumerConnectionInfo.localPort).not.undefined + expect(consumerConnectionInfo.localPort).not.eql(localConsumerPorts.get(consumerId)) + } + for (const publisherId of publishers.keys()) { + const publisher = publishers.get(publisherId) + const publisherConnectionInfo = publisher!.getConnectionInfo() + expect(publisherConnectionInfo.ready).eql(true) + expect(publisherConnectionInfo.localPort).not.undefined + expect(publisherConnectionInfo.localPort).not.eql(localPublisherPorts.get(publisherId)) + } + }, 10000) + }).timeout(20000) + + it("sending and receiving messages is not affected", async () => { + const received = new Set() + const messageNumber = 10000 + const triggerIndex = Math.floor(messageNumber / 4) + const consumeHandle = (msg: Message) => { + received.add(msg.messageProperties?.messageId!) + } + const oldClientConnectionInfo = client.getConnectionInfo() + await client.declareConsumer({ stream: streamName, offset: Offset.first() }, consumeHandle) + const publisher = await client.declarePublisher({ stream: streamName }) + + for (let i = 0; i < messageNumber; i++) { + const msg = Buffer.from(`${randomUUID()}`) + await publisher.send(msg, { messageProperties: { messageId: `${i}` } }) + if (i === triggerIndex) await client.restart() + } + + await eventually(async () => { + const clientConnectionInfo = client.getConnectionInfo() + expect(clientConnectionInfo.ready).eql(true) + expect(clientConnectionInfo.localPort).not.eql(oldClientConnectionInfo.localPort) + expect(received.size).eql(messageNumber) + }, 10000) + }).timeout(20000) +}) diff --git a/test/e2e/connection_closed_listener.test.ts b/test/e2e/connection_closed_listener.test.ts index 7769b289..71baa5ec 100644 --- a/test/e2e/connection_closed_listener.test.ts +++ b/test/e2e/connection_closed_listener.test.ts @@ -159,11 +159,7 @@ describe("connection closed callback", () => { await always(() => { expect(clientListenerSpy).to.have.been.called.exactly(0) - }) - await always(() => { expect(publisherListenerSpy).to.have.been.called.exactly(0) - }) - await always(() => { expect(consumerListenerSpy).to.have.been.called.exactly(0) }) }).timeout(5000) @@ -192,11 +188,44 @@ describe("connection closed callback", () => { await rabbit.closeAllConnections() await eventually(() => { - expect(clientListenerSpy).to.have.been.called.exactly(1) - expect(publisherListenerSpy).to.have.been.called.exactly(1) - expect(consumerListenerSpy).to.have.been.called.exactly(1) + expect(clientListenerSpy).to.have.been.called.at.least(1) + expect(publisherListenerSpy).to.have.been.called.at.least(1) + expect(consumerListenerSpy).to.have.been.called.at.least(1) + }, 5000) + await always(() => { + expect(clientListenerSpy).to.have.been.called.at.most(1) + expect(publisherListenerSpy).to.have.been.called.at.most(1) + expect(consumerListenerSpy).to.have.been.called.at.most(1) + }, 5000) + }).timeout(15000) + + it("closed_connection listener is not invoked on publishers and consumers if not explicitly set", async () => { + let ctr = 0 + const listener = (_hasError: boolean) => { + ctr++ + if (ctr > 1) throw new Error("") + return + } + const clientListenerSpy = spy(listener) + client = await createClient(username, password, { connection_closed: clientListenerSpy }) + await client.declarePublisher({ + stream: streamName, + publisherRef, + }) + await client.declareConsumer({ stream: streamName, consumerRef, offset: Offset.first() }, (_msg) => { + return + }) + await sleep(5000) + + await rabbit.closeAllConnections() + + await eventually(() => { + expect(clientListenerSpy).to.have.been.called.at.least(1) + }, 5000) + await always(() => { + expect(clientListenerSpy).to.have.been.called.at.most(1) }, 5000) - }).timeout(10000) + }).timeout(15000) }) const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index 4f996851..4d6c3dbb 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -92,7 +92,7 @@ describe("Publisher", () => { const result = await publisher.send(msg, {}) - expect(result).is.false + expect(result.sent).is.false }) it("if max queue length is reached, then the chunk is sent immediately", async () => { const queueLength = 2 @@ -105,7 +105,8 @@ describe("Publisher", () => { const result = await Promise.all(msgs.map((msg) => publisher.send(msg, {}))) - expect(result).eql([false, true]) + expect(result[0].sent).is.false + expect(result[1].sent).is.true }) it("even if max queue length is not reached, the messages are eventually sent", async () => {