@@ -31,30 +31,30 @@ public final class RDKafkaClient: Sendable {
3131 }
3232
3333 /// Handle for the C library's Kafka instance.
34- private let kafkaHandle : OpaquePointer
34+ private let kafkaHandle : SendableOpaquePointer
3535 /// A logger.
3636 private let logger : Logger
3737
3838 /// `librdkafka`'s `rd_kafka_queue_t` that events are received on.
39- private let queue : OpaquePointer
39+ private let queueHandle : SendableOpaquePointer
4040
4141 // Use factory method to initialize
4242 private init (
4343 type: ClientType ,
44- kafkaHandle: OpaquePointer ,
44+ kafkaHandle: SendableOpaquePointer ,
4545 logger: Logger
4646 ) {
4747 self . kafkaHandle = kafkaHandle
4848 self . logger = logger
49- self . queue = rd_kafka_queue_get_main ( self . kafkaHandle)
49+ self . queueHandle = . init ( rd_kafka_queue_get_main ( self . kafkaHandle. pointer ) )
5050
51- rd_kafka_set_log_queue ( self . kafkaHandle, self . queue )
51+ rd_kafka_set_log_queue ( self . kafkaHandle. pointer , self . queueHandle . pointer )
5252 }
5353
5454 deinit {
5555 // Loose reference to librdkafka's event queue
56- rd_kafka_queue_destroy ( self . queue )
57- rd_kafka_destroy ( kafkaHandle)
56+ rd_kafka_queue_destroy ( self . queueHandle . pointer )
57+ rd_kafka_destroy ( kafkaHandle. pointer )
5858 }
5959
6060 /// Factory method creating a new instance of a ``RDKafkaClient``.
@@ -87,7 +87,8 @@ public final class RDKafkaClient: Sendable {
8787 throw KafkaError . client ( reason: errorString)
8888 }
8989
90- return RDKafkaClient ( type: type, kafkaHandle: handle, logger: logger)
90+ let kafkaHandle = SendableOpaquePointer ( handle)
91+ return RDKafkaClient ( type: type, kafkaHandle: kafkaHandle, logger: logger)
9192 }
9293
9394 /// Produce a message to the Kafka cluster.
@@ -212,7 +213,7 @@ public final class RDKafkaClient: Sendable {
212213 assert ( arguments. count == size)
213214
214215 return rd_kafka_produceva (
215- self . kafkaHandle,
216+ self . kafkaHandle. pointer ,
216217 arguments,
217218 arguments. count
218219 )
@@ -304,7 +305,7 @@ public final class RDKafkaClient: Sendable {
304305 events. reserveCapacity ( maxEvents)
305306
306307 for _ in 0 ..< maxEvents {
307- let event = rd_kafka_queue_poll ( self . queue , 0 )
308+ let event = rd_kafka_queue_poll ( self . queueHandle . pointer , 0 )
308309 defer { rd_kafka_event_destroy ( event) }
309310
310311 let rdEventType = rd_kafka_event_type ( event)
@@ -437,7 +438,7 @@ public final class RDKafkaClient: Sendable {
437438 /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
438439 /// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
439440 func consumerPoll( ) throws -> KafkaConsumerMessage ? {
440- guard let messagePointer = rd_kafka_consumer_poll ( self . kafkaHandle, 0 ) else {
441+ guard let messagePointer = rd_kafka_consumer_poll ( self . kafkaHandle. pointer , 0 ) else {
441442 // No error, there might be no more messages
442443 return nil
443444 }
@@ -460,7 +461,7 @@ public final class RDKafkaClient: Sendable {
460461 /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
461462 func subscribe( topicPartitionList: RDKafkaTopicPartitionList ) throws {
462463 try topicPartitionList. withListPointer { pointer in
463- let result = rd_kafka_subscribe ( self . kafkaHandle, pointer)
464+ let result = rd_kafka_subscribe ( self . kafkaHandle. pointer , pointer)
464465 if result != RD_KAFKA_RESP_ERR_NO_ERROR {
465466 throw KafkaError . rdKafkaError ( wrapping: result)
466467 }
@@ -471,7 +472,7 @@ public final class RDKafkaClient: Sendable {
471472 /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
472473 func assign( topicPartitionList: RDKafkaTopicPartitionList ) throws {
473474 try topicPartitionList. withListPointer { pointer in
474- let result = rd_kafka_assign ( self . kafkaHandle, pointer)
475+ let result = rd_kafka_assign ( self . kafkaHandle. pointer , pointer)
475476 if result != RD_KAFKA_RESP_ERR_NO_ERROR {
476477 throw KafkaError . rdKafkaError ( wrapping: result)
477478 }
@@ -508,7 +509,7 @@ public final class RDKafkaClient: Sendable {
508509
509510 let error = changesList. withListPointer { listPointer in
510511 return rd_kafka_commit (
511- self . kafkaHandle,
512+ self . kafkaHandle. pointer ,
512513 listPointer,
513514 1 // async = true
514515 )
@@ -551,9 +552,9 @@ public final class RDKafkaClient: Sendable {
551552
552553 changesList. withListPointer { listPointer in
553554 rd_kafka_commit_queue (
554- self . kafkaHandle,
555+ self . kafkaHandle. pointer ,
555556 listPointer,
556- self . queue ,
557+ self . queueHandle . pointer ,
557558 nil ,
558559 opaquePointer
559560 )
@@ -572,7 +573,7 @@ public final class RDKafkaClient: Sendable {
572573 let queue = DispatchQueue ( label: " com.swift-server.swift-kafka.flush " )
573574 try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < Void , Error > ) in
574575 queue. async {
575- let error = rd_kafka_flush ( self . kafkaHandle, timeoutMilliseconds)
576+ let error = rd_kafka_flush ( self . kafkaHandle. pointer , timeoutMilliseconds)
576577 if error != RD_KAFKA_RESP_ERR_NO_ERROR {
577578 continuation. resume ( throwing: KafkaError . rdKafkaError ( wrapping: error) )
578579 } else {
@@ -587,7 +588,7 @@ public final class RDKafkaClient: Sendable {
587588 ///
588589 /// Make sure to run poll loop until ``RDKafkaClient/consumerIsClosed`` returns `true`.
589590 func consumerClose( ) throws {
590- let result = rd_kafka_consumer_close_queue ( self . kafkaHandle, self . queue )
591+ let result = rd_kafka_consumer_close_queue ( self . kafkaHandle. pointer , self . queueHandle . pointer )
591592 let kafkaError = rd_kafka_error_code ( result)
592593 if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
593594 throw KafkaError . rdKafkaError ( wrapping: kafkaError)
@@ -596,14 +597,14 @@ public final class RDKafkaClient: Sendable {
596597
597598 /// Returns `true` if the underlying `librdkafka` consumer is closed.
598599 var isConsumerClosed : Bool {
599- rd_kafka_consumer_closed ( self . kafkaHandle) == 1
600+ rd_kafka_consumer_closed ( self . kafkaHandle. pointer ) == 1
600601 }
601602
602603 /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
603604 /// - Warning: Do not escape the pointer from the closure for later use.
604605 /// - Parameter body: The closure will use the Kafka handle pointer.
605606 @discardableResult
606607 func withKafkaHandlePointer< T> ( _ body: ( OpaquePointer ) throws -> T ) rethrows -> T {
607- return try body ( self . kafkaHandle)
608+ return try body ( self . kafkaHandle. pointer )
608609 }
609610}
0 commit comments