Skip to content

Commit 8ded307

Browse files
committed
Fix: make KafkaConsumer.commitSync non-blocking
Motivation: Currently our invocation to `rd_kafka_commit` inside of `KafkaCosumer.commitSync` is blocking a cooperative thread. This PR aims to make `KafkaCosumer.commitSync` non-blocking by using the callback-based commit API. Modifications: * move `commitSync` logic to `KafkaClient` * replace the blocking invocation to [rd_kafka_commit](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#ab96539928328f14c3c9177ea0c896c87) with a callback-based invocation to [rd_kafka_commit_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#af76a6a73baa9c2621536e3f6882a3c1a) which is then wrapped inside a `withAsyncThrowingContinuation` statement
1 parent 0677dcf commit 8ded307

File tree

3 files changed

+86
-36
lines changed

3 files changed

+86
-36
lines changed

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,88 @@ final class KafkaClient {
9393
}
9494
}
9595

96+
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
97+
/// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
98+
final class CapturedCommitCallback {
99+
typealias Closure = (Result<Void, KafkaError>) -> Void
100+
let closure: Closure
101+
102+
init(_ closure: @escaping Closure) {
103+
self.closure = closure
104+
}
105+
}
106+
107+
/// Non-blocking commit of a the `message`'s offset to Kafka.
108+
///
109+
/// - Parameter message: Last received message that shall be marked as read.
110+
func commitSync(_ message: KafkaConsumerMessage) async throws {
111+
// Declare captured closure outside of withCheckedContinuation.
112+
// We do that because do an unretained pass of the captured closure to
113+
// librdkafka which means we have to keep a reference to the closure
114+
// ourselves to make sure it does not get deallocated before
115+
// commitSync returns.
116+
var capturedClosure: CapturedCommitCallback!
117+
try await withCheckedThrowingContinuation { continuation in
118+
capturedClosure = CapturedCommitCallback { result in
119+
continuation.resume(with: result)
120+
}
121+
122+
let changesList = rd_kafka_topic_partition_list_new(1)
123+
defer { rd_kafka_topic_partition_list_destroy(changesList) }
124+
guard let partitionPointer = rd_kafka_topic_partition_list_add(
125+
changesList,
126+
message.topic,
127+
message.partition.rawValue
128+
) else {
129+
fatalError("rd_kafka_topic_partition_list_add returned invalid pointer")
130+
}
131+
132+
// Unretained pass because the reference that librdkafka holds to capturedClosure
133+
// should not be counted in ARC as this can lead to memory leaks.
134+
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
135+
136+
// The offset committed is always the offset of the next requested message.
137+
// Thus, we increase the offset of the current message by one before committing it.
138+
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
139+
partitionPointer.pointee.offset = Int64(message.offset + 1)
140+
141+
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
142+
143+
// Create a C closure that calls the captured closure
144+
let callbackWrapper: (
145+
@convention(c) (
146+
OpaquePointer?,
147+
rd_kafka_resp_err_t,
148+
UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
149+
UnsafeMutableRawPointer?
150+
) -> Void
151+
) = { _, error, _, opaquePointer in
152+
153+
guard let opaquePointer = opaquePointer else {
154+
fatalError("Could not resolve reference to catpured Swift callback instance")
155+
}
156+
let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()
157+
158+
let actualCallback = opaque.closure
159+
160+
if error == RD_KAFKA_RESP_ERR_NO_ERROR {
161+
actualCallback(.success(()))
162+
} else {
163+
let kafkaError = KafkaError.rdKafkaError(wrapping: error)
164+
actualCallback(.failure(kafkaError))
165+
}
166+
}
167+
168+
rd_kafka_commit_queue(
169+
self.kafkaHandle,
170+
changesList,
171+
consumerQueue,
172+
callbackWrapper,
173+
opaquePointer
174+
)
175+
}
176+
}
177+
96178
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
97179
/// - Warning: Do not escape the pointer from the closure for later use.
98180
/// - Parameter body: The closure will use the Kafka handle pointer.

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -260,17 +260,6 @@ public final class KafkaConsumer {
260260
/// - Throws: A ``KafkaError`` if committing failed.
261261
/// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
262262
public func commitSync(_ message: KafkaConsumerMessage) async throws {
263-
try await withCheckedThrowingContinuation { continuation in
264-
do {
265-
try self._commitSync(message) // Blocks until commiting the offset is done
266-
continuation.resume()
267-
} catch {
268-
continuation.resume(throwing: error)
269-
}
270-
}
271-
}
272-
273-
private func _commitSync(_ message: KafkaConsumerMessage) throws {
274263
let action = self.stateMachine.withLockedValue { $0.commitSync() }
275264
switch action {
276265
case .throwClosedError:
@@ -280,30 +269,7 @@ public final class KafkaConsumer {
280269
throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
281270
}
282271

283-
let changesList = rd_kafka_topic_partition_list_new(1)
284-
defer { rd_kafka_topic_partition_list_destroy(changesList) }
285-
guard let partitionPointer = rd_kafka_topic_partition_list_add(
286-
changesList,
287-
message.topic,
288-
message.partition.rawValue
289-
) else {
290-
fatalError("rd_kafka_topic_partition_list_add returned invalid pointer")
291-
}
292-
293-
// The offset committed is always the offset of the next requested message.
294-
// Thus, we increase the offset of the current message by one before committing it.
295-
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
296-
partitionPointer.pointee.offset = Int64(message.offset + 1)
297-
let result = client.withKafkaHandlePointer { handle in
298-
rd_kafka_commit(
299-
handle,
300-
changesList,
301-
0
302-
) // Blocks until commiting the offset is done
303-
}
304-
guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
305-
throw KafkaError.rdKafkaError(wrapping: result)
306-
}
272+
try await client.commitSync(message)
307273
}
308274
}
309275

Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ struct RDKafkaConfig {
7373
_ callback: @escaping ((KafkaAcknowledgementResult?) -> Void)
7474
) -> CapturedClosure {
7575
let capturedClosure = CapturedClosure(callback)
76-
// Pass the captured closure to the C closure as an opaque object
76+
// Pass the captured closure to the C closure as an opaque object.
77+
// Unretained pass because the reference that librdkafka holds to capturedClosure
78+
// should not be counted in ARC as this can lead to memory leaks.
7779
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
7880
rd_kafka_conf_set_opaque(
7981
configPointer,

0 commit comments

Comments
 (0)