diff --git a/README.md b/README.md index a069aa6..ad90685 100644 --- a/README.md +++ b/README.md @@ -245,6 +245,25 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message // ... ``` +Optionally a single active consumer can have a callback which returns an offset that will be used once the consumer becomes active + +```typescript +const consumerOptions = { + stream: "stream-name", + offset: Offset.next(), + singleActive: true, + consumerRef: "my-consumer-ref", + consumerUpdateListener: async (consumerReference, streamName) => { + const offset = await client.queryOffset({ reference: consumerReference, stream: streamName }) + return rabbit.Offset.offset(offset) + }, +} + +// ... +``` + +Check `example/single_active_consumer_update_example.js` for a basic usage example. + ### Custom Policy By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing. diff --git a/example/package-lock.json b/example/package-lock.json index 3ff1abc..2d55046 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -23,7 +23,7 @@ "extraneous": true }, "..": { - "version": "0.5.1", + "version": "0.6.0", "license": "ISC", "dependencies": { "semver": "^7.5.4" @@ -38,7 +38,7 @@ "@types/node": "^20.11.5", "@typescript-eslint/eslint-plugin": "^6.19.0", "@typescript-eslint/parser": "^6.19.0", - "amqplib": "^0.10.3", + "amqplib": "^0.10.5", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", "chai-spies": "^1.1.0", @@ -254,7 +254,7 @@ "@types/node": "^20.11.5", "@typescript-eslint/eslint-plugin": "^6.19.0", "@typescript-eslint/parser": "^6.19.0", - "amqplib": "^0.10.3", + "amqplib": "^0.10.5", "chai": "^4.3.7", "chai-as-promised": "^7.1.1", "chai-spies": "^1.1.0", diff --git a/example/src/single_active_consumer_update_example.js b/example/src/single_active_consumer_update_example.js new file mode 100644 index 0000000..06df5d1 --- /dev/null +++ b/example/src/single_active_consumer_update_example.js @@ -0,0 +1,83 @@ +const rabbit = require("rabbitmq-stream-js-client") +const crypto = require("crypto") + +const wait = (ms) => new Promise((r) => setTimeout(r, ms)) + +async function main() { + const messagesFromFirstConsumer = [] + const messagesFromSecondConsumer = [] + + console.log("Connecting...") + const client = await rabbit.connect({ + vhost: "/", + port: 5552, + hostname: "localhost", + username: "rabbit", + password: "rabbit", + }) + + console.log("Making sure the stream exists...") + const streamName = "active-consumer-switch-on-single-active-consumer" + await client.createStream({ stream: streamName, arguments: {} }) + const consumerRef = `my-consumer-${crypto.randomUUID()}` + + console.log("Creating the publisher and sending 100 messages...") + const publisher = await client.declarePublisher({ stream: streamName }) + for (let i = 1; i <= 100; i++) { + await publisher.send(Buffer.from(`${i}`)) + } + + console.log("Creating the first consumer, when 50 messages are consumed it saves the offset on the server...") + const consumer1 = await client.declareConsumer( + { + stream: streamName, + offset: rabbit.Offset.first(), + singleActive: true, + consumerRef: consumerRef, + }, + async (message) => { + messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`) + if (messagesFromFirstConsumer.length === 50) { + await consumer1.storeOffset(message.offset) + } + } + ) + + await wait(500) + + console.log("Creating the second consumer, when it becomes active it resumes from the stored offset on the server...") + await client.declareConsumer( + { + stream: streamName, + offset: rabbit.Offset.first(), + singleActive: true, + consumerRef: consumerRef, + // This callback is executed when the consumer becomes active + consumerUpdateListener: async (consumerReference, streamName) => { + const offset = await client.queryOffset({ reference: consumerReference, stream: streamName }) + return rabbit.Offset.offset(offset) + }, + }, + (message) => { + messagesFromSecondConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`) + } + ) + + console.log("Closing the first consumer to trigger the activation of the second one...") + await client.closeConsumer(consumer1.extendedId) + + await wait(500) + + console.log(`Messages consumed by the first consumer: ${messagesFromFirstConsumer.length}`) + console.log(`Messages consumed by the second consumer: ${messagesFromSecondConsumer.length}`) +} + +main() + .then(() => { + console.log("done!") + process.exit(0) + }) + .catch((res) => { + console.log("Error in publishing message!", res) + process.exit(-1) + }) diff --git a/src/client.ts b/src/client.ts index 936d9f6..84e5252 100644 --- a/src/client.ts +++ b/src/client.ts @@ -4,7 +4,7 @@ import { inspect } from "util" import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression" import { Connection, ConnectionInfo, ConnectionParams, errorMessageOf } from "./connection" import { ConnectionPool, ConnectionPurpose } from "./connection_pool" -import { Consumer, ConsumerFunc, StreamConsumer, computeExtendedConsumerId } from "./consumer" +import { Consumer, ConsumerFunc, ConsumerUpdateListener, StreamConsumer, computeExtendedConsumerId } from "./consumer" import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes" import { Logger, NullLogger } from "./logger" import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher" @@ -212,6 +212,8 @@ export class Client { consumerTag: params.consumerTag, offset: params.offset, creditPolicy: params.creditPolicy, + singleActive: params.singleActive, + consumerUpdateListener: params.consumerUpdateListener, }, params.filter ) @@ -589,13 +591,31 @@ export class Client { this.logger.error(`On consumer_update_query no consumer found`) return } + const offset = await this.getConsumerOrServerSavedOffset(consumer) + consumer.updateConsumerOffset(offset) this.logger.debug(`on consumer_update_query -> ${consumer.consumerRef}`) await connection.send( - new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset: consumer.offset }) + new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset }) ) } } + private async getConsumerOrServerSavedOffset(consumer: StreamConsumer) { + if (consumer.isSingleActive && consumer.consumerRef && consumer.consumerUpdateListener) { + try { + const offset = await consumer.consumerUpdateListener(consumer.consumerRef, consumer.streamName) + return offset + } catch (error) { + this.logger.error( + `Error in consumerUpdateListener for consumerRef ${consumer.consumerRef}: ${(error as Error).message}` + ) + return consumer.offset + } + } + + return consumer.offset + } + private getLocatorConnection() { const connectionParams = this.buildConnectionParams(false, "", this.params.listeners?.connection_closed) return Connection.create(connectionParams, this.logger) @@ -790,6 +810,7 @@ export interface DeclareConsumerParams { consumerRef?: string offset: Offset connectionClosedListener?: ConnectionClosedListener + consumerUpdateListener?: ConsumerUpdateListener singleActive?: boolean filter?: ConsumerFilter creditPolicy?: ConsumerCreditPolicy diff --git a/src/consumer.ts b/src/consumer.ts index d410e19..6bbfd16 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -6,6 +6,7 @@ import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" export type ConsumerFunc = (message: Message) => Promise | void +export type ConsumerUpdateListener = (consumerRef: string, streamName: string) => Promise export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => { return `${consumerId}@${connectionId}` } @@ -40,6 +41,13 @@ export interface Consumer { */ getConnectionInfo(): ConnectionInfo + /** + * Updates the offset of the consumer instance + * + * @param {Offset} offset - The new offset to set + */ + updateConsumerOffset(offset: Offset): void + consumerId: number consumerRef?: string readonly extendedId: string @@ -52,10 +60,12 @@ export class StreamConsumer implements Consumer { public consumerRef?: string public consumerTag?: string public offset: Offset + public consumerUpdateListener?: ConsumerUpdateListener private clientLocalOffset: Offset private creditsHandler: ConsumerCreditPolicy private consumerHandle: ConsumerFunc private closed: boolean + private singleActive: boolean = false constructor( handle: ConsumerFunc, @@ -67,6 +77,8 @@ export class StreamConsumer implements Consumer { consumerTag?: string offset: Offset creditPolicy?: ConsumerCreditPolicy + singleActive?: boolean + consumerUpdateListener?: ConsumerUpdateListener }, readonly filter?: ConsumerFilter ) { @@ -79,7 +91,9 @@ export class StreamConsumer implements Consumer { this.connection.incrRefCount() this.creditsHandler = params.creditPolicy || defaultCreditPolicy this.consumerHandle = handle + this.consumerUpdateListener = params.consumerUpdateListener this.closed = false + this.singleActive = params.singleActive ?? false } async close(manuallyClose: boolean): Promise { @@ -127,6 +141,15 @@ export class StreamConsumer implements Consumer { return this.creditsHandler } + public get isSingleActive() { + return this.singleActive + } + + public updateConsumerOffset(offset: Offset) { + this.offset = offset.clone() + this.clientLocalOffset = offset.clone() + } + private maybeUpdateLocalOffset(message: Message) { if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset) } diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 8640d8d..95bd210 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -30,6 +30,7 @@ import { getTestNodesFromEnv, password, username, + wait, waitSleeping, } from "../support/util" import { Connection, Channel } from "amqplib" @@ -216,6 +217,57 @@ describe("declare consumer", () => { await eventually(() => expect(messages).eql([Buffer.from("hello"), Buffer.from("hello")])) }).timeout(10000) + it("declaring two single active consumer on an existing stream - after closing one consumer the active one can resume the consuming from the last saved offset on the server", async () => { + const messagesFromFirstConsumer: string[] = [] + const messagesFromSecondConsumer: string[] = [] + const consumerRef = createConsumerRef() + for (let i = 1; i <= 100; i++) { + await publisher.send(Buffer.from(`${i}`)) + } + + const consumer1 = await client.declareConsumer( + { + stream: streamName, + offset: Offset.first(), + singleActive: true, + consumerRef: consumerRef, + consumerUpdateListener: async (cr: string, sn: string) => { + const offset = await client.queryOffset({ reference: cr, stream: sn }) + return Offset.offset(offset) + }, + }, + async (message: Message) => { + messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`) + if (messagesFromFirstConsumer.length === 50) { + await consumer1.storeOffset(message.offset!) + } + } + ) + await wait(500) + await client.declareConsumer( + { + stream: streamName, + offset: Offset.first(), + singleActive: true, + consumerRef: consumerRef, + consumerUpdateListener: async (cr: string, sn: string) => { + const offset = await client.queryOffset({ reference: cr, stream: sn }) + return Offset.offset(offset) + }, + }, + (message: Message) => { + messagesFromSecondConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`) + } + ) + await client.closeConsumer(consumer1.extendedId) + await wait(500) + + await eventually(() => { + expect(messagesFromSecondConsumer.find((m) => m === `Message 50 from ${consumerRef}`)).to.not.be.undefined + expect(messagesFromSecondConsumer.find((m) => m === `Message 49 from ${consumerRef}`)).to.be.undefined + }, 8000) + }).timeout(20000) + it("declaring a single active consumer without reference on an existing stream - should throw an error", async () => { const messages: Buffer[] = []