Skip to content

Commit af22ded

Browse files
committed
Review Franz
Modifications: * introduce new `KafkaConsumer.StateMachine.State` `.finishing` to avoid retaining `client` in state `.finished` * rename `KafkaConsumer.shutdownGracefully` to `KafkaConsumer.triggerGracefulShutdown` * add note that `KafkaConsumer.commitSync` does not support `Task` cancellation
1 parent dd902ba commit af22ded

File tree

1 file changed

+24
-17
lines changed

1 file changed

+24
-17
lines changed

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
4040
}
4141

4242
func didTerminate() {
43-
// Duplicate of _shutdownGracefully
43+
// Duplicate of _triggerGracefulShutdown
4444
let action = self.stateMachine.withLockedValue { $0.finish() }
4545
switch action {
46-
case .shutdownGracefullyAndFinishSource(let client, let source):
46+
case .triggerGracefulShutdownAndFinishSource(let client, let source):
4747
source.finish()
4848

4949
do {
@@ -151,7 +151,7 @@ public final class KafkaConsumer {
151151
}
152152

153153
deinit {
154-
self.shutdownGracefully()
154+
self.triggerGracefulShutdown()
155155
}
156156

157157
/// Subscribe to the given list of `topics`.
@@ -226,6 +226,7 @@ public final class KafkaConsumer {
226226
/// - Parameter message: Last received message that shall be marked as read.
227227
/// - Throws: A ``KafkaError`` if committing failed.
228228
/// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
229+
/// - Important: This method does not support `Task` cancellation.
229230
public func commitSync(_ message: KafkaConsumerMessage) async throws {
230231
let action = self.stateMachine.withLockedValue { $0.commitSync() }
231232
switch action {
@@ -244,11 +245,11 @@ public final class KafkaConsumer {
244245
///
245246
/// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
246247
/// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended.
247-
private func shutdownGracefully() {
248+
private func triggerGracefulShutdown() {
248249
let action = self.stateMachine.withLockedValue { $0.finish() }
249250
switch action {
250-
case .shutdownGracefullyAndFinishSource(let client, let source):
251-
self._shutdownGracefullyAndFinishSource(
251+
case .triggerGracefulShutdownAndFinishSource(let client, let source):
252+
self._triggerGracefulShutdownAndFinishSource(
252253
client: client,
253254
source: source,
254255
logger: self.logger
@@ -258,7 +259,7 @@ public final class KafkaConsumer {
258259
}
259260
}
260261

261-
private func _shutdownGracefullyAndFinishSource(
262+
private func _triggerGracefulShutdownAndFinishSource(
262263
client: KafkaClient,
263264
source: Producer.Source,
264265
logger: Logger
@@ -307,10 +308,13 @@ extension KafkaConsumer {
307308
client: KafkaClient,
308309
source: Producer.Source
309310
)
310-
/// The ``KafkaConsumer`` has been closed.
311+
/// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
312+
/// We are now in the process of commiting our last state to the broker.
311313
///
312314
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
313-
case finished(client: KafkaClient)
315+
case finishing(client: KafkaClient)
316+
/// The ``KafkaConsumer`` is closed.
317+
case finished
314318
}
315319

316320
/// The current state of the StateMachine.
@@ -354,20 +358,23 @@ extension KafkaConsumer {
354358
/// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken.
355359
///
356360
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
357-
func nextPollLoopAction() -> PollLoopAction {
361+
mutating func nextPollLoopAction() -> PollLoopAction {
358362
switch self.state {
359363
case .uninitialized:
360364
fatalError("\(#function) invoked while still in state \(self.state)")
361365
case .initializing:
362366
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
363367
case .consuming(let client, let source):
364368
return .pollForAndYieldMessage(client: client, source: source)
365-
case .finished(let client):
369+
case .finishing(let client):
366370
if client.isConsumerClosed {
371+
self.state = .finished
367372
return .terminatePollLoop
368373
} else {
369374
return .pollUntilClosed(client: client)
370375
}
376+
case .finished:
377+
return .terminatePollLoop
371378
}
372379
}
373380

@@ -391,7 +398,7 @@ extension KafkaConsumer {
391398
source: source
392399
)
393400
return .setUpConnection(client: client)
394-
case .consuming, .finished:
401+
case .consuming, .finishing, .finished:
395402
fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer")
396403
}
397404
}
@@ -420,7 +427,7 @@ extension KafkaConsumer {
420427
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
421428
case .consuming(let client, _):
422429
return .commitSync(client: client)
423-
case .finished:
430+
case .finishing, .finished:
424431
return .throwClosedError
425432
}
426433
}
@@ -431,7 +438,7 @@ extension KafkaConsumer {
431438
///
432439
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
433440
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
434-
case shutdownGracefullyAndFinishSource(
441+
case triggerGracefulShutdownAndFinishSource(
435442
client: KafkaClient,
436443
source: Producer.Source
437444
)
@@ -448,12 +455,12 @@ extension KafkaConsumer {
448455
case .initializing:
449456
fatalError("subscribe() / assign() should have been invoked before \(#function)")
450457
case .consuming(let client, let source):
451-
self.state = .finished(client: client)
452-
return .shutdownGracefullyAndFinishSource(
458+
self.state = .finishing(client: client)
459+
return .triggerGracefulShutdownAndFinishSource(
453460
client: client,
454461
source: source
455462
)
456-
case .finished:
463+
case .finishing, .finished:
457464
return nil
458465
}
459466
}

0 commit comments

Comments
 (0)