Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import { DeliverResponseV2 } from "./responses/deliver_response_v2"

export type ConnectionClosedListener = (hadError: boolean) => void

export type ClosingParams = { closingCode: number; closingReason: string }
export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean }

export class Client {
private id: string = randomUUID()
Expand Down Expand Up @@ -87,11 +87,11 @@ export class Client {
this.logger.info(`${this.id} Closing client...`)
if (this.publisherCounts()) {
this.logger.info(`Stopping all producers...`)
await this.closeAllPublishers()
await this.closeAllPublishers(true)
}
if (this.consumerCounts()) {
this.logger.info(`Stopping all consumers...`)
await this.closeAllConsumers()
await this.closeAllConsumers(true)
}
this.connection.decrRefCount()
await this.closeConnectionIfUnused(this.connection, params)
Expand All @@ -100,7 +100,7 @@ export class Client {
private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) {
if (connection.refCount <= 0) {
ConnectionPool.removeCachedConnection(this.connection)
await this.connection.close(params)
await this.connection.close({ ...params, manuallyClose: true })
}
}

Expand Down Expand Up @@ -168,7 +168,7 @@ export class Client {
if (!res.ok) {
throw new Error(`Delete Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
await this.publishers.get(publisherId)?.publisher.close()
await this.publishers.get(publisherId)?.publisher.close(true)
this.publishers.delete(publisherId)
this.logger.info(`deleted publisher with publishing id ${publisherId}`)
return res.ok
Expand Down Expand Up @@ -235,7 +235,7 @@ export class Client {
if (!res.ok) {
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
await consumer.close()
await consumer.close(true)
this.consumers.delete(consumerId)
this.logger.info(`Closed consumer with id: ${consumerId}`)
return res.ok
Expand Down Expand Up @@ -263,13 +263,13 @@ export class Client {
})
}

private async closeAllConsumers() {
await Promise.all([...this.consumers.values()].map((c) => c.close()))
private async closeAllConsumers(manuallyClose: boolean) {
await Promise.all([...this.consumers.values()].map((c) => c.close(manuallyClose)))
this.consumers = new Map<number, StreamConsumer>()
}

private async closeAllPublishers() {
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close()))
private async closeAllPublishers(manuallyClose: boolean) {
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose)))
this.publishers = new Map<number, { connection: Connection; publisher: StreamPublisher }>()
}

Expand Down
4 changes: 3 additions & 1 deletion src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class Connection {
private readonly serverDeclaredVersions: Version[] = []
private refs: number = 0
private filteringEnabled: boolean = false
public userManuallyClose: boolean = false

constructor(private readonly params: ConnectionParams, private readonly logger: Logger) {
this.hostname = params.hostname
Expand Down Expand Up @@ -148,7 +149,7 @@ export class Connection {
})
this.socket.on("close", (had_error) => {
this.logger.info(`Close event on socket, close cloud had_error? ${had_error}`)
if (this.connectionClosedListener) this.connectionClosedListener(had_error)
if (this.connectionClosedListener && !this.userManuallyClose) this.connectionClosedListener(had_error)
})
})
}
Expand Down Expand Up @@ -416,6 +417,7 @@ export class Connection {
this.logger.debug(`Close...`)
const closeResponse = await this.sendAndWait<CloseResponse>(new CloseRequest(params))
this.logger.debug(`Close response: ${closeResponse.ok} - '${inspect(closeResponse)}'`)
this.userManuallyClose = params.manuallyClose ?? false
this.socket.end()
}

Expand Down
6 changes: 3 additions & 3 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Offset } from "./requests/subscribe_request"
export type ConsumerFunc = (message: Message) => void

export interface Consumer {
close(): Promise<void>
close(manuallyClose: boolean): Promise<void>
storeOffset(offsetValue: bigint): Promise<void>
queryOffset(): Promise<bigint>
getConnectionInfo(): ConnectionInfo
Expand Down Expand Up @@ -41,10 +41,10 @@ export class StreamConsumer implements Consumer {
this.connection.incrRefCount()
}

async close(): Promise<void> {
async close(manuallyClose: boolean): Promise<void> {
this.connection.decrRefCount()
if (ConnectionPool.removeIfUnused(this.connection)) {
await this.connection.close()
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export interface Publisher {
on(event: "publish_confirm", listener: PublishConfirmCallback): void
getLastPublishingId(): Promise<bigint>
getConnectionInfo(): ConnectionInfo
close(): Promise<void>
close(manuallyClose: boolean): Promise<void>
ref: string
readonly publisherId: number
}
Expand Down Expand Up @@ -197,12 +197,12 @@ export class StreamPublisher implements Publisher {
return this.publisherRef
}

public async close(): Promise<void> {
public async close(manuallyClose: boolean): Promise<void> {
if (!this.closed) {
await this.flush()
this.connection.decrRefCount()
if (ConnectionPool.removeIfUnused(this.connection)) {
await this.connection.close()
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
}
}
this.closed = true
Expand Down
2 changes: 1 addition & 1 deletion src/super_stream_consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ export class SuperStreamConsumer {
}

async close(): Promise<void> {
await Promise.all([...this.consumers.values()].map((c) => c.close()))
await Promise.all([...this.consumers.values()].map((c) => c.close(true)))
}
}
2 changes: 1 addition & 1 deletion src/super_stream_publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class SuperStreamPublisher {
}

public async close(): Promise<void> {
await Promise.all([...this.publishers.values()].map((p) => p.close()))
await Promise.all([...this.publishers.values()].map((p) => p.close(true)))
this.publishers = new Map()
}

Expand Down
145 changes: 114 additions & 31 deletions test/e2e/connection_closed_listener.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Rabbit } from "../support/rabbit"
import { username, password, eventually, always } from "../support/util"
import { randomUUID } from "crypto"
import { Offset } from "../../src/requests/subscribe_request"

describe("connection closed callback", () => {
let client: Client | undefined = undefined
const rabbit = new Rabbit(username, password)
Expand All @@ -22,10 +23,10 @@ describe("connection closed callback", () => {
afterEach(async () => {
spySandbox?.restore()
try {
await client?.close()
await rabbit.deleteStream(streamName)
await rabbit.closeAllConnections()
await rabbit.deleteAllQueues({ match: /my-stream-/ })
await client?.close()
await rabbit.closeAllConnections()
} catch (e) {}

try {
Expand Down Expand Up @@ -65,55 +66,137 @@ describe("connection closed callback", () => {
}, 1000)
}).timeout(5000)

it("if specified, is called also on publisher and consumer socket events", async () => {
it("closed_connection listener is not invoked by the client", async () => {
const listener = (_hasError: boolean) => {
return
}
const listenerSpy = spy(listener)
client = await createClient(username, password, { connection_closed: listenerSpy })
await client.declarePublisher({ stream: streamName, publisherRef, connectionClosedListener: listenerSpy })
await client.declareConsumer(
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: listenerSpy },
const clientListenerSpy = spy(listener)
client = await createClient(username, password, { connection_closed: clientListenerSpy })

await client.close()

await always(() => {
expect(clientListenerSpy).to.have.been.called.exactly(0)
})
}).timeout(5000)

it("closed_connection listener is not invoked by the deletePublisher", async () => {
const listener = (_hasError: boolean) => {
return
}
const publisherListenerSpy = spy(listener)
client = await createClient(username, password, { connection_closed: publisherListenerSpy })
const publisher = await client.declarePublisher({
stream: streamName,
publisherRef,
connectionClosedListener: publisherListenerSpy,
})

await client.deletePublisher(publisher.publisherId)

await always(() => {
expect(publisherListenerSpy).to.have.been.called.exactly(0)
})
}).timeout(5000)

it("closed_connection listener is not invoked by the deleteConsumer", async () => {
const listener = (_hasError: boolean) => {
return
}
const consumerListenerSpy = spy(listener)
client = await createClient(username, password, { connection_closed: consumerListenerSpy })
const consumer = await client.declareConsumer(
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy },
(_msg) => {
return
}
)

await client.close()
await client.closeConsumer(consumer.consumerId)

await eventually(() => {
expect(listenerSpy).to.have.been.called.exactly(3)
}, 1000)
await always(() => {
expect(consumerListenerSpy).to.have.been.called.exactly(0)
})
}).timeout(5000)

it("different callbacks for client, consumer and publisher are all called when connections close", async () => {
const instListener = () => {
return (_hasError: boolean) => {
it("closed_connection listener is not invoked by the client even with multiple publishers and consumers", async () => {
const listener = (_hasError: boolean) => {
return
}
const clientListenerSpy = spy(listener)
const publisherListenerSpy = spy(listener)
const consumerListenerSpy = spy(listener)
client = await createClient(username, password, { connection_closed: clientListenerSpy })
await client.declarePublisher({
stream: streamName,
publisherRef,
connectionClosedListener: publisherListenerSpy,
})
await client.declarePublisher({
stream: streamName,
publisherRef: `${publisherRef}-1`,
connectionClosedListener: publisherListenerSpy,
})
await client.declareConsumer(
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy },
(_msg) => {
return
}
}
const listenerClientSpy = spy(instListener())
const listenerConsumerSpy = spy(instListener())
const listenerPublisherSpy = spy(instListener())
client = await createClient(username, password, { connection_closed: listenerClientSpy })
await client.declarePublisher({ stream: streamName, publisherRef, connectionClosedListener: listenerConsumerSpy })
)
await client.declareConsumer(
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: listenerPublisherSpy },
{
stream: streamName,
consumerRef: `${consumerRef}-1`,
offset: Offset.first(),
connectionClosedListener: consumerListenerSpy,
},
(_msg) => {
return
}
)

await client.close()

await eventually(() => {
expect(listenerClientSpy).to.have.been.called.exactly(1)
}, 1000)
await eventually(() => {
expect(listenerConsumerSpy).to.have.been.called.exactly(1)
}, 1000)
await eventually(() => {
expect(listenerPublisherSpy).to.have.been.called.exactly(1)
}, 1000)
await always(() => {
expect(clientListenerSpy).to.have.been.called.exactly(0)
})
await always(() => {
expect(publisherListenerSpy).to.have.been.called.exactly(0)
})
await always(() => {
expect(consumerListenerSpy).to.have.been.called.exactly(0)
})
}).timeout(5000)

it("closed_connection listener is invoked by the server if it closes the connection", async () => {
const listener = (_hasError: boolean) => {
return
}
const clientListenerSpy = spy(listener)
const publisherListenerSpy = spy(listener)
const consumerListenerSpy = spy(listener)
client = await createClient(username, password, { connection_closed: clientListenerSpy })
await client.declarePublisher({
stream: streamName,
publisherRef,
connectionClosedListener: publisherListenerSpy,
})
await client.declareConsumer(
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy },
(_msg) => {
return
}
)
await sleep(5000)

await rabbit.closeAllConnections()

await eventually(() => {
expect(clientListenerSpy).to.have.been.called.exactly(1)
expect(publisherListenerSpy).to.have.been.called.exactly(1)
expect(consumerListenerSpy).to.have.been.called.exactly(1)
}, 5000)
}).timeout(10000)
})

const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms))
2 changes: 1 addition & 1 deletion test/e2e/sub_entry_consume.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe("consume a batch of messages", () => {
await rabbit.deleteStream(streamName)
await rabbit.closeAllConnections()
await rabbit.deleteAllQueues({ match: /my-stream-/ })
await consumer?.close()
await consumer?.close(true)
} catch (e) {}
})

Expand Down