Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,25 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message
// ...
```

Optionally a single active consumer can have a callback which returns an offset that will be used once the consumer becomes active

```typescript
const consumerOptions = {
stream: "stream-name",
offset: Offset.next(),
singleActive: true,
consumerRef: "my-consumer-ref",
consumerUpdateListener: async (consumerReference, streamName) => {
const offset = await client.queryOffset({ reference: consumerReference, stream: streamName })
return rabbit.Offset.offset(offset)
},
}

// ...
```

Check `example/single_active_consumer_update_example.js` for a basic usage example.

### Custom Policy

By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing.
Expand Down
6 changes: 3 additions & 3 deletions example/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 83 additions & 0 deletions example/src/single_active_consumer_update_example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
const rabbit = require("rabbitmq-stream-js-client")
const crypto = require("crypto")

const wait = (ms) => new Promise((r) => setTimeout(r, ms))

async function main() {
const messagesFromFirstConsumer = []
const messagesFromSecondConsumer = []

console.log("Connecting...")
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "localhost",
username: "rabbit",
password: "rabbit",
})

console.log("Making sure the stream exists...")
const streamName = "active-consumer-switch-on-single-active-consumer"
await client.createStream({ stream: streamName, arguments: {} })
const consumerRef = `my-consumer-${crypto.randomUUID()}`

console.log("Creating the publisher and sending 100 messages...")
const publisher = await client.declarePublisher({ stream: streamName })
for (let i = 1; i <= 100; i++) {
await publisher.send(Buffer.from(`${i}`))
}

console.log("Creating the first consumer, when 50 messages are consumed it saves the offset on the server...")
const consumer1 = await client.declareConsumer(
{
stream: streamName,
offset: rabbit.Offset.first(),
singleActive: true,
consumerRef: consumerRef,
},
async (message) => {
messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
if (messagesFromFirstConsumer.length === 50) {
await consumer1.storeOffset(message.offset)
}
}
)

await wait(500)

console.log("Creating the second consumer, when it becomes active it resumes from the stored offset on the server...")
await client.declareConsumer(
{
stream: streamName,
offset: rabbit.Offset.first(),
singleActive: true,
consumerRef: consumerRef,
// This callback is executed when the consumer becomes active
consumerUpdateListener: async (consumerReference, streamName) => {
const offset = await client.queryOffset({ reference: consumerReference, stream: streamName })
return rabbit.Offset.offset(offset)
},
},
(message) => {
messagesFromSecondConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
}
)

console.log("Closing the first consumer to trigger the activation of the second one...")
await client.closeConsumer(consumer1.extendedId)

await wait(500)

console.log(`Messages consumed by the first consumer: ${messagesFromFirstConsumer.length}`)
console.log(`Messages consumed by the second consumer: ${messagesFromSecondConsumer.length}`)
}

main()
.then(() => {
console.log("done!")
process.exit(0)
})
.catch((res) => {
console.log("Error in publishing message!", res)
process.exit(-1)
})
25 changes: 23 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { inspect } from "util"
import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression"
import { Connection, ConnectionInfo, ConnectionParams, errorMessageOf } from "./connection"
import { ConnectionPool, ConnectionPurpose } from "./connection_pool"
import { Consumer, ConsumerFunc, StreamConsumer, computeExtendedConsumerId } from "./consumer"
import { Consumer, ConsumerFunc, ConsumerUpdateListener, StreamConsumer, computeExtendedConsumerId } from "./consumer"
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
import { Logger, NullLogger } from "./logger"
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
Expand Down Expand Up @@ -212,6 +212,8 @@ export class Client {
consumerTag: params.consumerTag,
offset: params.offset,
creditPolicy: params.creditPolicy,
singleActive: params.singleActive,
consumerUpdateListener: params.consumerUpdateListener,
},
params.filter
)
Expand Down Expand Up @@ -589,13 +591,31 @@ export class Client {
this.logger.error(`On consumer_update_query no consumer found`)
return
}
const offset = await this.getConsumerOrServerSavedOffset(consumer)
consumer.updateConsumerOffset(offset)
this.logger.debug(`on consumer_update_query -> ${consumer.consumerRef}`)
await connection.send(
new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset: consumer.offset })
new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset })
)
}
}

private async getConsumerOrServerSavedOffset(consumer: StreamConsumer) {
if (consumer.isSingleActive && consumer.consumerRef && consumer.consumerUpdateListener) {
try {
const offset = await consumer.consumerUpdateListener(consumer.consumerRef, consumer.streamName)
return offset
} catch (error) {
this.logger.error(
`Error in consumerUpdateListener for consumerRef ${consumer.consumerRef}: ${(error as Error).message}`
)
return consumer.offset
}
}

return consumer.offset
}

private getLocatorConnection() {
const connectionParams = this.buildConnectionParams(false, "", this.params.listeners?.connection_closed)
return Connection.create(connectionParams, this.logger)
Expand Down Expand Up @@ -790,6 +810,7 @@ export interface DeclareConsumerParams {
consumerRef?: string
offset: Offset
connectionClosedListener?: ConnectionClosedListener
consumerUpdateListener?: ConsumerUpdateListener
singleActive?: boolean
filter?: ConsumerFilter
creditPolicy?: ConsumerCreditPolicy
Expand Down
23 changes: 23 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Message } from "./publisher"
import { Offset } from "./requests/subscribe_request"

export type ConsumerFunc = (message: Message) => Promise<void> | void
export type ConsumerUpdateListener = (consumerRef: string, streamName: string) => Promise<Offset>
export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => {
return `${consumerId}@${connectionId}`
}
Expand Down Expand Up @@ -40,6 +41,13 @@ export interface Consumer {
*/
getConnectionInfo(): ConnectionInfo

/**
* Updates the offset of the consumer instance
*
* @param {Offset} offset - The new offset to set
*/
updateConsumerOffset(offset: Offset): void

consumerId: number
consumerRef?: string
readonly extendedId: string
Expand All @@ -52,10 +60,12 @@ export class StreamConsumer implements Consumer {
public consumerRef?: string
public consumerTag?: string
public offset: Offset
public consumerUpdateListener?: ConsumerUpdateListener
private clientLocalOffset: Offset
private creditsHandler: ConsumerCreditPolicy
private consumerHandle: ConsumerFunc
private closed: boolean
private singleActive: boolean = false

constructor(
handle: ConsumerFunc,
Expand All @@ -67,6 +77,8 @@ export class StreamConsumer implements Consumer {
consumerTag?: string
offset: Offset
creditPolicy?: ConsumerCreditPolicy
singleActive?: boolean
consumerUpdateListener?: ConsumerUpdateListener
},
readonly filter?: ConsumerFilter
) {
Expand All @@ -79,7 +91,9 @@ export class StreamConsumer implements Consumer {
this.connection.incrRefCount()
this.creditsHandler = params.creditPolicy || defaultCreditPolicy
this.consumerHandle = handle
this.consumerUpdateListener = params.consumerUpdateListener
this.closed = false
this.singleActive = params.singleActive ?? false
}

async close(manuallyClose: boolean): Promise<void> {
Expand Down Expand Up @@ -127,6 +141,15 @@ export class StreamConsumer implements Consumer {
return this.creditsHandler
}

public get isSingleActive() {
return this.singleActive
}

public updateConsumerOffset(offset: Offset) {
this.offset = offset.clone()
this.clientLocalOffset = offset.clone()
}

private maybeUpdateLocalOffset(message: Message) {
if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset)
}
Expand Down
52 changes: 52 additions & 0 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
getTestNodesFromEnv,
password,
username,
wait,
waitSleeping,
} from "../support/util"
import { Connection, Channel } from "amqplib"
Expand Down Expand Up @@ -216,6 +217,57 @@ describe("declare consumer", () => {
await eventually(() => expect(messages).eql([Buffer.from("hello"), Buffer.from("hello")]))
}).timeout(10000)

it("declaring two single active consumer on an existing stream - after closing one consumer the active one can resume the consuming from the last saved offset on the server", async () => {
const messagesFromFirstConsumer: string[] = []
const messagesFromSecondConsumer: string[] = []
const consumerRef = createConsumerRef()
for (let i = 1; i <= 100; i++) {
await publisher.send(Buffer.from(`${i}`))
}

const consumer1 = await client.declareConsumer(
{
stream: streamName,
offset: Offset.first(),
singleActive: true,
consumerRef: consumerRef,
consumerUpdateListener: async (cr: string, sn: string) => {
const offset = await client.queryOffset({ reference: cr, stream: sn })
return Offset.offset(offset)
},
},
async (message: Message) => {
messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
if (messagesFromFirstConsumer.length === 50) {
await consumer1.storeOffset(message.offset!)
}
}
)
await wait(500)
await client.declareConsumer(
{
stream: streamName,
offset: Offset.first(),
singleActive: true,
consumerRef: consumerRef,
consumerUpdateListener: async (cr: string, sn: string) => {
const offset = await client.queryOffset({ reference: cr, stream: sn })
return Offset.offset(offset)
},
},
(message: Message) => {
messagesFromSecondConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
}
)
await client.closeConsumer(consumer1.extendedId)
await wait(500)

await eventually(() => {
expect(messagesFromSecondConsumer.find((m) => m === `Message 50 from ${consumerRef}`)).to.not.be.undefined
expect(messagesFromSecondConsumer.find((m) => m === `Message 49 from ${consumerRef}`)).to.be.undefined
}, 8000)
}).timeout(20000)

it("declaring a single active consumer without reference on an existing stream - should throw an error", async () => {
const messages: Buffer[] = []

Expand Down