Skip to content

Commit 5b17640

Browse files
committed
KafkaClient.consumerClose: make non-blocking
Motivation: [rd_kakfa_consumer_close](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a37b54d329e12d745889defe96e7d043d) was blocking. This PR proposes using the [rd_kakfa_consumer_close_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a9dd5c18bdfed81c8847b259f0a8d498d) API which is non-blocking and served through the normal poll loop. We now Modifications: * `KafkaClient.consumerClose`: use `rd_kakfa_consumer_close_queue` in favour of `rd_kakfa_consumer_close` * create a new variable `KafkaClient.isConsumerClosed` that indicates if the poll loop needs to continue polling or if it can stop running * updated state management in `KafkaConsumer` to accomodate for polling when the `KafkaConsumer` is in the process of closing Result: Calling `KafkaClient.consumerClose` is not blocking anymore.
1 parent 2bd1639 commit 5b17640

File tree

2 files changed

+39
-12
lines changed

2 files changed

+39
-12
lines changed

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,6 @@ final class KafkaClient {
118118
}
119119
}
120120

121-
/// Close the consumer.
122-
func consumerClose() throws {
123-
let result = rd_kafka_consumer_close(self.kafkaHandle)
124-
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
125-
throw KafkaError.rdKafkaError(wrapping: result)
126-
}
127-
}
128-
129121
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
130122
/// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
131123
final class CapturedCommitCallback {
@@ -205,6 +197,23 @@ final class KafkaClient {
205197
}
206198
}
207199

200+
/// Close the consumer asynchronously. This means revoking its assignemnt, committing offsets to broker and
201+
/// leaving the consumer group (if applicable).
202+
///
203+
/// Make sure to run poll loop until ``KafkaClient/consumerIsClosed`` returns `true`.
204+
func consumerClose() throws {
205+
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
206+
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)
207+
let kafkaError = rd_kafka_error_code(result)
208+
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
209+
throw KafkaError.rdKafkaError(wrapping: kafkaError)
210+
}
211+
}
212+
213+
var isConsumerClosed: Bool {
214+
rd_kafka_consumer_closed(self.kafkaHandle) == 1
215+
}
216+
208217
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
209218
/// - Warning: Do not escape the pointer from the closure for later use.
210219
/// - Parameter body: The closure will use the Kafka handle pointer.

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,14 @@ public final class KafkaConsumer {
210210
source.finish()
211211
throw error
212212
}
213-
try await Task.sleep(for: self.config.pollInterval)
213+
// We support no back pressure, we can ignore the yield result
214+
_ = source.yield(messageResult)
215+
try await Task.sleep(for: self.pollInterval)
216+
case .pollUntilClosed(let client):
217+
// Ignore poll result, we are closing down and just polling to commit
218+
// outstanding consumer state
219+
_ = try client.consumerPoll()
220+
try await Task.sleep(for: self.pollInterval)
214221
case .terminatePollLoop:
215222
return
216223
}
@@ -304,7 +311,9 @@ extension KafkaConsumer {
304311
source: Producer.Source
305312
)
306313
/// The ``KafkaConsumer`` has been closed.
307-
case finished
314+
///
315+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
316+
case finished(client: KafkaClient)
308317
}
309318

310319
/// The current state of the StateMachine.
@@ -335,6 +344,11 @@ extension KafkaConsumer {
335344
client: KafkaClient,
336345
source: Producer.Source
337346
)
347+
/// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
348+
/// to commit its state to the broker.
349+
///
350+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
351+
case pollUntilClosed(client: KafkaClient)
338352
/// Terminate the poll loop.
339353
case terminatePollLoop
340354
}
@@ -351,8 +365,12 @@ extension KafkaConsumer {
351365
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
352366
case .consuming(let client, let source):
353367
return .pollForAndYieldMessage(client: client, source: source)
354-
case .finished:
355-
return .terminatePollLoop
368+
case .finished(let client):
369+
if client.isConsumerClosed {
370+
return .terminatePollLoop
371+
} else {
372+
return .pollUntilClosed(client: client)
373+
}
356374
}
357375
}
358376

0 commit comments

Comments
 (0)