@@ -126,6 +126,85 @@ final class KafkaClient {
126126 }
127127 }
128128
129+ /// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
130+ /// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
131+ final class CapturedCommitCallback {
132+ typealias Closure = ( Result < Void , KafkaError > ) -> Void
133+ let closure : Closure
134+
135+ init ( _ closure: @escaping Closure ) {
136+ self . closure = closure
137+ }
138+ }
139+
140+ /// Non-blocking commit of a the `message`'s offset to Kafka.
141+ ///
142+ /// - Parameter message: Last received message that shall be marked as read.
143+ func commitSync( _ message: KafkaConsumerMessage ) async throws {
144+ // Declare captured closure outside of withCheckedContinuation.
145+ // We do that because do an unretained pass of the captured closure to
146+ // librdkafka which means we have to keep a reference to the closure
147+ // ourselves to make sure it does not get deallocated before
148+ // commitSync returns.
149+ var capturedClosure : CapturedCommitCallback !
150+ try await withCheckedThrowingContinuation { continuation in
151+ capturedClosure = CapturedCommitCallback { result in
152+ continuation. resume ( with: result)
153+ }
154+
155+ // The offset committed is always the offset of the next requested message.
156+ // Thus, we increase the offset of the current message by one before committing it.
157+ // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
158+ let changesList = RDKafkaTopicPartitionList ( )
159+ changesList. setOffset (
160+ topic: message. topic,
161+ partition: message. partition,
162+ offset: Int64 ( message. offset + 1 )
163+ )
164+
165+ // Unretained pass because the reference that librdkafka holds to capturedClosure
166+ // should not be counted in ARC as this can lead to memory leaks.
167+ let opaquePointer : UnsafeMutableRawPointer ? = Unmanaged . passUnretained ( capturedClosure) . toOpaque ( )
168+
169+ let consumerQueue = rd_kafka_queue_get_consumer ( self . kafkaHandle)
170+
171+ // Create a C closure that calls the captured closure
172+ let callbackWrapper : (
173+ @convention ( c) (
174+ OpaquePointer ? ,
175+ rd_kafka_resp_err_t ,
176+ UnsafeMutablePointer < rd_kafka_topic_partition_list_t > ? ,
177+ UnsafeMutableRawPointer ?
178+ ) -> Void
179+ ) = { _, error, _, opaquePointer in
180+
181+ guard let opaquePointer = opaquePointer else {
182+ fatalError ( " Could not resolve reference to catpured Swift callback instance " )
183+ }
184+ let opaque = Unmanaged < CapturedCommitCallback > . fromOpaque ( opaquePointer) . takeUnretainedValue ( )
185+
186+ let actualCallback = opaque. closure
187+
188+ if error == RD_KAFKA_RESP_ERR_NO_ERROR {
189+ actualCallback ( . success( ( ) ) )
190+ } else {
191+ let kafkaError = KafkaError . rdKafkaError ( wrapping: error)
192+ actualCallback ( . failure( kafkaError) )
193+ }
194+ }
195+
196+ changesList. withListPointer { listPointer in
197+ rd_kafka_commit_queue (
198+ self . kafkaHandle,
199+ listPointer,
200+ consumerQueue,
201+ callbackWrapper,
202+ opaquePointer
203+ )
204+ }
205+ }
206+ }
207+
129208 /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
130209 /// - Warning: Do not escape the pointer from the closure for later use.
131210 /// - Parameter body: The closure will use the Kafka handle pointer.
0 commit comments