Skip to content

Commit b7f8f16

Browse files
l4mbymagne
andauthored
186 - Connection closed listener no longer called by the client (#188)
* Connection closed listener no longer called by the client * Check fix on closing of super stream publisher and consumer * Check fix --------- Co-authored-by: magne <[email protected]>
1 parent 9525c38 commit b7f8f16

File tree

8 files changed

+136
-51
lines changed

8 files changed

+136
-51
lines changed

src/client.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import { DeliverResponseV2 } from "./responses/deliver_response_v2"
4545

4646
export type ConnectionClosedListener = (hadError: boolean) => void
4747

48-
export type ClosingParams = { closingCode: number; closingReason: string }
48+
export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean }
4949

5050
export class Client {
5151
private id: string = randomUUID()
@@ -87,11 +87,11 @@ export class Client {
8787
this.logger.info(`${this.id} Closing client...`)
8888
if (this.publisherCounts()) {
8989
this.logger.info(`Stopping all producers...`)
90-
await this.closeAllPublishers()
90+
await this.closeAllPublishers(true)
9191
}
9292
if (this.consumerCounts()) {
9393
this.logger.info(`Stopping all consumers...`)
94-
await this.closeAllConsumers()
94+
await this.closeAllConsumers(true)
9595
}
9696
this.connection.decrRefCount()
9797
await this.closeConnectionIfUnused(this.connection, params)
@@ -100,7 +100,7 @@ export class Client {
100100
private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) {
101101
if (connection.refCount <= 0) {
102102
ConnectionPool.removeCachedConnection(this.connection)
103-
await this.connection.close(params)
103+
await this.connection.close({ ...params, manuallyClose: true })
104104
}
105105
}
106106

@@ -168,7 +168,7 @@ export class Client {
168168
if (!res.ok) {
169169
throw new Error(`Delete Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
170170
}
171-
await this.publishers.get(publisherId)?.publisher.close()
171+
await this.publishers.get(publisherId)?.publisher.close(true)
172172
this.publishers.delete(publisherId)
173173
this.logger.info(`deleted publisher with publishing id ${publisherId}`)
174174
return res.ok
@@ -235,7 +235,7 @@ export class Client {
235235
if (!res.ok) {
236236
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
237237
}
238-
await consumer.close()
238+
await consumer.close(true)
239239
this.consumers.delete(consumerId)
240240
this.logger.info(`Closed consumer with id: ${consumerId}`)
241241
return res.ok
@@ -263,13 +263,13 @@ export class Client {
263263
})
264264
}
265265

266-
private async closeAllConsumers() {
267-
await Promise.all([...this.consumers.values()].map((c) => c.close()))
266+
private async closeAllConsumers(manuallyClose: boolean) {
267+
await Promise.all([...this.consumers.values()].map((c) => c.close(manuallyClose)))
268268
this.consumers = new Map<number, StreamConsumer>()
269269
}
270270

271-
private async closeAllPublishers() {
272-
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close()))
271+
private async closeAllPublishers(manuallyClose: boolean) {
272+
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose)))
273273
this.publishers = new Map<number, { connection: Connection; publisher: StreamPublisher }>()
274274
}
275275

src/connection.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ export class Connection {
8686
private readonly serverDeclaredVersions: Version[] = []
8787
private refs: number = 0
8888
private filteringEnabled: boolean = false
89+
public userManuallyClose: boolean = false
8990

9091
constructor(private readonly params: ConnectionParams, private readonly logger: Logger) {
9192
this.hostname = params.hostname
@@ -148,7 +149,7 @@ export class Connection {
148149
})
149150
this.socket.on("close", (had_error) => {
150151
this.logger.info(`Close event on socket, close cloud had_error? ${had_error}`)
151-
if (this.connectionClosedListener) this.connectionClosedListener(had_error)
152+
if (this.connectionClosedListener && !this.userManuallyClose) this.connectionClosedListener(had_error)
152153
})
153154
})
154155
}
@@ -416,6 +417,7 @@ export class Connection {
416417
this.logger.debug(`Close...`)
417418
const closeResponse = await this.sendAndWait<CloseResponse>(new CloseRequest(params))
418419
this.logger.debug(`Close response: ${closeResponse.ok} - '${inspect(closeResponse)}'`)
420+
this.userManuallyClose = params.manuallyClose ?? false
419421
this.socket.end()
420422
}
421423

src/consumer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { Offset } from "./requests/subscribe_request"
77
export type ConsumerFunc = (message: Message) => void
88

99
export interface Consumer {
10-
close(): Promise<void>
10+
close(manuallyClose: boolean): Promise<void>
1111
storeOffset(offsetValue: bigint): Promise<void>
1212
queryOffset(): Promise<bigint>
1313
getConnectionInfo(): ConnectionInfo
@@ -41,10 +41,10 @@ export class StreamConsumer implements Consumer {
4141
this.connection.incrRefCount()
4242
}
4343

44-
async close(): Promise<void> {
44+
async close(manuallyClose: boolean): Promise<void> {
4545
this.connection.decrRefCount()
4646
if (ConnectionPool.removeIfUnused(this.connection)) {
47-
await this.connection.close()
47+
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
4848
}
4949
}
5050

src/publisher.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ export interface Publisher {
8383
on(event: "publish_confirm", listener: PublishConfirmCallback): void
8484
getLastPublishingId(): Promise<bigint>
8585
getConnectionInfo(): ConnectionInfo
86-
close(): Promise<void>
86+
close(manuallyClose: boolean): Promise<void>
8787
ref: string
8888
readonly publisherId: number
8989
}
@@ -197,12 +197,12 @@ export class StreamPublisher implements Publisher {
197197
return this.publisherRef
198198
}
199199

200-
public async close(): Promise<void> {
200+
public async close(manuallyClose: boolean): Promise<void> {
201201
if (!this.closed) {
202202
await this.flush()
203203
this.connection.decrRefCount()
204204
if (ConnectionPool.removeIfUnused(this.connection)) {
205-
await this.connection.close()
205+
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
206206
}
207207
}
208208
this.closed = true

src/super_stream_consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ export class SuperStreamConsumer {
4848
}
4949

5050
async close(): Promise<void> {
51-
await Promise.all([...this.consumers.values()].map((c) => c.close()))
51+
await Promise.all([...this.consumers.values()].map((c) => c.close(true)))
5252
}
5353
}

src/super_stream_publisher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class SuperStreamPublisher {
4343
}
4444

4545
public async close(): Promise<void> {
46-
await Promise.all([...this.publishers.values()].map((p) => p.close()))
46+
await Promise.all([...this.publishers.values()].map((p) => p.close(true)))
4747
this.publishers = new Map()
4848
}
4949

test/e2e/connection_closed_listener.test.ts

Lines changed: 114 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { Rabbit } from "../support/rabbit"
55
import { username, password, eventually, always } from "../support/util"
66
import { randomUUID } from "crypto"
77
import { Offset } from "../../src/requests/subscribe_request"
8+
89
describe("connection closed callback", () => {
910
let client: Client | undefined = undefined
1011
const rabbit = new Rabbit(username, password)
@@ -22,10 +23,10 @@ describe("connection closed callback", () => {
2223
afterEach(async () => {
2324
spySandbox?.restore()
2425
try {
25-
await client?.close()
2626
await rabbit.deleteStream(streamName)
27-
await rabbit.closeAllConnections()
2827
await rabbit.deleteAllQueues({ match: /my-stream-/ })
28+
await client?.close()
29+
await rabbit.closeAllConnections()
2930
} catch (e) {}
3031

3132
try {
@@ -65,55 +66,137 @@ describe("connection closed callback", () => {
6566
}, 1000)
6667
}).timeout(5000)
6768

68-
it("if specified, is called also on publisher and consumer socket events", async () => {
69+
it("closed_connection listener is not invoked by the client", async () => {
6970
const listener = (_hasError: boolean) => {
7071
return
7172
}
72-
const listenerSpy = spy(listener)
73-
client = await createClient(username, password, { connection_closed: listenerSpy })
74-
await client.declarePublisher({ stream: streamName, publisherRef, connectionClosedListener: listenerSpy })
75-
await client.declareConsumer(
76-
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: listenerSpy },
73+
const clientListenerSpy = spy(listener)
74+
client = await createClient(username, password, { connection_closed: clientListenerSpy })
75+
76+
await client.close()
77+
78+
await always(() => {
79+
expect(clientListenerSpy).to.have.been.called.exactly(0)
80+
})
81+
}).timeout(5000)
82+
83+
it("closed_connection listener is not invoked by the deletePublisher", async () => {
84+
const listener = (_hasError: boolean) => {
85+
return
86+
}
87+
const publisherListenerSpy = spy(listener)
88+
client = await createClient(username, password, { connection_closed: publisherListenerSpy })
89+
const publisher = await client.declarePublisher({
90+
stream: streamName,
91+
publisherRef,
92+
connectionClosedListener: publisherListenerSpy,
93+
})
94+
95+
await client.deletePublisher(publisher.publisherId)
96+
97+
await always(() => {
98+
expect(publisherListenerSpy).to.have.been.called.exactly(0)
99+
})
100+
}).timeout(5000)
101+
102+
it("closed_connection listener is not invoked by the deleteConsumer", async () => {
103+
const listener = (_hasError: boolean) => {
104+
return
105+
}
106+
const consumerListenerSpy = spy(listener)
107+
client = await createClient(username, password, { connection_closed: consumerListenerSpy })
108+
const consumer = await client.declareConsumer(
109+
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy },
77110
(_msg) => {
78111
return
79112
}
80113
)
81114

82-
await client.close()
115+
await client.closeConsumer(consumer.consumerId)
83116

84-
await eventually(() => {
85-
expect(listenerSpy).to.have.been.called.exactly(3)
86-
}, 1000)
117+
await always(() => {
118+
expect(consumerListenerSpy).to.have.been.called.exactly(0)
119+
})
87120
}).timeout(5000)
88121

89-
it("different callbacks for client, consumer and publisher are all called when connections close", async () => {
90-
const instListener = () => {
91-
return (_hasError: boolean) => {
122+
it("closed_connection listener is not invoked by the client even with multiple publishers and consumers", async () => {
123+
const listener = (_hasError: boolean) => {
124+
return
125+
}
126+
const clientListenerSpy = spy(listener)
127+
const publisherListenerSpy = spy(listener)
128+
const consumerListenerSpy = spy(listener)
129+
client = await createClient(username, password, { connection_closed: clientListenerSpy })
130+
await client.declarePublisher({
131+
stream: streamName,
132+
publisherRef,
133+
connectionClosedListener: publisherListenerSpy,
134+
})
135+
await client.declarePublisher({
136+
stream: streamName,
137+
publisherRef: `${publisherRef}-1`,
138+
connectionClosedListener: publisherListenerSpy,
139+
})
140+
await client.declareConsumer(
141+
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy },
142+
(_msg) => {
92143
return
93144
}
94-
}
95-
const listenerClientSpy = spy(instListener())
96-
const listenerConsumerSpy = spy(instListener())
97-
const listenerPublisherSpy = spy(instListener())
98-
client = await createClient(username, password, { connection_closed: listenerClientSpy })
99-
await client.declarePublisher({ stream: streamName, publisherRef, connectionClosedListener: listenerConsumerSpy })
145+
)
100146
await client.declareConsumer(
101-
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: listenerPublisherSpy },
147+
{
148+
stream: streamName,
149+
consumerRef: `${consumerRef}-1`,
150+
offset: Offset.first(),
151+
connectionClosedListener: consumerListenerSpy,
152+
},
102153
(_msg) => {
103154
return
104155
}
105156
)
106157

107158
await client.close()
108159

109-
await eventually(() => {
110-
expect(listenerClientSpy).to.have.been.called.exactly(1)
111-
}, 1000)
112-
await eventually(() => {
113-
expect(listenerConsumerSpy).to.have.been.called.exactly(1)
114-
}, 1000)
115-
await eventually(() => {
116-
expect(listenerPublisherSpy).to.have.been.called.exactly(1)
117-
}, 1000)
160+
await always(() => {
161+
expect(clientListenerSpy).to.have.been.called.exactly(0)
162+
})
163+
await always(() => {
164+
expect(publisherListenerSpy).to.have.been.called.exactly(0)
165+
})
166+
await always(() => {
167+
expect(consumerListenerSpy).to.have.been.called.exactly(0)
168+
})
118169
}).timeout(5000)
170+
171+
it("closed_connection listener is invoked by the server if it closes the connection", async () => {
172+
const listener = (_hasError: boolean) => {
173+
return
174+
}
175+
const clientListenerSpy = spy(listener)
176+
const publisherListenerSpy = spy(listener)
177+
const consumerListenerSpy = spy(listener)
178+
client = await createClient(username, password, { connection_closed: clientListenerSpy })
179+
await client.declarePublisher({
180+
stream: streamName,
181+
publisherRef,
182+
connectionClosedListener: publisherListenerSpy,
183+
})
184+
await client.declareConsumer(
185+
{ stream: streamName, consumerRef, offset: Offset.first(), connectionClosedListener: consumerListenerSpy },
186+
(_msg) => {
187+
return
188+
}
189+
)
190+
await sleep(5000)
191+
192+
await rabbit.closeAllConnections()
193+
194+
await eventually(() => {
195+
expect(clientListenerSpy).to.have.been.called.exactly(1)
196+
expect(publisherListenerSpy).to.have.been.called.exactly(1)
197+
expect(consumerListenerSpy).to.have.been.called.exactly(1)
198+
}, 5000)
199+
}).timeout(10000)
119200
})
201+
202+
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms))

test/e2e/sub_entry_consume.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ describe("consume a batch of messages", () => {
2929
await rabbit.deleteStream(streamName)
3030
await rabbit.closeAllConnections()
3131
await rabbit.deleteAllQueues({ match: /my-stream-/ })
32-
await consumer?.close()
32+
await consumer?.close(true)
3333
} catch (e) {}
3434
})
3535

0 commit comments

Comments
 (0)