diff --git a/src/client.ts b/src/client.ts index fe73b8a1..edf1354d 100644 --- a/src/client.ts +++ b/src/client.ts @@ -45,7 +45,7 @@ import { DeliverResponseV2 } from "./responses/deliver_response_v2" export type ConnectionClosedListener = (hadError: boolean) => void -export type ClosingParams = { closingCode: number; closingReason: string } +export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean } export class Client { private id: string = randomUUID() @@ -87,11 +87,11 @@ export class Client { this.logger.info(`${this.id} Closing client...`) if (this.publisherCounts()) { this.logger.info(`Stopping all producers...`) - await this.closeAllPublishers() + await this.closeAllPublishers(true) } if (this.consumerCounts()) { this.logger.info(`Stopping all consumers...`) - await this.closeAllConsumers() + await this.closeAllConsumers(true) } this.connection.decrRefCount() await this.closeConnectionIfUnused(this.connection, params) @@ -100,7 +100,7 @@ export class Client { private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) { if (connection.refCount <= 0) { ConnectionPool.removeCachedConnection(this.connection) - await this.connection.close(params) + await this.connection.close({ ...params, manuallyClose: true }) } } @@ -168,7 +168,7 @@ export class Client { if (!res.ok) { throw new Error(`Delete Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - await this.publishers.get(publisherId)?.publisher.close() + await this.publishers.get(publisherId)?.publisher.close(true) this.publishers.delete(publisherId) this.logger.info(`deleted publisher with publishing id ${publisherId}`) return res.ok @@ -235,7 +235,7 @@ export class Client { if (!res.ok) { throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } - await consumer.close() + await consumer.close(true) this.consumers.delete(consumerId) this.logger.info(`Closed consumer with id: ${consumerId}`) return res.ok @@ -263,13 +263,13 @@ export class Client { }) } - private async closeAllConsumers() { - await Promise.all([...this.consumers.values()].map((c) => c.close())) + private async closeAllConsumers(manuallyClose: boolean) { + await Promise.all([...this.consumers.values()].map((c) => c.close(manuallyClose))) this.consumers = new Map() } - private async closeAllPublishers() { - await Promise.all([...this.publishers.values()].map((c) => c.publisher.close())) + private async closeAllPublishers(manuallyClose: boolean) { + await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose))) this.publishers = new Map() } diff --git a/src/connection.ts b/src/connection.ts index bd276e14..ec9eb2f8 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -86,6 +86,7 @@ export class Connection { private readonly serverDeclaredVersions: Version[] = [] private refs: number = 0 private filteringEnabled: boolean = false + public userManuallyClose: boolean = false constructor(private readonly params: ConnectionParams, private readonly logger: Logger) { this.hostname = params.hostname @@ -148,7 +149,7 @@ export class Connection { }) this.socket.on("close", (had_error) => { this.logger.info(`Close event on socket, close cloud had_error? ${had_error}`) - if (this.connectionClosedListener) this.connectionClosedListener(had_error) + if (this.connectionClosedListener && !this.userManuallyClose) this.connectionClosedListener(had_error) }) }) } @@ -416,6 +417,7 @@ export class Connection { this.logger.debug(`Close...`) const closeResponse = await this.sendAndWait(new CloseRequest(params)) this.logger.debug(`Close response: ${closeResponse.ok} - '${inspect(closeResponse)}'`) + this.userManuallyClose = params.manuallyClose ?? false this.socket.end() } diff --git a/src/consumer.ts b/src/consumer.ts index 7a2824bd..d43a4bc1 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -7,7 +7,7 @@ import { Offset } from "./requests/subscribe_request" export type ConsumerFunc = (message: Message) => void export interface Consumer { - close(): Promise + close(manuallyClose: boolean): Promise storeOffset(offsetValue: bigint): Promise queryOffset(): Promise getConnectionInfo(): ConnectionInfo @@ -41,10 +41,10 @@ export class StreamConsumer implements Consumer { this.connection.incrRefCount() } - async close(): Promise { + async close(manuallyClose: boolean): Promise { this.connection.decrRefCount() if (ConnectionPool.removeIfUnused(this.connection)) { - await this.connection.close() + await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) } } diff --git a/src/publisher.ts b/src/publisher.ts index a06e21f8..587f3585 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -83,7 +83,7 @@ export interface Publisher { on(event: "publish_confirm", listener: PublishConfirmCallback): void getLastPublishingId(): Promise getConnectionInfo(): ConnectionInfo - close(): Promise + close(manuallyClose: boolean): Promise ref: string readonly publisherId: number } @@ -197,12 +197,12 @@ export class StreamPublisher implements Publisher { return this.publisherRef } - public async close(): Promise { + public async close(manuallyClose: boolean): Promise { if (!this.closed) { await this.flush() this.connection.decrRefCount() if (ConnectionPool.removeIfUnused(this.connection)) { - await this.connection.close() + await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) } } this.closed = true diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index 7331871a..bd957b28 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -48,6 +48,6 @@ export class SuperStreamConsumer { } async close(): Promise { - await Promise.all([...this.consumers.values()].map((c) => c.close())) + await Promise.all([...this.consumers.values()].map((c) => c.close(true))) } } diff --git a/src/super_stream_publisher.ts b/src/super_stream_publisher.ts index 7b717e5f..7d83a279 100644 --- a/src/super_stream_publisher.ts +++ b/src/super_stream_publisher.ts @@ -43,7 +43,7 @@ export class SuperStreamPublisher { } public async close(): Promise { - await Promise.all([...this.publishers.values()].map((p) => p.close())) + await Promise.all([...this.publishers.values()].map((p) => p.close(true))) this.publishers = new Map() } diff --git a/test/e2e/connection_closed_listener.test.ts b/test/e2e/connection_closed_listener.test.ts index 1a58a72c..7769b289 100644 --- a/test/e2e/connection_closed_listener.test.ts +++ b/test/e2e/connection_closed_listener.test.ts @@ -5,6 +5,7 @@ import { Rabbit } from "../support/rabbit" import { username, password, eventually, always } from "../support/util" import { randomUUID } from "crypto" import { Offset } from "../../src/requests/subscribe_request" + describe("connection closed callback", () => { let client: Client | undefined = undefined const rabbit = new Rabbit(username, password) @@ -22,10 +23,10 @@ describe("connection closed callback", () => { afterEach(async () => { spySandbox?.restore() try { - await client?.close() await rabbit.deleteStream(streamName) - await rabbit.closeAllConnections() await rabbit.deleteAllQueues({ match: /my-stream-/ }) + await client?.close() + await rabbit.closeAllConnections() } catch (e) {} try { @@ -65,40 +66,90 @@ describe("connection closed callback", () => { }, 1000) }).timeout(5000) - it("if specified, is called also on publisher and consumer socket events", async () => { + it("closed_connection listener is not invoked by the client", async () => { const listener = (_hasError: boolean) => { return } - const listenerSpy = spy(listener) - client = await createClient(username, password, { connection_closed: listenerSpy }) - await client.declarePublisher({ stream: streamName, publisherRef, connectionClosedListener: listenerSpy }) - await client.declareConsumer( - { stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: listenerSpy }, + const clientListenerSpy = spy(listener) + client = await createClient(username, password, { connection_closed: clientListenerSpy }) + + await client.close() + + await always(() => { + expect(clientListenerSpy).to.have.been.called.exactly(0) + }) + }).timeout(5000) + + it("closed_connection listener is not invoked by the deletePublisher", async () => { + const listener = (_hasError: boolean) => { + return + } + const publisherListenerSpy = spy(listener) + client = await createClient(username, password, { connection_closed: publisherListenerSpy }) + const publisher = await client.declarePublisher({ + stream: streamName, + publisherRef, + connectionClosedListener: publisherListenerSpy, + }) + + await client.deletePublisher(publisher.publisherId) + + await always(() => { + expect(publisherListenerSpy).to.have.been.called.exactly(0) + }) + }).timeout(5000) + + it("closed_connection listener is not invoked by the deleteConsumer", async () => { + const listener = (_hasError: boolean) => { + return + } + const consumerListenerSpy = spy(listener) + client = await createClient(username, password, { connection_closed: consumerListenerSpy }) + const consumer = await client.declareConsumer( + { stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy }, (_msg) => { return } ) - await client.close() + await client.closeConsumer(consumer.consumerId) - await eventually(() => { - expect(listenerSpy).to.have.been.called.exactly(3) - }, 1000) + await always(() => { + expect(consumerListenerSpy).to.have.been.called.exactly(0) + }) }).timeout(5000) - it("different callbacks for client, consumer and publisher are all called when connections close", async () => { - const instListener = () => { - return (_hasError: boolean) => { + it("closed_connection listener is not invoked by the client even with multiple publishers and consumers", async () => { + const listener = (_hasError: boolean) => { + return + } + const clientListenerSpy = spy(listener) + const publisherListenerSpy = spy(listener) + const consumerListenerSpy = spy(listener) + client = await createClient(username, password, { connection_closed: clientListenerSpy }) + await client.declarePublisher({ + stream: streamName, + publisherRef, + connectionClosedListener: publisherListenerSpy, + }) + await client.declarePublisher({ + stream: streamName, + publisherRef: `${publisherRef}-1`, + connectionClosedListener: publisherListenerSpy, + }) + await client.declareConsumer( + { stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy }, + (_msg) => { return } - } - const listenerClientSpy = spy(instListener()) - const listenerConsumerSpy = spy(instListener()) - const listenerPublisherSpy = spy(instListener()) - client = await createClient(username, password, { connection_closed: listenerClientSpy }) - await client.declarePublisher({ stream: streamName, publisherRef, connectionClosedListener: listenerConsumerSpy }) + ) await client.declareConsumer( - { stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: listenerPublisherSpy }, + { + stream: streamName, + consumerRef: `${consumerRef}-1`, + offset: Offset.first(), + connectionClosedListener: consumerListenerSpy, + }, (_msg) => { return } @@ -106,14 +157,46 @@ describe("connection closed callback", () => { await client.close() - await eventually(() => { - expect(listenerClientSpy).to.have.been.called.exactly(1) - }, 1000) - await eventually(() => { - expect(listenerConsumerSpy).to.have.been.called.exactly(1) - }, 1000) - await eventually(() => { - expect(listenerPublisherSpy).to.have.been.called.exactly(1) - }, 1000) + await always(() => { + expect(clientListenerSpy).to.have.been.called.exactly(0) + }) + await always(() => { + expect(publisherListenerSpy).to.have.been.called.exactly(0) + }) + await always(() => { + expect(consumerListenerSpy).to.have.been.called.exactly(0) + }) }).timeout(5000) + + it("closed_connection listener is invoked by the server if it closes the connection", async () => { + const listener = (_hasError: boolean) => { + return + } + const clientListenerSpy = spy(listener) + const publisherListenerSpy = spy(listener) + const consumerListenerSpy = spy(listener) + client = await createClient(username, password, { connection_closed: clientListenerSpy }) + await client.declarePublisher({ + stream: streamName, + publisherRef, + connectionClosedListener: publisherListenerSpy, + }) + await client.declareConsumer( + { stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy }, + (_msg) => { + return + } + ) + await sleep(5000) + + await rabbit.closeAllConnections() + + await eventually(() => { + expect(clientListenerSpy).to.have.been.called.exactly(1) + expect(publisherListenerSpy).to.have.been.called.exactly(1) + expect(consumerListenerSpy).to.have.been.called.exactly(1) + }, 5000) + }).timeout(10000) }) + +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)) diff --git a/test/e2e/sub_entry_consume.test.ts b/test/e2e/sub_entry_consume.test.ts index a6e9e278..a04d352f 100644 --- a/test/e2e/sub_entry_consume.test.ts +++ b/test/e2e/sub_entry_consume.test.ts @@ -29,7 +29,7 @@ describe("consume a batch of messages", () => { await rabbit.deleteStream(streamName) await rabbit.closeAllConnections() await rabbit.deleteAllQueues({ match: /my-stream-/ }) - await consumer?.close() + await consumer?.close(true) } catch (e) {} })