diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index 5dca295b..a945dfc3 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -126,6 +126,85 @@ final class KafkaClient { } } + /// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`. + /// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``. + final class CapturedCommitCallback { + typealias Closure = (Result) -> Void + let closure: Closure + + init(_ closure: @escaping Closure) { + self.closure = closure + } + } + + /// Non-blocking commit of a the `message`'s offset to Kafka. + /// + /// - Parameter message: Last received message that shall be marked as read. + func commitSync(_ message: KafkaConsumerMessage) async throws { + // Declare captured closure outside of withCheckedContinuation. + // We do that because do an unretained pass of the captured closure to + // librdkafka which means we have to keep a reference to the closure + // ourselves to make sure it does not get deallocated before + // commitSync returns. + var capturedClosure: CapturedCommitCallback! + try await withCheckedThrowingContinuation { continuation in + capturedClosure = CapturedCommitCallback { result in + continuation.resume(with: result) + } + + // The offset committed is always the offset of the next requested message. + // Thus, we increase the offset of the current message by one before committing it. + // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 + let changesList = RDKafkaTopicPartitionList() + changesList.setOffset( + topic: message.topic, + partition: message.partition, + offset: Int64(message.offset + 1) + ) + + // Unretained pass because the reference that librdkafka holds to capturedClosure + // should not be counted in ARC as this can lead to memory leaks. + let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque() + + let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle) + + // Create a C closure that calls the captured closure + let callbackWrapper: ( + @convention(c) ( + OpaquePointer?, + rd_kafka_resp_err_t, + UnsafeMutablePointer?, + UnsafeMutableRawPointer? + ) -> Void + ) = { _, error, _, opaquePointer in + + guard let opaquePointer = opaquePointer else { + fatalError("Could not resolve reference to catpured Swift callback instance") + } + let opaque = Unmanaged.fromOpaque(opaquePointer).takeUnretainedValue() + + let actualCallback = opaque.closure + + if error == RD_KAFKA_RESP_ERR_NO_ERROR { + actualCallback(.success(())) + } else { + let kafkaError = KafkaError.rdKafkaError(wrapping: error) + actualCallback(.failure(kafkaError)) + } + } + + changesList.withListPointer { listPointer in + rd_kafka_commit_queue( + self.kafkaHandle, + listPointer, + consumerQueue, + callbackWrapper, + opaquePointer + ) + } + } + } + /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. /// - Warning: Do not escape the pointer from the closure for later use. /// - Parameter body: The closure will use the Kafka handle pointer. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index ca908380..9843dccb 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -223,17 +223,6 @@ public final class KafkaConsumer { /// - Throws: A ``KafkaError`` if committing failed. /// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`. public func commitSync(_ message: KafkaConsumerMessage) async throws { - try await withCheckedThrowingContinuation { continuation in - do { - try self._commitSync(message) // Blocks until commiting the offset is done - continuation.resume() - } catch { - continuation.resume(throwing: error) - } - } - } - - private func _commitSync(_ message: KafkaConsumerMessage) throws { let action = self.stateMachine.withLockedValue { $0.commitSync() } switch action { case .throwClosedError: @@ -243,29 +232,7 @@ public final class KafkaConsumer { throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false") } - // The offset committed is always the offset of the next requested message. - // Thus, we increase the offset of the current message by one before committing it. - // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 - let changesList = RDKafkaTopicPartitionList() - changesList.setOffset( - topic: message.topic, - partition: message.partition, - offset: Int64(message.offset + 1) - ) - - let result = client.withKafkaHandlePointer { handle in - changesList.withListPointer { listPointer in - rd_kafka_commit( - handle, - listPointer, - 0 - ) // Blocks until commiting the offset is done - // -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68 - } - } - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: result) - } + try await client.commitSync(message) } } diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift index 398985ff..eee3ff7e 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift @@ -78,7 +78,9 @@ struct RDKafkaConfig { ) -> CapturedClosures { let closures = CapturedClosures() - // Pass the the reference to Opaque as an opaque object + // Pass the captured closure to the C closure as an opaque object. + // Unretained pass because the reference that librdkafka holds to the captured closures + // should not be counted in ARC as this can lead to memory leaks. let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque() rd_kafka_conf_set_opaque( configPointer,