Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions Sources/SwiftKafka/KafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,85 @@ final class KafkaClient {
}
}

/// Reference-type wrapper around a Swift closure so that it can be handed to
/// `librdkafka` as an `OpaquePointer` (via `Unmanaged`).
/// This is specifically used to register a Swift closure as the commit
/// callback for the ``KafkaConsumer``.
final class CapturedCommitCallback {
    /// Signature of the wrapped commit callback: receives the outcome of the commit.
    typealias Closure = (Result<Void, KafkaError>) -> Void

    /// The wrapped Swift closure.
    let closure: Closure

    /// Creates a wrapper retaining the given commit callback.
    /// - Parameter closure: The callback to be invoked once the commit completes.
    init(_ closure: @escaping Closure) {
        self.closure = closure
    }
}

/// Non-blocking commit of the `message`'s offset to Kafka.
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if the commit was rejected.
func commitSync(_ message: KafkaConsumerMessage) async throws {
    // Declare the captured closure outside of withCheckedThrowingContinuation.
    // We do this because we perform an unretained pass of the captured closure
    // to librdkafka, which means we have to keep a reference to the closure
    // ourselves to make sure it does not get deallocated before
    // commitSync returns.
    var capturedClosure: CapturedCommitCallback!
    try await withCheckedThrowingContinuation { continuation in
        capturedClosure = CapturedCommitCallback { result in
            continuation.resume(with: result)
        }

        // The offset committed is always the offset of the next requested message.
        // Thus, we increase the offset of the current message by one before committing it.
        // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
        let changesList = RDKafkaTopicPartitionList()
        changesList.setOffset(
            topic: message.topic,
            partition: message.partition,
            offset: Int64(message.offset + 1)
        )

        // Unretained pass because the reference that librdkafka holds to capturedClosure
        // should not be counted in ARC as this can lead to memory leaks.
        let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()

        let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
        // rd_kafka_queue_get_consumer hands us a new reference to the consumer
        // queue that we are responsible for releasing again — otherwise every
        // commit leaks one queue reference.
        defer { rd_kafka_queue_destroy(consumerQueue) }

        // C-compatible trampoline that resolves the opaque pointer back to the
        // captured Swift closure and forwards the commit result to it.
        let callbackWrapper: (
            @convention(c) (
                OpaquePointer?,
                rd_kafka_resp_err_t,
                UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
                UnsafeMutableRawPointer?
            ) -> Void
        ) = { _, error, _, opaquePointer in

            guard let opaquePointer = opaquePointer else {
                fatalError("Could not resolve reference to captured Swift callback instance")
            }
            // takeUnretainedValue: we did an unretained pass above, so we must
            // not consume a retain here.
            let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()

            let actualCallback = opaque.closure

            if error == RD_KAFKA_RESP_ERR_NO_ERROR {
                actualCallback(.success(()))
            } else {
                let kafkaError = KafkaError.rdKafkaError(wrapping: error)
                actualCallback(.failure(kafkaError))
            }
        }

        changesList.withListPointer { listPointer in
            rd_kafka_commit_queue(
                self.kafkaHandle,
                listPointer,
                consumerQueue,
                callbackWrapper,
                opaquePointer
            )
        }
    }
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
Expand Down
35 changes: 1 addition & 34 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,6 @@ public final class KafkaConsumer {
/// - Throws: A ``KafkaError`` if committing failed.
/// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
public func commitSync(_ message: KafkaConsumerMessage) async throws {
try await withCheckedThrowingContinuation { continuation in
do {
try self._commitSync(message) // Blocks until commiting the offset is done
continuation.resume()
} catch {
continuation.resume(throwing: error)
}
}
}

private func _commitSync(_ message: KafkaConsumerMessage) throws {
let action = self.stateMachine.withLockedValue { $0.commitSync() }
switch action {
case .throwClosedError:
Expand All @@ -243,29 +232,7 @@ public final class KafkaConsumer {
throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
}

// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: message.topic,
partition: message.partition,
offset: Int64(message.offset + 1)
)

let result = client.withKafkaHandlePointer { handle in
changesList.withListPointer { listPointer in
rd_kafka_commit(
handle,
listPointer,
0
) // Blocks until commiting the offset is done
// -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68
}
}
guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: result)
}
try await client.commitSync(message)
}
}

Expand Down
4 changes: 3 additions & 1 deletion Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ struct RDKafkaConfig {
) -> CapturedClosures {
let closures = CapturedClosures()

// Pass the the reference to Opaque as an opaque object
// Pass the captured closure to the C closure as an opaque object.
// Unretained pass because the reference that librdkafka holds to the captured closures
// should not be counted in ARC as this can lead to memory leaks.
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
rd_kafka_conf_set_opaque(
configPointer,
Expand Down