diff --git a/src/client.ts b/src/client.ts index 0b1f850f..52cc80a0 100644 --- a/src/client.ts +++ b/src/client.ts @@ -2,9 +2,9 @@ import { randomUUID } from "crypto" import { coerce, lt } from "semver" import { inspect } from "util" import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression" -import { Connection, ConnectionInfo, errorMessageOf } from "./connection" -import { ConnectionPool } from "./connection_pool" -import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer" +import { Connection, ConnectionInfo, ConnectionParams, errorMessageOf } from "./connection" +import { ConnectionPool, ConnectionPurpose } from "./connection_pool" +import { Consumer, ConsumerFunc, StreamConsumer, computeExtendedConsumerId } from "./consumer" import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes" import { Logger, NullLogger } from "./logger" import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher" @@ -56,10 +56,8 @@ type PublisherMappedValue = { } export class Client { public readonly id: string = randomUUID() - private publisherId = 0 - private consumerId = 0 - private consumers = new Map() - private publishers = new Map() + private consumers = new Map() + private publishers = new Map() private compressions = new Map() private connection: Connection @@ -134,8 +132,8 @@ export class Client { } public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise { - const publisherId = this.incPublisherId() - const connection = await this.getConnection(params.stream, true, params.connectionClosedListener) + const connection = await this.getConnection(params.stream, "publisher", params.connectionClosedListener) + const publisherId = connection.getNextPublisherId() await this.declarePublisherOnConnection(params, publisherId, connection, filter) const streamPublisherParams = { connection: connection, @@ -151,32 +149,35 @@ export class Client { connection.on("metadata_update", async (metadata) => { if (metadata.metadataInfo.stream === publisher.streamName) { await publisher.close(false) - this.publishers.delete(publisherId) + this.publishers.delete(publisher.extendedId) } }) - this.publishers.set(publisherId, { publisher, connection, params, filter }) + this.publishers.set(publisher.extendedId, { publisher, connection, params, filter }) this.logger.info( `New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}` ) return publisher } - public async deletePublisher(publisherId: number) { - const publisherConnection = this.publishers.get(publisherId)?.connection ?? this.connection - const res = await publisherConnection.sendAndWait(new DeletePublisherRequest(publisherId)) + public async deletePublisher(extendedPublisherId: string) { + const { publisher, connection } = this.publishers.get(extendedPublisherId) ?? { + publisher: undefined, + connection: this.connection, + } + const publisherId = extractPublisherId(extendedPublisherId) + const res = await connection.sendAndWait(new DeletePublisherRequest(publisherId)) if (!res.ok) { throw new Error(`Delete Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - await this.publishers.get(publisherId)?.publisher.close(true) - this.publishers.delete(publisherId) + await publisher?.close(true) + this.publishers.delete(extendedPublisherId) this.logger.info(`deleted publisher with publishing id ${publisherId}`) return res.ok } public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise { - const consumerId = this.incConsumerId() - - const connection = await this.getConnection(params.stream, false, params.connectionClosedListener) + const connection = await this.getConnection(params.stream, "consumer", params.connectionClosedListener) + const consumerId = connection.getNextConsumerId() if (params.filter && !connection.isFilteringEnabled) { throw new Error(`Broker does not support message filtering.`) @@ -192,10 +193,10 @@ export class Client { if (params.connectionClosedListener) { params.connectionClosedListener(false) } - await this.closeConsumer(consumerId) + await this.closeConsumer(consumer.extendedId) } }) - this.consumers.set(consumerId, { connection, consumer, params }) + this.consumers.set(consumer.extendedId, { connection, consumer, params }) await this.declareConsumerOnConnection(params, consumerId, connection) this.logger.info( `New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}` @@ -203,20 +204,24 @@ export class Client { return consumer } - public async closeConsumer(consumerId: number) { - const { consumer, connection } = this.consumers.get(consumerId) ?? { consumer: undefined, connection: undefined } + public async closeConsumer(extendedConsumerId: string) { + const { consumer, connection } = this.consumers.get(extendedConsumerId) ?? { + consumer: undefined, + connection: undefined, + } + const consumerId = extractConsumerId(extendedConsumerId) if (!consumer) { this.logger.error("Consumer does not exist") - throw new Error(`Consumer with id: ${consumerId} does not exist`) + throw new Error(`Consumer with id: ${extendedConsumerId} does not exist`) } const res = await connection.sendAndWait(new UnsubscribeRequest(consumerId)) await consumer.close(true) - this.consumers.delete(consumerId) + this.consumers.delete(extendedConsumerId) if (!res.ok) { throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - this.logger.info(`Closed consumer with id: ${consumerId}`) + this.logger.info(`Closed consumer with id: ${extendedConsumerId}`) return res.ok } @@ -248,12 +253,12 @@ export class Client { private async closeAllConsumers(manuallyClose: boolean) { await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(manuallyClose))) - this.consumers = new Map() + this.consumers = new Map() } private async closeAllPublishers(manuallyClose: boolean) { await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose))) - this.publishers = new Map() + this.publishers = new Map() } public consumerCounts() { @@ -366,9 +371,9 @@ export class Client { } public async restart() { - this.logger.info(`Restarting client connection ${this.connection.id}`) + this.logger.info(`Restarting client connection ${this.connection.connectionId}`) const uniqueConnectionIds = new Set() - uniqueConnectionIds.add(this.connection.id) + uniqueConnectionIds.add(this.connection.connectionId) await new Promise(async (res) => { setTimeout(() => { @@ -378,21 +383,21 @@ export class Client { await this.connection.restart() for (const { consumer, connection, params } of this.consumers.values()) { - if (!uniqueConnectionIds.has(connection.id)) { - this.logger.info(`Restarting consumer connection ${connection.id}`) + if (!uniqueConnectionIds.has(connection.connectionId)) { + this.logger.info(`Restarting consumer connection ${connection.connectionId}`) await connection.restart() } - uniqueConnectionIds.add(connection.id) + uniqueConnectionIds.add(connection.connectionId) const consumerParams = { ...params, offset: consumer.localOffset } await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, connection) } for (const { publisher, connection, params, filter } of this.publishers.values()) { - if (!uniqueConnectionIds.has(connection.id)) { - this.logger.info(`Restarting publisher connection ${connection.id}`) + if (!uniqueConnectionIds.has(connection.connectionId)) { + this.logger.info(`Restarting publisher connection ${connection.connectionId}`) await connection.restart() } - uniqueConnectionIds.add(connection.id) + uniqueConnectionIds.add(connection.connectionId) await this.declarePublisherOnConnection(params, publisher.publisherId, connection, filter) } } @@ -466,7 +471,7 @@ export class Client { ) if (!res.ok) { - this.consumers.delete(consumerId) + this.consumers.delete(computeExtendedConsumerId(consumerId, connection.connectionId)) throw new Error(`Declare Consumer command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } } @@ -475,21 +480,11 @@ export class Client { return connection.send(new CreditRequest({ ...params })) } - private incPublisherId() { - const publisherId = this.publisherId - this.publisherId++ - return publisherId - } - - private incConsumerId() { - const consumerId = this.consumerId - this.consumerId++ - return consumerId - } - - private getDeliverV1Callback() { + private getDeliverV1Callback(connectionId: string) { return async (response: DeliverResponse) => { - const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? { + const { consumer, connection } = this.consumers.get( + computeExtendedConsumerId(response.subscriptionId, connectionId) + ) ?? { consumer: undefined, connection: undefined, } @@ -504,9 +499,11 @@ export class Client { } } - private getDeliverV2Callback() { + private getDeliverV2Callback(connectionId: string) { return async (response: DeliverResponseV2) => { - const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? { + const { consumer, connection } = this.consumers.get( + computeExtendedConsumerId(response.subscriptionId, connectionId) + ) ?? { consumer: undefined, connection: undefined, } @@ -525,9 +522,11 @@ export class Client { } } - private getConsumerUpdateCallback() { + private getConsumerUpdateCallback(connectionId: string) { return async (response: ConsumerUpdateQuery) => { - const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? { + const { consumer, connection } = this.consumers.get( + computeExtendedConsumerId(response.subscriptionId, connectionId) + ) ?? { consumer: undefined, connection: undefined, } @@ -549,26 +548,26 @@ export class Client { private async getConnection( streamName: string, - leader: boolean, + purpose: ConnectionPurpose, connectionClosedListener?: ConnectionClosedListener ): Promise { const [metadata] = await this.queryMetadata({ streams: [streamName] }) - const chosenNode = chooseNode(metadata, leader) + const chosenNode = chooseNode(metadata, purpose === "publisher") if (!chosenNode) { throw new Error(`Stream was not found on any node`) } - const cachedConnection = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host) + const cachedConnection = ConnectionPool.getUsableCachedConnection(purpose, streamName, chosenNode.host) if (cachedConnection) return cachedConnection const newConnection = await this.getConnectionOnChosenNode( - leader, + purpose, streamName, chosenNode, metadata, connectionClosedListener ) - ConnectionPool.cacheConnection(leader, streamName, newConnection.hostname, newConnection) + ConnectionPool.cacheConnection(purpose, streamName, newConnection.hostname, newConnection) return newConnection } @@ -593,25 +592,32 @@ export class Client { leader: boolean, streamName: string, connectionClosedListener?: ConnectionClosedListener - ) { + ): ConnectionParams { + const connectionId = randomUUID() const connectionListeners = { ...this.params.listeners, connection_closed: connectionClosedListener, - deliverV1: this.getDeliverV1Callback(), - deliverV2: this.getDeliverV2Callback(), - consumer_update_query: this.getConsumerUpdateCallback(), + deliverV1: this.getDeliverV1Callback(connectionId), + deliverV2: this.getDeliverV2Callback(connectionId), + consumer_update_query: this.getConsumerUpdateCallback(connectionId), + } + return { + ...this.params, + listeners: connectionListeners, + leader: leader, + streamName: streamName, + connectionId, } - return { ...this.params, listeners: connectionListeners, leader: leader, streamName: streamName } } private async getConnectionOnChosenNode( - leader: boolean, + purpose: ConnectionPurpose, streamName: string, chosenNode: { host: string; port: number }, metadata: StreamMetadata, connectionClosedListener?: ConnectionClosedListener ): Promise { - const connectionParams = this.buildConnectionParams(leader, streamName, connectionClosedListener) + const connectionParams = this.buildConnectionParams(purpose === "publisher", streamName, connectionClosedListener) if (this.params.addressResolver && this.params.addressResolver.enabled) { const maxAttempts = computeMaxAttempts(metadata) const resolver = this.params.addressResolver @@ -756,3 +762,11 @@ const chooseNode = (metadata: { leader?: Broker; replicas?: Broker[] }, leader: const computeMaxAttempts = (metadata: { leader?: Broker; replicas?: Broker[] }): number => { return Math.pow(2 + (metadata.leader ? 1 : 0) + (metadata.replicas?.length ?? 0), 2) } + +const extractConsumerId = (extendedConsumerId: string) => { + return parseInt(extendedConsumerId.split("@").shift() ?? "0") +} + +const extractPublisherId = (extendedPublisherId: string) => { + return parseInt(extendedPublisherId.split("@").shift() ?? "0") +} diff --git a/src/connection.ts b/src/connection.ts index 70766982..22c0c178 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -51,6 +51,7 @@ export type ConnectionListenersParams = ClientListenersParams & { export type ConnectionParams = ClientParams & { listeners?: ConnectionListenersParams + connectionId: string } export type ConnectionInfo = { @@ -81,7 +82,7 @@ export class Connection { private peerProperties: Record = {} private readonly bufferSizeSettings: BufferSizeSettings private frameMax: number = DEFAULT_FRAME_MAX - private connectionId: string + public readonly connectionId: string private connectionClosedListener: ConnectionClosedListener | undefined private serverEndpoint: { host: string; port: number } = { host: "", port: 5552 } private readonly serverDeclaredVersions: Version[] = [] @@ -89,7 +90,8 @@ export class Connection { private filteringEnabled: boolean = false public userManuallyClose: boolean = false private setupCompleted: boolean = false - public readonly id = randomUUID() + publisherId = 0 + consumerId = 0 constructor(private readonly params: ConnectionParams, private readonly logger: Logger) { this.hostname = params.hostname @@ -102,7 +104,7 @@ export class Connection { this.compressions.set(CompressionType.Gzip, GzipCompression.create()) this.decoder = new ResponseDecoder((...args) => this.responseReceived(...args), this.logger) this.bufferSizeSettings = params.bufferSizeSettings || {} - this.connectionId = randomUUID() + this.connectionId = params.connectionId ?? randomUUID() this.connectionClosedListener = params.listeners?.connection_closed this.logSocket("new") } @@ -122,7 +124,7 @@ export class Connection { return new Promise((res, rej) => { this.socket.on("error", (err) => { this.logger.warn( - `Error on connection ${this.id} ${this.params.hostname}:${this.params.port} vhost:${this.params.vhost} err: ${err}` + `Error on connection ${this.connectionId} ${this.params.hostname}:${this.params.port} vhost:${this.params.vhost} err: ${err}` ) return rej(err) }) @@ -149,7 +151,9 @@ export class Connection { }) this.socket.on("close", (had_error) => { this.setupCompleted = false - this.logger.info(`Close event on socket for connection ${this.id}, close cloud had_error? ${had_error}`) + this.logger.info( + `Close event on socket for connection ${this.connectionId}, close cloud had_error? ${had_error}` + ) if (this.connectionClosedListener && !this.userManuallyClose) this.connectionClosedListener(had_error) }) }) @@ -232,7 +236,7 @@ export class Connection { private logSocket(prefix: string = "") { this.logger.info( - `${prefix} socket for connection ${this.id}: ${inspect([ + `${prefix} socket for connection ${this.connectionId}: ${inspect([ this.socket.readable, this.socket.writable, this.socket.localAddress, @@ -505,6 +509,18 @@ export class Connection { public get refCount() { return this.refs } + + public getNextPublisherId() { + const publisherId = this.publisherId + this.publisherId++ + return publisherId + } + + public getNextConsumerId() { + const consumerId = this.consumerId + this.consumerId++ + return consumerId + } } export function errorMessageOf(code: number): string { diff --git a/src/connection_pool.ts b/src/connection_pool.ts index b61af26f..ff269362 100644 --- a/src/connection_pool.ts +++ b/src/connection_pool.ts @@ -2,26 +2,29 @@ import { Connection } from "./connection" import { getMaxSharedConnectionInstances } from "./util" type InstanceKey = string +export type ConnectionPurpose = "consumer" | "publisher" export class ConnectionPool { private static consumerConnectionProxies = new Map() private static publisherConnectionProxies = new Map() - public static getUsableCachedConnection(leader: boolean, streamName: string, host: string) { - const m = leader ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies - const k = ConnectionPool.getCacheKey(streamName, host) - const proxies = m.get(k) || [] + public static getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, host: string) { + const map = + purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies + const key = ConnectionPool.getCacheKey(streamName, host) + const proxies = map.get(key) || [] const connection = proxies.at(-1) const refCount = connection?.refCount return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined } - public static cacheConnection(leader: boolean, streamName: string, host: string, client: Connection) { - const m = leader ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies - const k = ConnectionPool.getCacheKey(streamName, host) - const currentlyCached = m.get(k) || [] + public static cacheConnection(purpose: ConnectionPurpose, streamName: string, host: string, client: Connection) { + const map = + purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies + const key = ConnectionPool.getCacheKey(streamName, host) + const currentlyCached = map.get(key) || [] currentlyCached.push(client) - m.set(k, currentlyCached) + map.set(key, currentlyCached) } public static removeIfUnused(connection: Connection) { diff --git a/src/consumer.ts b/src/consumer.ts index e2bfc821..01d605dc 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -5,6 +5,9 @@ import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" export type ConsumerFunc = (message: Message) => void +export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => { + return `${consumerId}@${connectionId}` +} export interface Consumer { close(manuallyClose: boolean): Promise @@ -13,6 +16,7 @@ export interface Consumer { getConnectionInfo(): ConnectionInfo consumerId: number consumerRef?: string + readonly extendedId: string } export class StreamConsumer implements Consumer { @@ -101,4 +105,8 @@ export class StreamConsumer implements Consumer { public get streamName(): string { return this.stream } + + public get extendedId(): string { + return computeExtendedConsumerId(this.consumerId, this.connection.connectionId) + } } diff --git a/src/publisher.ts b/src/publisher.ts index a5632466..3d9b3476 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -72,6 +72,10 @@ export interface MessageOptions { messageAnnotations?: Record } +export const computeExtendedPublisherId = (publisherId: number, connectionId: string) => { + return `${publisherId}@${connectionId}` +} + export interface Publisher { send(message: Buffer, opts?: MessageOptions): Promise basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise @@ -85,6 +89,7 @@ export interface Publisher { closed: boolean ref: string readonly publisherId: number + readonly extendedId: string } export type FilterFunc = (msg: Message) => string | undefined @@ -289,4 +294,8 @@ export class StreamPublisher implements Publisher { private popChunk() { return this.queue.splice(0, this.maxChunkLength) } + + public get extendedId(): string { + return computeExtendedPublisherId(this.publisherId, this.connection.connectionId) + } } diff --git a/src/util.ts b/src/util.ts index 9c9667a9..f5e98d76 100644 --- a/src/util.ts +++ b/src/util.ts @@ -17,7 +17,7 @@ export const DEFAULT_FRAME_MAX = 1048576 export const DEFAULT_UNLIMITED_FRAME_MAX = 0 export const REQUIRED_MANAGEMENT_VERSION = "3.13.0" export const getMaxSharedConnectionInstances = () => { - return +(process.env.MAX_SHARED_CLIENT_INSTANCES ?? 100) + return Math.max(+(process.env.MAX_SHARED_CLIENT_INSTANCES ?? 100), 256) } export const getAddressResolverFromEnv = (): { host: string; port: number } => { diff --git a/test/e2e/close_consumer.test.ts b/test/e2e/close_consumer.test.ts index 15a999f4..1544acfa 100644 --- a/test/e2e/close_consumer.test.ts +++ b/test/e2e/close_consumer.test.ts @@ -1,10 +1,12 @@ import { expect } from "chai" +import { randomUUID } from "node:crypto" import { Client, Consumer } from "../../src" +import { computeExtendedConsumerId } from "../../src/consumer" import { Offset } from "../../src/requests/subscribe_request" +import { getMaxSharedConnectionInstances } from "../../src/util" +import { createClient, createConsumer } from "../support/fake_data" import { Rabbit } from "../support/rabbit" import { eventually, expectToThrowAsync, getTestNodesFromEnv, password, username } from "../support/util" -import { createClient, createConsumer } from "../support/fake_data" -import { getMaxSharedConnectionInstances } from "../../src/util" describe("close consumer", () => { const rabbit = new Rabbit(username, password) @@ -38,7 +40,7 @@ describe("close consumer", () => { await client.declarePublisher({ stream: testStreamName }) const consumer = await client.declareConsumer({ stream: testStreamName, offset: Offset.first() }, () => null) - const response = await client.closeConsumer(consumer.consumerId) + const response = await client.closeConsumer(consumer.extendedId) expect(response).eql(true) expect(client.consumerCounts()).eql(0) @@ -49,7 +51,7 @@ describe("close consumer", () => { }).timeout(5000) it("closing a non-existing consumer should rise an error", async () => { - const nonExistingConsumerId = 123456 + const nonExistingConsumerId = computeExtendedConsumerId(123456, randomUUID()) await client.declarePublisher({ stream: testStreamName }) await expectToThrowAsync(() => client.closeConsumer(nonExistingConsumerId), Error) @@ -68,7 +70,7 @@ describe("close consumer", () => { } const sharingConsumers = Array.from(consumers.values()).find((consumerArrays) => consumerArrays.length >= 2) || [] - await client.closeConsumer(sharingConsumers[0].consumerId) + await client.closeConsumer(sharingConsumers[0].extendedId) const consumer2Info = sharingConsumers[1].getConnectionInfo() expect(sharingConsumers.length).gte(2) @@ -89,7 +91,7 @@ describe("close consumer", () => { const sharingConsumers = Array.from(consumers.values()).find((consumerArrays) => consumerArrays.length >= 2) || [] for (const c of sharingConsumers) { - await client.closeConsumer(c.consumerId) + await client.closeConsumer(c.extendedId) } await eventually(() => { @@ -123,7 +125,7 @@ describe("close consumer", () => { } for (const c of closingConsumersSubset) { - await client.closeConsumer(c.consumerId) + await client.closeConsumer(c.extendedId) } expect(localPort).not.undefined diff --git a/test/e2e/close_publisher.test.ts b/test/e2e/close_publisher.test.ts index 65ba90be..740bddd0 100644 --- a/test/e2e/close_publisher.test.ts +++ b/test/e2e/close_publisher.test.ts @@ -36,7 +36,7 @@ describe("close publisher", () => { it("closing a publisher", async () => { const publisher = await client.declarePublisher({ stream: testStreamName }) - const response = await client.deletePublisher(publisher.publisherId) + const response = await client.deletePublisher(publisher.extendedId) const publisherInfo = publisher.getConnectionInfo() expect(response).eql(true) @@ -48,7 +48,7 @@ describe("close publisher", () => { const publisher1 = await createPublisher(testStreamName, client) const publisher2 = await createPublisher(testStreamName, client) - await client.deletePublisher(publisher1.publisherId) + await client.deletePublisher(publisher1.extendedId) const publisher2Info = publisher2.getConnectionInfo() expect(publisher2Info.writable).eql(true) @@ -58,8 +58,8 @@ describe("close publisher", () => { const publisher1 = await createPublisher(testStreamName, client) const publisher2 = await createPublisher(testStreamName, client) - await client.deletePublisher(publisher1.publisherId) - await client.deletePublisher(publisher2.publisherId) + await client.deletePublisher(publisher1.extendedId) + await client.deletePublisher(publisher2.extendedId) const publisher1Info = publisher1.getConnectionInfo() const publisher2Info = publisher2.getConnectionInfo() @@ -88,7 +88,7 @@ describe("close publisher", () => { } for (const p of closingPublishersSubset) { - await client.deletePublisher(p.publisherId) + await client.deletePublisher(p.extendedId) } expect(localPort).not.undefined diff --git a/test/e2e/connection_closed_listener.test.ts b/test/e2e/connection_closed_listener.test.ts index 71baa5ec..167cd155 100644 --- a/test/e2e/connection_closed_listener.test.ts +++ b/test/e2e/connection_closed_listener.test.ts @@ -92,7 +92,7 @@ describe("connection closed callback", () => { connectionClosedListener: publisherListenerSpy, }) - await client.deletePublisher(publisher.publisherId) + await client.deletePublisher(publisher.extendedId) await always(() => { expect(publisherListenerSpy).to.have.been.called.exactly(0) @@ -112,7 +112,7 @@ describe("connection closed callback", () => { } ) - await client.closeConsumer(consumer.consumerId) + await client.closeConsumer(consumer.extendedId) await always(() => { expect(consumerListenerSpy).to.have.been.called.exactly(0) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 2a200de9..fc4904ae 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -301,6 +301,29 @@ describe("declare consumer", () => { expect(Array.from(counts.keys()).length).gt(1) }).timeout(10000) + it("on a new connection, consumerId restarts from 0", async () => { + const consumersToCreate = (getMaxSharedConnectionInstances() + 1) * (getTestNodesFromEnv().length + 1) + const consumerIds: number[] = [] + for (let i = 0; i < consumersToCreate; i++) { + const consumer = await createConsumer(streamName, client) + consumerIds.push(consumer.consumerId) + } + + expect(consumerIds.filter((id) => id === 0).length).gt(1) + }).timeout(10000) + + it("declaring more than 256 consumers should not throw but rather open up multiple connections", async () => { + const publishersToCreate = 257 + const counts = new Map() + for (let i = 0; i < publishersToCreate; i++) { + const consumer = await createConsumer(streamName, client) + const { id } = consumer.getConnectionInfo() + counts.set(id, (counts.get(id) || 0) + 1) + } + + expect(Array.from(counts.keys()).length).gt(1) + }).timeout(10000) + describe("when the client declares a named connection", () => { let connectionName: string | undefined = undefined diff --git a/test/e2e/declare_publisher.test.ts b/test/e2e/declare_publisher.test.ts index d0d7075c..027bb10b 100644 --- a/test/e2e/declare_publisher.test.ts +++ b/test/e2e/declare_publisher.test.ts @@ -108,6 +108,29 @@ describe("declare publisher", () => { expect(Array.from(counts.keys()).length).gt(1) }).timeout(10000) + it("declaring more than 256 publishers should not throw but rather open up multiple connections", async () => { + const publishersToCreate = 257 + const counts = new Map() + for (let i = 0; i < publishersToCreate; i++) { + const publisher = await createPublisher(streamName, client) + const { id } = publisher.getConnectionInfo() + counts.set(id, (counts.get(id) || 0) + 1) + } + + expect(Array.from(counts.keys()).length).gt(1) + }).timeout(10000) + + it("on a new connection, publisherId restarts from 0", async () => { + const publishersToCreate = getMaxSharedConnectionInstances() + 1 + const publisherIds: number[] = [] + for (let i = 0; i < publishersToCreate; i++) { + const publisher = await createPublisher(streamName, client) + publisherIds.push(publisher.publisherId) + } + + expect(publisherIds.filter((id) => id === 0).length).gt(1) + }).timeout(10000) + describe("when the client declares a named connection", () => { let connectionName: string | undefined = undefined diff --git a/test/e2e/metadata_update.test.ts b/test/e2e/metadata_update.test.ts index d03042b2..e0166722 100644 --- a/test/e2e/metadata_update.test.ts +++ b/test/e2e/metadata_update.test.ts @@ -18,7 +18,7 @@ describe("react to a metadata update message from the server", () => { afterEach(async function () { try { // eslint-disable-next-line no-invalid-this - this.timeout(5000) + this.timeout(10000) await client.close() await rabbit.deleteStream(streamName) await rabbit.closeAllConnections() diff --git a/test/unit/delete_publisher.test.ts b/test/unit/delete_publisher.test.ts index 45569c87..f04d84ad 100644 --- a/test/unit/delete_publisher.test.ts +++ b/test/unit/delete_publisher.test.ts @@ -1,9 +1,10 @@ -import { Client } from "../../src" import { expect } from "chai" -import { Rabbit } from "../support/rabbit" import { randomUUID } from "crypto" -import { expectToThrowAsync, username, password } from "../support/util" +import { Client } from "../../src" +import { computeExtendedPublisherId } from "../../src/publisher" import { createClient } from "../support/fake_data" +import { Rabbit } from "../support/rabbit" +import { expectToThrowAsync, password, username } from "../support/util" describe("DeletePublisher command", () => { const rabbit = new Rabbit(username, password) @@ -26,17 +27,15 @@ describe("DeletePublisher command", () => { const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef }) await publisher.send(Buffer.from(`test${randomUUID()}`)) - const publisherId = publisher.publisherId - - const deletePublisher = await client.deletePublisher(Number(publisherId)) + const deletePublisher = await client.deletePublisher(publisher.extendedId) expect(deletePublisher).eql(true) }).timeout(10000) it("errors when deleting a publisher that does not exist", async () => { - const nonExistentPublisherId = 42 + const nonExistentPublisherId = computeExtendedPublisherId(42, randomUUID()) await expectToThrowAsync( - () => client.deletePublisher(Number(nonExistentPublisherId)), + () => client.deletePublisher(nonExistentPublisherId), Error, "Delete Publisher command returned error with code 18 - Publisher does not exist" )