57 changes: 38 additions & 19 deletions README.md
@@ -60,12 +60,18 @@ let consumer = try KafkaConsumer(
    logger: .kafkaTest // Your logger here
)

-for await messageResult in consumer.messages {
-    switch messageResult {
-    case .success(let message):
-        // Do something with message
-    case .failure(let error):
-        // Handle error
+await withThrowingTaskGroup(of: Void.self) { group in
+
+    // Run Task
+    group.addTask {
+        try await consumer.run()
+    }
+
+    // Task receiving messages
+    group.addTask {
+        for await message in consumer.messages {
+            // Do something with message
+        }
    }
}
```
@@ -85,12 +91,18 @@ let consumer = try KafkaConsumer(
    logger: .kafkaTest // Your logger here
)

-for await messageResult in consumer.messages {
-    switch messageResult {
-    case .success(let message):
-        // Do something with message
-    case .failure(let error):
-        // Handle error
+await withThrowingTaskGroup(of: Void.self) { group in
+
+    // Run Task
+    group.addTask {
+        try await consumer.run()
+    }
+
+    // Task receiving messages
+    group.addTask {
+        for await message in consumer.messages {
+            // Do something with message
+        }
    }
}
```
@@ -111,13 +123,20 @@ let consumer = try KafkaConsumer(
    logger: .kafkaTest // Your logger here
)

-for await messageResult in consumer.messages {
-    switch messageResult {
-    case .success(let message):
-        // Do something with message
-        try await consumer.commitSync(message)
-    case .failure(let error):
-        // Handle error
+await withThrowingTaskGroup(of: Void.self) { group in
+
+    // Run Task
+    group.addTask {
+        try await consumer.run()
+    }
+
+    // Task receiving messages
+    group.addTask {
+        for await message in consumer.messages {
+            // Do something with message
+            // ...
+            try await consumer.commitSync(message)
+        }
    }
}
```
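
A consequence of the new API worth noting: errors that previously arrived as `Result.failure` elements in `consumer.messages` are now thrown by `consumer.run()` inside the task group. A minimal sketch of how a caller might surface them, assuming a `consumer` set up as in the snippets above:

```swift
// Sketch: observing consumer errors now that the Result-based sequence is
// gone. Assumes `consumer` is a KafkaConsumer configured as in the README
// snippets above.
try await withThrowingTaskGroup(of: Void.self) { group in
    // The poll loop; errors are thrown from here now.
    group.addTask {
        try await consumer.run()
    }

    // The message sequence itself no longer carries errors.
    group.addTask {
        for await message in consumer.messages {
            print("Received:", message)
        }
    }

    // Rethrows the first error produced by either child task.
    try await group.next()
}
```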
36 changes: 5 additions & 31 deletions Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift
@@ -18,11 +18,9 @@ import struct Foundation.UUID
public struct KafkaConsumerConfiguration: Hashable {
    // MARK: - SwiftKafka-specific Config properties

-    /// The backpressure strategy to be used for message consumption.
-    public var backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark(
-        low: 10,
-        high: 50
-    )
+    /// The time between two consecutive polls.
+    /// Effectively controls the rate at which incoming events and messages are consumed.
+    public var pollInterval: Duration

    // This backs the consumptionStrategy computed property.
    private var _consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy
@@ -335,8 +333,8 @@ public struct KafkaConsumerConfiguration: Hashable {
    }

    public init(
+        pollInterval: Duration = .milliseconds(100),
        consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy,
-        backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark(low: 10, high: 50),
        sessionTimeoutMs: UInt = 45000,
        heartbeatIntervalMs: UInt = 3000,
        maxPollInvervalMs: UInt = 300_000,
@@ -381,9 +379,9 @@ public struct KafkaConsumerConfiguration: Hashable {
        saslUsername: String? = nil,
        saslPassword: String? = nil
    ) {
+        self.pollInterval = pollInterval
        self._consumptionStrategy = consumptionStrategy
        self.consumptionStrategy = consumptionStrategy // used to invoke set { } method
-        self.backPressureStrategy = backPressureStrategy

        self.sessionTimeoutMs = sessionTimeoutMs
        self.heartbeatIntervalMs = heartbeatIntervalMs
@@ -473,30 +471,6 @@ public struct KafkaConsumerConfiguration: Hashable {
// MARK: - KafkaSharedConfiguration + Consumer Additions

extension KafkaSharedConfiguration {
-    /// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
-    public struct BackPressureStrategy: Hashable {
-        enum _BackPressureStrategy: Hashable {
-            case watermark(low: Int, high: Int)
-        }
-
-        let _internal: _BackPressureStrategy
-
-        private init(backPressureStrategy: _BackPressureStrategy) {
-            self._internal = backPressureStrategy
-        }
-
-        /// A back pressure strategy based on high and low watermarks.
-        ///
-        /// The consumer maintains a buffer size between a low watermark and a high watermark
-        /// to control the flow of incoming messages.
-        ///
-        /// - Parameter low: The lower threshold for the buffer size (low watermark).
-        /// - Parameter high: The upper threshold for the buffer size (high watermark).
-        public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
-            return .init(backPressureStrategy: .watermark(low: low, high: high))
-        }
-    }
-
    /// A struct representing the different Kafka message consumption strategies.
    public struct ConsumptionStrategy: Hashable {
        enum _ConsumptionStrategy: Hashable {
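
The net effect at configuration call sites: `backPressureStrategy` disappears and `pollInterval` takes its place. A sketch of the updated initializer usage; the `.group(groupID:topics:)` factory method is an assumption for illustration and is not shown in this diff:

```swift
// Sketch: constructing the new configuration. pollInterval replaces the
// removed watermark-based backPressureStrategy; the consumption strategy
// shown here is assumed, not taken from this diff.
let config = KafkaConsumerConfiguration(
    pollInterval: .milliseconds(50), // poll twice as often as the 100 ms default
    consumptionStrategy: .group(groupID: "example-group", topics: ["example-topic"])
)
```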
11 changes: 11 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
@@ -13,6 +13,14 @@
//===----------------------------------------------------------------------===//

public struct KafkaProducerConfiguration: Hashable {
+    // MARK: - SwiftKafka-specific Config properties
+
+    /// The time between two consecutive polls.
+    /// Effectively controls the rate at which incoming events and acknowledgements are consumed.
+    public var pollInterval: Duration
+
+    // MARK: - librdkafka Config properties
+
    var dictionary: [String: String] = [:]

    // MARK: - Producer-specific Config Properties
@@ -296,6 +304,7 @@ public struct KafkaProducerConfiguration: Hashable {
    }

    public init(
+        pollInterval: Duration = .milliseconds(100),
        transactionalID: String = "",
        transactionalTimeoutMs: UInt = 60000,
        enableIdempotence: Bool = false,
@@ -340,6 +349,8 @@ public struct KafkaProducerConfiguration: Hashable {
        saslUsername: String? = nil,
        saslPassword: String? = nil
    ) {
+        self.pollInterval = pollInterval
+
        self.transactionalID = transactionalID
        self.transactionTimeoutMs = transactionalTimeoutMs
        self.enableIdempotence = enableIdempotence
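
The producer gains the same knob. A sketch of a call site with everything else left at its defaults:

```swift
// Sketch: the producer-side pollInterval mirrors the consumer's. A shorter
// interval drains outgoing events and acknowledgements more promptly, at
// the cost of more frequent librdkafka polls.
let producerConfig = KafkaProducerConfiguration(
    pollInterval: .milliseconds(25)
)
```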
70 changes: 70 additions & 0 deletions Sources/SwiftKafka/KafkaClient.swift
@@ -56,6 +56,76 @@ final class KafkaClient {
        return rd_kafka_poll(self.kafkaHandle, timeout)
    }

+    /// Redirect the main ``KafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s
+    /// queue (``KafkaClient/consumerPoll``).
+    ///
+    /// Events that would be triggered by ``KafkaClient/poll(timeout:)``
+    /// are now triggered by ``KafkaClient/consumerPoll``.
+    ///
+    /// - Warning: It is not allowed to call ``KafkaClient/poll(timeout:)`` after ``KafkaClient/pollSetConsumer``.
+    func pollSetConsumer() throws {
+        let result = rd_kafka_poll_set_consumer(self.kafkaHandle)
+        if result != RD_KAFKA_RESP_ERR_NO_ERROR {
+            throw KafkaError.rdKafkaError(wrapping: result)
+        }
+    }
+
+    /// Request a new message from the Kafka cluster.
+    ///
+    /// - Important: This method should only be invoked from ``KafkaConsumer``.
+    ///
+    /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
+    /// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
+    func consumerPoll() throws -> KafkaConsumerMessage? {
+        guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else {
+            // No error, there might be no more messages
+            return nil
+        }
+
+        defer {
+            // Destroy message otherwise poll() will block forever
+            rd_kafka_message_destroy(messagePointer)
+        }
+
+        // Reached the end of the topic+partition queue on the broker
+        if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
+            return nil
+        }
+
+        let message = try KafkaConsumerMessage(messagePointer: messagePointer)
+        return message
+    }
+
+    /// Subscribe to topic set using balanced consumer groups.
+    /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
+    func subscribe(topicPartitionList: RDKafkaTopicPartitionList) throws {
+        try topicPartitionList.withListPointer { pointer in
+            let result = rd_kafka_subscribe(self.kafkaHandle, pointer)
+            if result != RD_KAFKA_RESP_ERR_NO_ERROR {
+                throw KafkaError.rdKafkaError(wrapping: result)
+            }
+        }
+    }
+
+    /// Atomic assignment of partitions to consume.
+    /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
+    func assign(topicPartitionList: RDKafkaTopicPartitionList) throws {
+        try topicPartitionList.withListPointer { pointer in
+            let result = rd_kafka_assign(self.kafkaHandle, pointer)
+            if result != RD_KAFKA_RESP_ERR_NO_ERROR {
+                throw KafkaError.rdKafkaError(wrapping: result)
+            }
+        }
+    }
+
+    /// Close the consumer.
+    func consumerClose() throws {
+        let result = rd_kafka_consumer_close(self.kafkaHandle)
+        if result != RD_KAFKA_RESP_ERR_NO_ERROR {
+            throw KafkaError.rdKafkaError(wrapping: result)
+        }
+    }
+
    /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
    /// - Warning: Do not escape the pointer from the closure for later use.
    /// - Parameter body: The closure will use the Kafka handle pointer.
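
Taken together, these additions suggest the shape of the consumer's new poll-driven run loop. The following is a hypothetical illustration of how `KafkaConsumer.run()` might drive them, not the PR's actual implementation; `client`, `source`, and `config` are assumed names:

```swift
// Hypothetical sketch only. `client` is the KafkaClient, `source` is the
// continuation feeding `consumer.messages`, and `config` holds pollInterval.
func run() async throws {
    // Route all events to the consumer queue; plain poll() must not be
    // called after this.
    try client.pollSetConsumer()

    while !Task.isCancelled {
        // Drain one message per iteration; nil means nothing is pending.
        if let message = try client.consumerPoll() {
            source.yield(message)
        }
        // Pace the loop at the configured pollInterval.
        try await Task.sleep(for: config.pollInterval)
    }

    try client.consumerClose()
}
```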