From a6b72270a0098ffee8f5851ec26f11c7a7eac60b Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Fri, 16 Jun 2023 15:52:12 +0100 Subject: [PATCH 01/12] `KafkaConsumer` `run()` method Modifications: * `KafkaConsumer`: remove `NIOAsyncSequenceProducer` and use own implementation of `AsyncSequence` instead * write async wrapper method `KafkaClient.consumerPoll(timeout:)` * update memory leak tests for `KafkaConsumer` and `KafkaProducer` * update tests --- Package.swift | 2 +- Sources/SwiftKafka/KafkaClient.swift | 38 ++++ Sources/SwiftKafka/KafkaConsumer.swift | 172 +++++------------- Tests/IntegrationTests/SwiftKafkaTests.swift | 88 ++++----- .../SwiftKafkaTests/KafkaConsumerTests.swift | 54 ++++++ 5 files changed, 178 insertions(+), 176 deletions(-) create mode 100644 Tests/SwiftKafkaTests/KafkaConsumerTests.swift diff --git a/Package.swift b/Package.swift index c9154d82..c00aa4b5 100644 --- a/Package.swift +++ b/Package.swift @@ -32,7 +32,7 @@ let package = Package( .macOS(.v13), .iOS(.v16), .watchOS(.v9), - .tvOS(.v16), + .tvOS(.v16) ], products: [ .library( diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index bfca9e7b..2fdd95e0 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -56,6 +56,44 @@ final class KafkaClient { return rd_kafka_poll(self.kafkaHandle, timeout) } + /// Request a new message from the Kafka cluster. + /// + /// This method blocks for a maximum of `timeout` milliseconds. + /// + /// - Important: This method should only be invoked from ``KafkaConsumer``. + /// + /// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message. + /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. + /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. + func consumerPoll(timeout: Int32 = 100) async throws -> KafkaConsumerMessage? { + try await withCheckedThrowingContinuation { continuation in + guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, timeout) else { + // No error, there might be no more messages + continuation.resume(returning: nil) + return + } + + defer { + // Destroy message otherwise poll() will block forever + rd_kafka_message_destroy(messagePointer) + } + + // Reached the end of the topic+partition queue on the broker + if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF { + continuation.resume(returning: nil) + return + } + + do { + let message = try KafkaConsumerMessage(messagePointer: messagePointer) + continuation.resume(returning: message) + } catch { + continuation.resume(throwing: error) + } + } + } + + /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. /// - Warning: Do not escape the pointer from the closure for later use. /// - Parameter body: The closure will use the Kafka handle pointer. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index 4566d08c..8f85ee50 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -15,43 +15,53 @@ import Crdkafka import Dispatch import Logging -import NIOCore +import NIOConcurrencyHelpers -/// `NIOAsyncSequenceProducerDelegate` implementation handling backpressure for ``KafkaConsumer``. -private struct ConsumerMessagesAsyncSequenceDelegate: NIOAsyncSequenceProducerDelegate { - let produceMoreClosure: @Sendable () -> Void - let didTerminateClosure: @Sendable () -> Void +// TODO: update readme +// TODO: move other stuff also to RDKafka +// TODO: remove NIOCore imports where possible +// TODO: remove backpressure option in config +// TODO: synchronization (remove dispatch, does this make sense?) - func produceMore() { - self.produceMoreClosure() - } - - func didTerminate() { - self.didTerminateClosure() - } -} +// TODO: state machine containig all the variables -> statemachine is passed to ConsumerMessagesAsyncSequence -> statemachine to different commit +// TODO: rename branch /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct ConsumerMessagesAsyncSequence: AsyncSequence { public typealias Element = Result - typealias HighLowWatermark = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark - fileprivate let wrappedSequence: NIOAsyncSequenceProducer + internal let client: KafkaClient + internal let logger: Logger /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct ConsumerMessagesAsyncIterator: AsyncIteratorProtocol { - fileprivate let wrappedIterator: NIOAsyncSequenceProducer< - Element, - HighLowWatermark, - ConsumerMessagesAsyncSequenceDelegate - >.AsyncIterator + internal let client: KafkaClient + internal let logger: Logger public mutating func next() async -> Element? { - await self.wrappedIterator.next() + // TODO: refactor + let messageResult: Result + while !Task.isCancelled { + do { + // TODO: timeout + guard let message = try await client.consumerPoll() else { // TODO: pollInterval here + continue + } + messageResult = .success(message) + } catch let kafkaError as KafkaError { + messageResult = .failure(kafkaError) + } catch { + logger.error("KafkaConsumer caught error: \(error)") + continue + } + return messageResult + } + // TODO: close KafkaConsumer + return nil // Returning nil ends the sequence } } public func makeAsyncIterator() -> ConsumerMessagesAsyncIterator { - return ConsumerMessagesAsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator()) + return ConsumerMessagesAsyncIterator(client: self.client, logger: self.logger) } } @@ -68,19 +78,12 @@ public final class KafkaConsumer { /// Variable to ensure that no operations are invoked on closed consumer. private var closed = false + // TODO: remove /// Serial queue used to run all blocking operations. Additionally ensures that no data races occur. private let serialQueue: DispatchQueue - // We use implicitly unwrapped optionals here as these properties need to access self upon initialization - /// Type of the values returned by the ``messages`` sequence. - private typealias Element = Result - private var messagesSource: NIOAsyncSequenceProducer< - Element, - ConsumerMessagesAsyncSequence.HighLowWatermark, - ConsumerMessagesAsyncSequenceDelegate - >.Source! /// `AsyncSequence` that returns all ``KafkaConsumerMessage`` objects that the consumer receives. - public private(set) var messages: ConsumerMessagesAsyncSequence! + public let messages: ConsumerMessagesAsyncSequence /// Initialize a new ``KafkaConsumer``. /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)`` @@ -95,6 +98,7 @@ public final class KafkaConsumer { self.config = config self.logger = logger self.client = try RDKafka.createClient(type: .consumer, configDictionary: config.dictionary, logger: self.logger) + self.messages = ConsumerMessagesAsyncSequence(client: self.client, logger: self.logger) self.subscribedTopicsPointer = rd_kafka_topic_partition_list_new(1) @@ -109,40 +113,6 @@ public final class KafkaConsumer { self.serialQueue = DispatchQueue(label: "swift-kafka-gsoc.consumer.serial") - var lowWatermark = 10 - var highWatermark = 50 - switch config.backPressureStrategy._internal { - case .watermark(let low, let high): - lowWatermark = low - highWatermark = high - } - let backpressureStrategy = ConsumerMessagesAsyncSequence.HighLowWatermark( - lowWatermark: lowWatermark, - highWatermark: highWatermark - ) - - // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt) - // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer. - // The source MUST be held by the caller and used to signal new elements or finish. - // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller. - // This is due to the fact that deiniting the sequence is used as part of a trigger to - // terminate the underlying source. - // TODO: make self delegate to avoid weak reference here - let messagesSequenceDelegate = ConsumerMessagesAsyncSequenceDelegate { [weak self] in - self?.produceMore() - } didTerminateClosure: { [weak self] in - self?.close() - } - let messagesSourceAndSequence = NIOAsyncSequenceProducer.makeSequence( - elementType: Element.self, - backPressureStrategy: backpressureStrategy, - delegate: messagesSequenceDelegate - ) - self.messagesSource = messagesSourceAndSequence.source - self.messages = ConsumerMessagesAsyncSequence( - wrappedSequence: messagesSourceAndSequence.sequence - ) - switch config.consumptionStrategy._internal { case .partition(topic: let topic, partition: let partition, offset: let offset): try self.assign(topic: topic, partition: partition, offset: offset) @@ -151,6 +121,10 @@ public final class KafkaConsumer { } } + deinit { + // TODO: close + } + /// Subscribe to the given list of `topics`. /// The partition assignment happens automatically using `KafkaConsumer`'s consumer group. /// - Parameter topics: An array of topic names to subscribe to. @@ -206,71 +180,6 @@ public final class KafkaConsumer { } } - /// Receive new messages and forward the result to the ``messages`` `AsyncSequence`. - func produceMore() { - self.serialQueue.async { - guard !self.closed else { - return - } - - let messageResult: Element - do { - guard let message = try self.poll() else { - self.produceMore() - return - } - messageResult = .success(message) - } catch let kafkaError as KafkaError { - messageResult = .failure(kafkaError) - } catch { - self.logger.error("KafkaConsumer caught error: \(error)") - return - } - - let yieldresult = self.messagesSource.yield(messageResult) - switch yieldresult { - case .produceMore: - self.produceMore() - case .dropped, .stopProducing: - return - } - } - } - - /// Request a new message from the Kafka cluster. - /// This method blocks for a maximum of `timeout` milliseconds. - /// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message. - /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. - /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. - private func poll(timeout: Int32 = 100) throws -> KafkaConsumerMessage? { - dispatchPrecondition(condition: .onQueue(self.serialQueue)) - assert(!self.closed) - - guard let messagePointer = self.client.withKafkaHandlePointer({ handle in - rd_kafka_consumer_poll(handle, timeout) - }) else { - // No error, there might be no more messages - return nil - } - - defer { - // Destroy message otherwise poll() will block forever - rd_kafka_message_destroy(messagePointer) - } - - // Reached the end of the topic+partition queue on the broker - if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF { - return nil - } - - do { - let message = try KafkaConsumerMessage(messagePointer: messagePointer) - return message - } catch { - throw error - } - } - /// Mark `message` in the topic as read and request the next message from the topic. /// This method is only used for manual offset management. /// - Parameter message: Last received message that shall be marked as read. @@ -325,6 +234,7 @@ public final class KafkaConsumer { return } + // TODO: make public shutdownGracefully /// Stop consuming messages. This step is irreversible. func close() { self.serialQueue.async { @@ -344,7 +254,7 @@ public final class KafkaConsumer { return } - self.closed = true + self.closed = true // TODO: hoist up } } diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index 23953c0c..f72ba92f 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -76,8 +76,20 @@ final class SwiftKafkaTests: XCTestCase { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(groupID: "subscription-test-group-id", topics: [self.uniqueTestTopic]), + autoOffsetReset: .beginning, // Always read topics from beginning + bootstrapServers: [self.bootstrapServer], + brokerAddressFamily: .v4 + ) + + let consumer = try KafkaConsumer( + config: consumerConfig, + logger: .kafkaTest + ) + await withThrowingTaskGroup(of: Void.self) { group in - // Run Task + // Producer Run Task group.addTask { try await producer.run() } @@ -94,18 +106,6 @@ final class SwiftKafkaTests: XCTestCase { // Consumer Task group.addTask { - let consumerConfig = KafkaConsumerConfiguration( - consumptionStrategy: .group(groupID: "subscription-test-group-id", topics: [self.uniqueTestTopic]), - autoOffsetReset: .beginning, // Always read topics from beginning - bootstrapServers: [self.bootstrapServer], - brokerAddressFamily: .v4 - ) - - let consumer = try KafkaConsumer( - config: consumerConfig, - logger: .kafkaTest - ) - var consumedMessages = [KafkaConsumerMessage]() for await messageResult in consumer.messages { guard case .success(let message) = messageResult else { @@ -133,8 +133,24 @@ final class SwiftKafkaTests: XCTestCase { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .partition( + topic: self.uniqueTestTopic, + partition: KafkaPartition(rawValue: 0), + offset: 0 + ), + autoOffsetReset: .beginning, // Always read topics from beginning + bootstrapServers: [self.bootstrapServer], + brokerAddressFamily: .v4 + ) + + let consumer = try KafkaConsumer( + config: consumerConfig, + logger: .kafkaTest + ) + await withThrowingTaskGroup(of: Void.self) { group in - // Run Task + // Producer Run Task group.addTask { try await producer.run() } @@ -151,22 +167,6 @@ final class SwiftKafkaTests: XCTestCase { // Consumer Task group.addTask { - let consumerConfiguration = KafkaConsumerConfiguration( - consumptionStrategy: .partition( - topic: self.uniqueTestTopic, - partition: KafkaPartition(rawValue: 0), - offset: 0 - ), - autoOffsetReset: .beginning, // Always read topics from beginning - bootstrapServers: [self.bootstrapServer], - brokerAddressFamily: .v4 - ) - - let consumer = try KafkaConsumer( - config: consumerConfiguration, - logger: .kafkaTest - ) - var consumedMessages = [KafkaConsumerMessage]() for await messageResult in consumer.messages { guard case .success(let message) = messageResult else { @@ -194,8 +194,21 @@ final class SwiftKafkaTests: XCTestCase { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest) + let consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(groupID: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), + enableAutoCommit: false, + autoOffsetReset: .beginning, // Always read topics from beginning + bootstrapServers: [self.bootstrapServer], + brokerAddressFamily: .v4 + ) + + let consumer = try KafkaConsumer( + config: consumerConfig, + logger: .kafkaTest + ) + await withThrowingTaskGroup(of: Void.self) { group in - // Run Task + // Producer Run Task group.addTask { try await producer.run() } @@ -212,19 +225,6 @@ final class SwiftKafkaTests: XCTestCase { // Consumer Task group.addTask { - let consumerConfig = KafkaConsumerConfiguration( - consumptionStrategy: .group(groupID: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), - enableAutoCommit: false, - autoOffsetReset: .beginning, // Always read topics from beginning - bootstrapServers: [self.bootstrapServer], - brokerAddressFamily: .v4 - ) - - let consumer = try KafkaConsumer( - config: consumerConfig, - logger: .kafkaTest - ) - var consumedMessages = [KafkaConsumerMessage]() for await messageResult in consumer.messages { guard case .success(let message) = messageResult else { diff --git a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift new file mode 100644 index 00000000..72cc224b --- /dev/null +++ b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift @@ -0,0 +1,54 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +@testable import SwiftKafka +import XCTest + +// TODO: move to SwiftKafkaTests? +final class KafkaConsumerTests: XCTestCase { + // Read environment variables to get information about the test Kafka server + let kafkaHost = ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost" + let kafkaPort = ProcessInfo.processInfo.environment["KAFKA_PORT"] ?? "9092" + var bootstrapServer: String! + var config: KafkaConsumerConfig! + + override func setUpWithError() throws { + self.bootstrapServer = "\(self.kafkaHost):\(self.kafkaPort)" + + self.config = KafkaConsumerConfig( + consumptionStrategy: .group(groupID: "test-group", topics: ["test-topic"]), + bootstrapServers: [self.bootstrapServer], + brokerAddressFamily: .v4 + ) + } + + override func tearDownWithError() throws { + self.bootstrapServer = nil + self.config = nil + } + + + func testNoMemoryLeakAfterShutdown() async throws { + var consumer: KafkaConsumer? = try KafkaConsumer(config: config, logger: .kafkaTest) + weak var consumerCopy = consumer + + // TODO: one consuming version, one without consuming -> also non consuming test for producer + + consumer = nil + XCTAssertNil(consumerCopy) + } + + // MARK: See SwiftKafkaTests (Integration Tests) for actual message consumption tests +} From 840d6099982fa16cf122a670b30b080857f6c22f Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 19 Jun 2023 15:21:47 +0100 Subject: [PATCH 02/12] KafkaConsumer: Replace Serial Queue Modifications: * replace `serialQueue` in `KafkaConsumer` with `StateMachine` that encapsulates all variables and can also be accessed from the `ConsumerMessagesAsyncSequence` * close `KafkaConsumer` when `for await` loop of `AsyncSequence` is exited * make `ConsumerMessagesAsyncIterator` a class-backed `struct` * make `KafkaConsumer.shutdownGracefully` `public` --- Package.swift | 2 +- Sources/SwiftKafka/KafkaClient.swift | 1 - Sources/SwiftKafka/KafkaConsumer.swift | 432 +++++++++++++----- Sources/SwiftKafka/KafkaProducer.swift | 4 +- .../SwiftKafkaTests/KafkaConsumerTests.swift | 1 - 5 files changed, 308 insertions(+), 132 deletions(-) diff --git a/Package.swift b/Package.swift index c00aa4b5..c9154d82 100644 --- a/Package.swift +++ b/Package.swift @@ -32,7 +32,7 @@ let package = Package( .macOS(.v13), .iOS(.v16), .watchOS(.v9), - .tvOS(.v16) + .tvOS(.v16), ], products: [ .library( diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index 2fdd95e0..d0d0b405 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -93,7 +93,6 @@ final class KafkaClient { } } - /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. /// - Warning: Do not escape the pointer from the closure for later use. /// - Parameter body: The closure will use the Kafka handle pointer. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index 8f85ee50..d2c33f66 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -13,55 +13,86 @@ //===----------------------------------------------------------------------===// import Crdkafka -import Dispatch import Logging import NIOConcurrencyHelpers // TODO: update readme +// TODO: remove KafkaConsumer tests // TODO: move other stuff also to RDKafka // TODO: remove NIOCore imports where possible // TODO: remove backpressure option in config -// TODO: synchronization (remove dispatch, does this make sense?) -// TODO: state machine containig all the variables -> statemachine is passed to ConsumerMessagesAsyncSequence -> statemachine to different commit - -// TODO: rename branch /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct ConsumerMessagesAsyncSequence: AsyncSequence { public typealias Element = Result - internal let client: KafkaClient - internal let logger: Logger + internal let stateMachine: NIOLockedValueBox /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct ConsumerMessagesAsyncIterator: AsyncIteratorProtocol { - internal let client: KafkaClient - internal let logger: Logger - - public mutating func next() async -> Element? { - // TODO: refactor - let messageResult: Result - while !Task.isCancelled { - do { - // TODO: timeout - guard let message = try await client.consumerPoll() else { // TODO: pollInterval here - continue + class _Internal { + private let _stateMachine: NIOLockedValueBox + init(stateMachine: NIOLockedValueBox) { + self._stateMachine = stateMachine + } + + deinit { + self.shutdownGracefully() + } + + internal func next() async -> Element? { + let messageResult: Result + while !Task.isCancelled { + let nextAction = self._stateMachine.withLockedValue { $0.nextPollAction() } + switch nextAction { + case .pollForMessage(let client, let logger): + do { + // TODO: timeout + guard let message = try await client.consumerPoll() else { // TODO: pollInterval here + continue + } + messageResult = .success(message) + } catch let kafkaError as KafkaError { + messageResult = .failure(kafkaError) + } catch { + logger.error("KafkaConsumer caught error: \(error)") + continue + } + return messageResult + case .none: + return nil } - messageResult = .success(message) - } catch let kafkaError as KafkaError { - messageResult = .failure(kafkaError) - } catch { - logger.error("KafkaConsumer caught error: \(error)") - continue } - return messageResult + return nil // Returning nil ends the sequence } - // TODO: close KafkaConsumer - return nil // Returning nil ends the sequence + + private func shutdownGracefully() { + let action = self._stateMachine.withLockedValue { $0.finish() } + switch action { + case .shutdownGracefully(let client, let subscribedTopicsPointer, let logger): + KafkaConsumer.shutdownGracefully( + client: client, + subscribedTopicsPointer: subscribedTopicsPointer, + logger: logger + ) + case .none: + return + } + } + } + + private let _internal: _Internal + + init(stateMachine: NIOLockedValueBox) { + self._internal = .init(stateMachine: stateMachine) + } + + public func next() async -> Element? { + await self._internal.next() } } public func makeAsyncIterator() -> ConsumerMessagesAsyncIterator { - return ConsumerMessagesAsyncIterator(client: self.client, logger: self.logger) + return ConsumerMessagesAsyncIterator(stateMachine: self.stateMachine) } } @@ -69,18 +100,8 @@ public struct ConsumerMessagesAsyncSequence: AsyncSequence { public final class KafkaConsumer { /// The configuration object of the consumer client. private var config: KafkaConsumerConfiguration - /// A logger. - private let logger: Logger - /// Used for handling the connection to the Kafka cluster. - private let client: KafkaClient - /// Pointer to a list of topics + partition pairs. - private let subscribedTopicsPointer: UnsafeMutablePointer - /// Variable to ensure that no operations are invoked on closed consumer. - private var closed = false - - // TODO: remove - /// Serial queue used to run all blocking operations. Additionally ensures that no data races occur. - private let serialQueue: DispatchQueue + /// State of the `KafkaConsumer`. + private let stateMachine: NIOLockedValueBox /// `AsyncSequence` that returns all ``KafkaConsumerMessage`` objects that the consumer receives. public let messages: ConsumerMessagesAsyncSequence @@ -96,23 +117,25 @@ public final class KafkaConsumer { logger: Logger ) throws { self.config = config - self.logger = logger - self.client = try RDKafka.createClient(type: .consumer, configDictionary: config.dictionary, logger: self.logger) - self.messages = ConsumerMessagesAsyncSequence(client: self.client, logger: self.logger) - self.subscribedTopicsPointer = rd_kafka_topic_partition_list_new(1) + let client = try RDKafka.createClient(type: .consumer, configDictionary: config.dictionary, logger: logger) + + guard let subscribedTopicsPointer = rd_kafka_topic_partition_list_new(1) else { + throw KafkaError.client(reason: "Failed to allocate Topic+Partition list.") + } + + self.stateMachine = NIOLockedValueBox(StateMachine(state: .initializing(client: client, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger))) + self.messages = ConsumerMessagesAsyncSequence(stateMachine: self.stateMachine) // Events that would be triggered by rd_kafka_poll // will now be also triggered by rd_kafka_consumer_poll - let result = self.client.withKafkaHandlePointer { handle in + let result = client.withKafkaHandlePointer { handle in rd_kafka_poll_set_consumer(handle) } guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { throw KafkaError.rdKafkaError(wrapping: result) } - self.serialQueue = DispatchQueue(label: "swift-kafka-gsoc.consumer.serial") - switch config.consumptionStrategy._internal { case .partition(topic: let topic, partition: let partition, offset: let offset): try self.assign(topic: topic, partition: partition, offset: offset) @@ -122,7 +145,8 @@ public final class KafkaConsumer { } deinit { - // TODO: close + // This occurs e.g. when for await loop is exited through break + self.shutdownGracefully() } /// Subscribe to the given list of `topics`. @@ -130,22 +154,24 @@ public final class KafkaConsumer { /// - Parameter topics: An array of topic names to subscribe to. /// - Throws: A ``KafkaError`` if subscribing to the topic list failed. private func subscribe(topics: [String]) throws { - assert(!self.closed) - - for topic in topics { - rd_kafka_topic_partition_list_add( - self.subscribedTopicsPointer, - topic, - KafkaPartition.unassigned.rawValue - ) - } + let action = self.stateMachine.withLockedValue { $0.setUpConnection() } + switch action { + case .setUpConnection(let client, let subscribedTopicsPointer): + for topic in topics { + rd_kafka_topic_partition_list_add( + subscribedTopicsPointer, + topic, + KafkaPartition.unassigned.rawValue + ) + } - let result = self.client.withKafkaHandlePointer { handle in - rd_kafka_subscribe(handle, self.subscribedTopicsPointer) - } + let result = client.withKafkaHandlePointer { handle in + rd_kafka_subscribe(handle, subscribedTopicsPointer) + } - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: result) + guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: result) + } } } @@ -159,24 +185,26 @@ public final class KafkaConsumer { partition: KafkaPartition, offset: Int ) throws { - assert(!self.closed) - - guard let partitionPointer = rd_kafka_topic_partition_list_add( - self.subscribedTopicsPointer, - topic, - partition.rawValue - ) else { - fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") - } + let action = self.stateMachine.withLockedValue { $0.setUpConnection() } + switch action { + case .setUpConnection(let client, let subscribedTopicsPointer): + guard let partitionPointer = rd_kafka_topic_partition_list_add( + subscribedTopicsPointer, + topic, + partition.rawValue + ) else { + fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") + } - partitionPointer.pointee.offset = Int64(offset) + partitionPointer.pointee.offset = Int64(offset) - let result = self.client.withKafkaHandlePointer { handle in - rd_kafka_assign(handle, self.subscribedTopicsPointer) - } + let result = client.withKafkaHandlePointer { handle in + rd_kafka_assign(handle, subscribedTopicsPointer) + } - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: result) + guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: result) + } } } @@ -186,10 +214,9 @@ public final class KafkaConsumer { /// - Throws: A ``KafkaError`` if committing failed. /// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`. public func commitSync(_ message: KafkaConsumerMessage) async throws { - try await self.serializeWithThrowingContinuation { (continuation: CheckedContinuation) in - + try await withCheckedThrowingContinuation { continuation in do { - try self._commitSync(message) + try self._commitSync(message) // Blocks until commiting the offset is done continuation.resume() } catch { continuation.resume(throwing: error) @@ -198,73 +225,224 @@ public final class KafkaConsumer { } private func _commitSync(_ message: KafkaConsumerMessage) throws { - dispatchPrecondition(condition: .onQueue(self.serialQueue)) - guard !self.closed else { + let action = self.stateMachine.withLockedValue { $0.commitSync() } + switch action { + case .throwClosedError: throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer") - } + case .commitSync(let client): + guard self.config.enableAutoCommit == false else { + throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false") + } - guard self.config.enableAutoCommit == false else { - throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false") - } + let changesList = rd_kafka_topic_partition_list_new(1) + defer { rd_kafka_topic_partition_list_destroy(changesList) } + guard let partitionPointer = rd_kafka_topic_partition_list_add( + changesList, + message.topic, + message.partition.rawValue + ) else { + fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") + } - let changesList = rd_kafka_topic_partition_list_new(1) - defer { rd_kafka_topic_partition_list_destroy(changesList) } - guard let partitionPointer = rd_kafka_topic_partition_list_add( - changesList, - message.topic, - message.partition.rawValue - ) else { - fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") + // The offset committed is always the offset of the next requested message. + // Thus, we increase the offset of the current message by one before committing it. + // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 + partitionPointer.pointee.offset = Int64(message.offset + 1) + let result = client.withKafkaHandlePointer { handle in + rd_kafka_commit( + handle, + changesList, + 0 + ) // Blocks until commiting the offset is done + } + guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: result) + } } + } - // The offset committed is always the offset of the next requested message. - // Thus, we increase the offset of the current message by one before committing it. - // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 - partitionPointer.pointee.offset = Int64(message.offset + 1) - let result = self.client.withKafkaHandlePointer { handle in - rd_kafka_commit( - handle, - changesList, - 0 + /// This function is used to gracefully shut down a Kafka consumer client. + /// + /// - Note: Invoking this function is not always needed as the ``KafkaConsumer`` + /// will already shut down when consumption of the ``ConsumerMessagesAsyncSequence`` has ended. + public func shutdownGracefully() { + let action = self.stateMachine.withLockedValue { $0.finish() } + switch action { + case .shutdownGracefully(let client, let subscribedTopicsPointer, let logger): + KafkaConsumer.shutdownGracefully( + client: client, + subscribedTopicsPointer: subscribedTopicsPointer, + logger: logger ) + case .none: + return } + } +} + +// MARK: - KafkaConsumer + static shutdownGracefully + +extension KafkaConsumer { + /// This function is used to gracefully shut down a Kafka consumer client. + fileprivate static func shutdownGracefully( + client: KafkaClient, + subscribedTopicsPointer: UnsafeMutablePointer, + logger: Logger + ) { + let result = client.withKafkaHandlePointer { handle in + rd_kafka_consumer_close(handle) + } + + rd_kafka_topic_partition_list_destroy(subscribedTopicsPointer) + guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: result) + let error = KafkaError.rdKafkaError(wrapping: result) + logger.error("Closing KafkaConsumer failed: \(error.description)") + return } - return } +} + +// MARK: - KafkaConsumer + StateMachine + +extension KafkaConsumer { + /// State machine representing the state of the ``KafkaConsumer``. + struct StateMachine { + /// The state of the ``StateMachine``. + enum State { + /// We are in the process of initializing the ``KafkaConsumer``, + /// though ``subscribe()`` / ``assign()`` have not been invoked. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. + /// - Parameter logger: A logger. + case initializing( + client: KafkaClient, + subscribedTopicsPointer: UnsafeMutablePointer, + logger: Logger + ) + /// The ``KafkaConsumer`` is consuming messages. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. + /// - Parameter logger: A logger. + case consuming( + client: KafkaClient, + subscribedTopicsPointer: UnsafeMutablePointer, + logger: Logger + ) + /// The ``KafkaConsumer`` has been closed. + case finished + } - // TODO: make public shutdownGracefully - /// Stop consuming messages. This step is irreversible. - func close() { - self.serialQueue.async { - guard !self.closed else { - return + /// The current state of the StateMachine. + var state: State + + /// Action to be taken when wanting to poll for a new message. + enum PollAction { + /// Poll for a new ``KafkaConsumerMessage``. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter logger: A logger. + case pollForMessage( + client: KafkaClient, + logger: Logger + ) + } + + /// Returns the next action to be taken when wanting to poll. + /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. + /// + /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. + func nextPollAction() -> PollAction? { + switch self.state { + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + case .consuming(let client, _, let logger): + return .pollForMessage(client: client, logger: logger) + case .finished: + return nil } + } - let result = self.client.withKafkaHandlePointer { handle in - rd_kafka_consumer_close(handle) + /// Action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. + enum SetUpConnectionAction { + /// Set up the connection through ``subscribe()`` or ``assign()``. + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. + case setUpConnection(client: KafkaClient, subscribedTopicsPointer: UnsafeMutablePointer) + } + + /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. + /// - Returns: The action to be taken. + mutating func setUpConnection() -> SetUpConnectionAction { + switch self.state { + case .initializing(let client, let subscribedTopicsPointer, let logger): + self.state = .consuming( + client: client, + subscribedTopicsPointer: subscribedTopicsPointer, + logger: logger + ) + return .setUpConnection(client: client, subscribedTopicsPointer: subscribedTopicsPointer) + case .consuming, .finished: + fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer") } + } - rd_kafka_topic_partition_list_destroy(self.subscribedTopicsPointer) + /// Action to be taken when wanting to do a synchronous commit. + enum CommitSyncAction { + /// Do a synchronous commit. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + case commitSync( + client: KafkaClient + ) + /// Throw an error. The ``KafkaConsumer`` is closed. + case throwClosedError + } - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - let error = KafkaError.rdKafkaError(wrapping: result) - self.logger.error("Closing KafkaConsumer failed: \(error.description)") - return + /// Get action to be taken when wanting to do a synchronous commit. + /// - Returns: The action to be taken. + /// + /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. + func commitSync() -> CommitSyncAction { + switch self.state { + case .initializing: + fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") + case .consuming(let client, _, _): + return .commitSync(client: client) + case .finished: + return .throwClosedError } + } - self.closed = true // TODO: hoist up + /// Action to be taken when wanting to do close the consumer. + enum FinishAction { + /// Shut down the ``KafkaConsumer``. + /// + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. + /// - Parameter logger: A logger. + case shutdownGracefully( + client: KafkaClient, + subscribedTopicsPointer: UnsafeMutablePointer, + logger: Logger + ) } - } - /// Helper function that enqueues a task with a checked throwing continuation into the ``KafkaConsumer``'s serial queue. - private func serializeWithThrowingContinuation(_ body: @escaping (CheckedContinuation) -> Void) async throws -> T { - try await withCheckedThrowingContinuation { continuation in - self.serialQueue.async { - body(continuation) - // Note: we do not support cancellation yet - // https://github.com/swift-server/swift-kafka-gsoc/issues/33 + /// Get action to be taken when wanting to do close the consumer. + /// - Returns: The action to be taken, or `nil` if there is no action to be taken. + /// + /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. + mutating func finish() -> FinishAction? { + switch self.state { + case .initializing: + fatalError("subscribe() / assign() should have been invoked before \(#function)") + case .consuming(let client, let subscribedTopicsPointer, let logger): + self.state = .finished + return .shutdownGracefully(client: client, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger) + case .finished: + return nil } } } diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 45a0eb7f..6fc924da 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -180,13 +180,13 @@ public actor KafkaProducer { switch self.state { case .started: self.state = .shuttingDown - await self._shutDownGracefully(timeout: timeout) + await self._shutdownGracefully(timeout: timeout) case .shuttingDown, .shutDown: return } } - private func _shutDownGracefully(timeout: Int32) async { + private func _shutdownGracefully(timeout: Int32) async { await withCheckedContinuation { (continuation: CheckedContinuation) in // Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called self.client.withKafkaHandlePointer { handle in diff --git a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift index 72cc224b..b5eb0a01 100644 --- a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift @@ -39,7 +39,6 @@ final class KafkaConsumerTests: XCTestCase { self.config = nil } - func testNoMemoryLeakAfterShutdown() async throws { var consumer: KafkaConsumer? = try KafkaConsumer(config: config, logger: .kafkaTest) weak var consumerCopy = consumer From bc8a20311ee923e71cb406f292343d04ed043700 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 19 Jun 2023 17:23:06 +0100 Subject: [PATCH 03/12] * remove KafkaConsumerTests --- Sources/SwiftKafka/KafkaConsumer.swift | 2 - .../SwiftKafkaTests/KafkaConsumerTests.swift | 53 ------------------- 2 files changed, 55 deletions(-) delete mode 100644 Tests/SwiftKafkaTests/KafkaConsumerTests.swift diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index d2c33f66..156579c0 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -16,8 +16,6 @@ import Crdkafka import Logging import NIOConcurrencyHelpers -// TODO: update readme -// TODO: remove KafkaConsumer tests // TODO: move other stuff also to RDKafka // TODO: remove NIOCore imports where possible // TODO: remove backpressure option in config diff --git a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift deleted file mode 100644 index b5eb0a01..00000000 --- a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift +++ /dev/null @@ -1,53 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the swift-kafka-gsoc open source project -// -// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOCore -@testable import SwiftKafka -import XCTest - -// TODO: move to SwiftKafkaTests? -final class KafkaConsumerTests: XCTestCase { - // Read environment variables to get information about the test Kafka server - let kafkaHost = ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost" - let kafkaPort = ProcessInfo.processInfo.environment["KAFKA_PORT"] ?? "9092" - var bootstrapServer: String! - var config: KafkaConsumerConfig! - - override func setUpWithError() throws { - self.bootstrapServer = "\(self.kafkaHost):\(self.kafkaPort)" - - self.config = KafkaConsumerConfig( - consumptionStrategy: .group(groupID: "test-group", topics: ["test-topic"]), - bootstrapServers: [self.bootstrapServer], - brokerAddressFamily: .v4 - ) - } - - override func tearDownWithError() throws { - self.bootstrapServer = nil - self.config = nil - } - - func testNoMemoryLeakAfterShutdown() async throws { - var consumer: KafkaConsumer? = try KafkaConsumer(config: config, logger: .kafkaTest) - weak var consumerCopy = consumer - - // TODO: one consuming version, one without consuming -> also non consuming test for producer - - consumer = nil - XCTAssertNil(consumerCopy) - } - - // MARK: See SwiftKafkaTests (Integration Tests) for actual message consumption tests -} From a824f466f2e9fe16859751d6049cc1fdee08866c Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 19 Jun 2023 17:25:06 +0100 Subject: [PATCH 04/12] * remove unused NIOCore imports --- Sources/SwiftKafka/KafkaConsumer.swift | 1 - Sources/SwiftKafka/KafkaProducer.swift | 1 - 2 files changed, 2 deletions(-) diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index 156579c0..a6aff608 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -17,7 +17,6 @@ import Logging import NIOConcurrencyHelpers // TODO: move other stuff also to RDKafka -// TODO: remove NIOCore imports where possible // TODO: remove backpressure option in config /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 6fc924da..7923fd32 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -14,7 +14,6 @@ import Crdkafka import Logging -import NIOCore /// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``). public struct KafkaMessageAcknowledgements: AsyncSequence { From 3abb4d09432e950972269d08298c0157f21fea9c Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 19 Jun 2023 17:27:54 +0100 Subject: [PATCH 05/12] * remove backpressure strategy from KafkaConsumerConfig --- .../KafkaConsumerConfiguration.swift | 32 ------------------- Sources/SwiftKafka/KafkaConsumer.swift | 3 -- 2 files changed, 35 deletions(-) diff --git a/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift index 22bb713c..1ca33e64 100644 --- a/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift @@ -18,12 +18,6 @@ import struct Foundation.UUID public struct KafkaConsumerConfiguration: Hashable { // MARK: - SwiftKafka-specific Config properties - /// The backpressure strategy to be used for message consumption. - public var backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark( - low: 10, - high: 50 - ) - // This backs the consumptionStrategy computed property. private var _consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy @@ -336,7 +330,6 @@ public struct KafkaConsumerConfiguration: Hashable { public init( consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy, - backPressureStrategy: KafkaSharedConfiguration.BackPressureStrategy = .watermark(low: 10, high: 50), sessionTimeoutMs: UInt = 45000, heartbeatIntervalMs: UInt = 3000, maxPollInvervalMs: UInt = 300_000, @@ -383,7 +376,6 @@ public struct KafkaConsumerConfiguration: Hashable { ) { self._consumptionStrategy = consumptionStrategy self.consumptionStrategy = consumptionStrategy // used to invoke set { } method - self.backPressureStrategy = backPressureStrategy self.sessionTimeoutMs = sessionTimeoutMs self.heartbeatIntervalMs = heartbeatIntervalMs @@ -473,30 +465,6 @@ public struct KafkaConsumerConfiguration: Hashable { // MARK: - KafkaSharedConfiguration + Consumer Additions extension KafkaSharedConfiguration { - /// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``. - public struct BackPressureStrategy: Hashable { - enum _BackPressureStrategy: Hashable { - case watermark(low: Int, high: Int) - } - - let _internal: _BackPressureStrategy - - private init(backPressureStrategy: _BackPressureStrategy) { - self._internal = backPressureStrategy - } - - /// A back pressure strategy based on high and low watermarks. - /// - /// The consumer maintains a buffer size between a low watermark and a high watermark - /// to control the flow of incoming messages. - /// - /// - Parameter low: The lower threshold for the buffer size (low watermark). - /// - Parameter high: The upper threshold for the buffer size (high watermark). - public static func watermark(low: Int, high: Int) -> BackPressureStrategy { - return .init(backPressureStrategy: .watermark(low: low, high: high)) - } - } - /// A struct representing the different Kafka message consumption strategies. public struct ConsumptionStrategy: Hashable { enum _ConsumptionStrategy: Hashable { diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index a6aff608..fb6f4f5f 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -16,9 +16,6 @@ import Crdkafka import Logging import NIOConcurrencyHelpers -// TODO: move other stuff also to RDKafka -// TODO: remove backpressure option in config - /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct ConsumerMessagesAsyncSequence: AsyncSequence { public typealias Element = Result From 47963079d26d1a209716d3fb7ff98e85bc508fff Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 19 Jun 2023 17:34:43 +0100 Subject: [PATCH 06/12] * KafkaConsumer: inject poll interval through initializer (temporary) --- Sources/SwiftKafka/KafkaConsumer.swift | 33 +++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index fb6f4f5f..fba1b971 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -38,10 +38,9 @@ public struct ConsumerMessagesAsyncSequence: AsyncSequence { while !Task.isCancelled { let nextAction = self._stateMachine.withLockedValue { $0.nextPollAction() } switch nextAction { - case .pollForMessage(let client, let logger): + case .pollForMessage(let pollTimeout, let client, let logger): do { - // TODO: timeout - guard let message = try await client.consumerPoll() else { // TODO: pollInterval here + guard let message = try await client.consumerPoll(timeout: pollTimeout) else { continue } messageResult = .success(message) @@ -107,6 +106,7 @@ public final class KafkaConsumer { /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if the initialization failed. public init( + pollTimeout: Int32 = 100, // TODO(felix): poll intervals through config in separate PR config: KafkaConsumerConfiguration, logger: Logger ) throws { @@ -118,7 +118,16 @@ public final class KafkaConsumer { throw KafkaError.client(reason: "Failed to allocate Topic+Partition list.") } - self.stateMachine = NIOLockedValueBox(StateMachine(state: .initializing(client: client, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger))) + self.stateMachine = NIOLockedValueBox( + StateMachine( + state: .initializing( + pollTimeout: pollTimeout, + client: client, + subscribedTopicsPointer: subscribedTopicsPointer, + logger: logger + ) + ) + ) self.messages = ConsumerMessagesAsyncSequence(stateMachine: self.stateMachine) // Events that would be triggered by rd_kafka_poll @@ -307,20 +316,24 @@ extension KafkaConsumer { /// We are in the process of initializing the ``KafkaConsumer``, /// though ``subscribe()`` / ``assign()`` have not been invoked. /// + /// - Parameter pollTimeout: Poll timeout in millieseconds. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. /// - Parameter logger: A logger. case initializing( + pollTimeout: Int32, client: KafkaClient, subscribedTopicsPointer: UnsafeMutablePointer, logger: Logger ) /// The ``KafkaConsumer`` is consuming messages. /// + /// - Parameter pollTimeout: Poll timeout in millieseconds. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. /// - Parameter logger: A logger. case consuming( + pollTimeout: Int32, client: KafkaClient, subscribedTopicsPointer: UnsafeMutablePointer, logger: Logger @@ -339,6 +352,7 @@ extension KafkaConsumer { /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter logger: A logger. case pollForMessage( + pollTimeout: Int32, client: KafkaClient, logger: Logger ) @@ -352,8 +366,8 @@ extension KafkaConsumer { switch self.state { case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let client, _, let logger): - return .pollForMessage(client: client, logger: logger) + case .consuming(let pollTimeout, let client, _, let logger): + return .pollForMessage(pollTimeout: pollTimeout, client: client, logger: logger) case .finished: return nil } @@ -371,8 +385,9 @@ extension KafkaConsumer { /// - Returns: The action to be taken. mutating func setUpConnection() -> SetUpConnectionAction { switch self.state { - case .initializing(let client, let subscribedTopicsPointer, let logger): + case .initializing(let pollTimeout, let client, let subscribedTopicsPointer, let logger): self.state = .consuming( + pollTimeout: pollTimeout, client: client, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger @@ -403,7 +418,7 @@ extension KafkaConsumer { switch self.state { case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consuming(let client, _, _): + case .consuming(_, let client, _, _): return .commitSync(client: client) case .finished: return .throwClosedError @@ -432,7 +447,7 @@ extension KafkaConsumer { switch self.state { case .initializing: fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(let client, let subscribedTopicsPointer, let logger): + case .consuming(_, let client, let subscribedTopicsPointer, let logger): self.state = .finished return .shutdownGracefully(client: client, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger) case .finished: From 8030c68ad0220285b4f363491a1d06c8c88c0b47 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Tue, 20 Jun 2023 16:19:53 +0100 Subject: [PATCH 07/12] Move polling to dedicated run() method Modifications: * add `run()` method to `KafkaConsumer` * rename `ConsumerMessagesAsyncSequence` to `KafkaConsumerMessages` * `KafkaConsumer`: put back `NIOAsyncSequenceProducer` * update `README` * use `Duration` type for `pollInterval` --- README.md | 71 +++-- Sources/SwiftKafka/KafkaClient.swift | 2 +- Sources/SwiftKafka/KafkaConsumer.swift | 290 +++++++++++++------ Tests/IntegrationTests/SwiftKafkaTests.swift | 15 + 4 files changed, 262 insertions(+), 116 deletions(-) diff --git a/README.md b/README.md index 6fa11f5a..0a5dd953 100644 --- a/README.md +++ b/README.md @@ -60,12 +60,23 @@ let consumer = try KafkaConsumer( logger: .kafkaTest // Your logger here ) -for await messageResult in consumer.messages { - switch messageResult { - case .success(let message): - // Do something with message - case .failure(let error): - // Handle error +await withThrowingTaskGroup(of: Void.self) { group in + + // Run Task + group.addTask { + try await consumer.run() + } + + // Task receiving messages + group.addTask { + for await messageResult in consumer.messages { + switch messageResult { + case .success(let message): + // Do something with message + case .failure(let error): + // Handle error + } + } } } ``` @@ -85,12 +96,23 @@ let consumer = try KafkaConsumer( logger: .kafkaTest // Your logger here ) -for await messageResult in consumer.messages { - switch messageResult { - case .success(let message): - // Do something with message - case .failure(let error): - // Handle error +await withThrowingTaskGroup(of: Void.self) { group in + + // Run Task + group.addTask { + try await consumer.run() + } + + // Task receiving messages + group.addTask { + for await messageResult in consumer.messages { + switch messageResult { + case .success(let message): + // Do something with message + case .failure(let error): + // Handle error + } + } } } ``` @@ -111,13 +133,24 @@ let consumer = try KafkaConsumer( logger: .kafkaTest // Your logger here ) -for await messageResult in consumer.messages { - switch messageResult { - case .success(let message): - // Do something with message - try await consumer.commitSync(message) - case .failure(let error): - // Handle error +await withThrowingTaskGroup(of: Void.self) { group in + + // Run Task + group.addTask { + try await consumer.run() + } + + // Task receiving messages + group.addTask { + for await messageResult in consumer.messages { + switch messageResult { + case .success(let message): + // Do something with message + try await consumer.commitSync(message) + case .failure(let error): + // Handle error + } + } } } ``` diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index d0d0b405..8a2fed56 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -65,7 +65,7 @@ final class KafkaClient { /// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message. /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. - func consumerPoll(timeout: Int32 = 100) async throws -> KafkaConsumerMessage? { + func consumerPoll(timeout: Int32) async throws -> KafkaConsumerMessage? { try await withCheckedThrowingContinuation { continuation in guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, timeout) else { // No error, there might be no more messages diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index fba1b971..ae1742c9 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -15,89 +15,92 @@ import Crdkafka import Logging import NIOConcurrencyHelpers +import NIOCore -/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). -public struct ConsumerMessagesAsyncSequence: AsyncSequence { - public typealias Element = Result - internal let stateMachine: NIOLockedValueBox +// MARK: - NoBackPressure - /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). - public struct ConsumerMessagesAsyncIterator: AsyncIteratorProtocol { - class _Internal { - private let _stateMachine: NIOLockedValueBox - init(stateMachine: NIOLockedValueBox) { - self._stateMachine = stateMachine - } +/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true. +struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy { + func didYield(bufferDepth: Int) -> Bool { true } + func didConsume(bufferDepth: Int) -> Bool { true } +} - deinit { - self.shutdownGracefully() - } +// MARK: - ShutDownOnTerminate - internal func next() async -> Element? { - let messageResult: Result - while !Task.isCancelled { - let nextAction = self._stateMachine.withLockedValue { $0.nextPollAction() } - switch nextAction { - case .pollForMessage(let pollTimeout, let client, let logger): - do { - guard let message = try await client.consumerPoll(timeout: pollTimeout) else { - continue - } - messageResult = .success(message) - } catch let kafkaError as KafkaError { - messageResult = .failure(kafkaError) - } catch { - logger.error("KafkaConsumer caught error: \(error)") - continue - } - return messageResult - case .none: - return nil - } - } - return nil // Returning nil ends the sequence +/// `NIOAsyncSequenceProducerDelegate` that terminates the shuts the consumer down when +/// `didTerminate()` is invoked. +struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock + let stateMachine: NIOLockedValueBox +} + +extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { + func produceMore() { + // No back pressure + return + } + + func didTerminate() { + // Duplicate of _shutdownGracefully + let action = self.stateMachine.withLockedValue { $0.finish() } + switch action { + case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer, let logger): + source.finish() + + let result = client.withKafkaHandlePointer { handle in + rd_kafka_consumer_close(handle) } - private func shutdownGracefully() { - let action = self._stateMachine.withLockedValue { $0.finish() } - switch action { - case .shutdownGracefully(let client, let subscribedTopicsPointer, let logger): - KafkaConsumer.shutdownGracefully( - client: client, - subscribedTopicsPointer: subscribedTopicsPointer, - logger: logger - ) - case .none: - return - } + rd_kafka_topic_partition_list_destroy(subscribedTopicsPointer) + + guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { + let error = KafkaError.rdKafkaError(wrapping: result) + logger.error("Closing KafkaConsumer failed: \(error.description)") + return } + case .none: + return } + } +} - private let _internal: _Internal +// MARK: - KafkaConsumerMessages - init(stateMachine: NIOLockedValueBox) { - self._internal = .init(stateMachine: stateMachine) - } +/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). +public struct KafkaConsumerMessages: AsyncSequence { + public typealias Element = Result + typealias WrappedSequence = NIOAsyncSequenceProducer + let wrappedSequence: WrappedSequence - public func next() async -> Element? { - await self._internal.next() + /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). + public struct ConsumerMessagesAsyncIterator: AsyncIteratorProtocol { + var wrappedIterator: WrappedSequence.AsyncIterator + + public mutating func next() async -> Element? { + await self.wrappedIterator.next() } } public func makeAsyncIterator() -> ConsumerMessagesAsyncIterator { - return ConsumerMessagesAsyncIterator(stateMachine: self.stateMachine) + return ConsumerMessagesAsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator()) } } +// MARK: - KafkaConsumer + /// Receive messages from the Kafka cluster. public final class KafkaConsumer { + typealias Producer = NIOAsyncSequenceProducer< + Result, + NoBackPressure, + ShutdownOnTerminate + > /// The configuration object of the consumer client. private var config: KafkaConsumerConfiguration /// State of the `KafkaConsumer`. private let stateMachine: NIOLockedValueBox /// `AsyncSequence` that returns all ``KafkaConsumerMessage`` objects that the consumer receives. - public let messages: ConsumerMessagesAsyncSequence + public let messages: KafkaConsumerMessages /// Initialize a new ``KafkaConsumer``. /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)`` @@ -106,7 +109,7 @@ public final class KafkaConsumer { /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if the initialization failed. public init( - pollTimeout: Int32 = 100, // TODO(felix): poll intervals through config in separate PR + pollInterval: Duration = .milliseconds(100), // TODO(felix): poll intervals through config in separate PR config: KafkaConsumerConfiguration, logger: Logger ) throws { @@ -118,17 +121,27 @@ public final class KafkaConsumer { throw KafkaError.client(reason: "Failed to allocate Topic+Partition list.") } - self.stateMachine = NIOLockedValueBox( - StateMachine( - state: .initializing( - pollTimeout: pollTimeout, - client: client, - subscribedTopicsPointer: subscribedTopicsPointer, - logger: logger - ) - ) + self.stateMachine = NIOLockedValueBox(StateMachine()) + + let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( + elementType: Result.self, + backPressureStrategy: NoBackPressure(), + delegate: ShutdownOnTerminate(stateMachine: self.stateMachine) ) - self.messages = ConsumerMessagesAsyncSequence(stateMachine: self.stateMachine) + + self.messages = KafkaConsumerMessages( + wrappedSequence: sourceAndSequence.sequence + ) + + self.stateMachine.withLockedValue { + $0.initialize( + pollInterval: pollInterval, + client: client, + source: sourceAndSequence.source, + subscribedTopicsPointer: subscribedTopicsPointer, + logger: logger + ) + } // Events that would be triggered by rd_kafka_poll // will now be also triggered by rd_kafka_consumer_poll @@ -211,6 +224,36 @@ public final class KafkaConsumer { } } + /// Start polling Kafka for messages. + /// + /// - Returns: An awaitable task representing the execution of the poll loop. + public func run() async throws { + // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle) + while !Task.isCancelled { + let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } + switch nextAction { + case .pollForAndYieldMessage(let pollInterval, let client, let source, let logger): + let messageResult: Result + do { + guard let message = try await client.consumerPoll(timeout: 0) else { + break + } + messageResult = .success(message) + } catch let kafkaError as KafkaError { + messageResult = .failure(kafkaError) + } catch { + logger.error("KafkaConsumer caught error: \(error)") + break + } + // We support no back pressure, we can ignore the yield result + _ = source.yield(messageResult) + try await Task.sleep(for: pollInterval) + case .killPollLoop: + return + } + } + } + /// Mark `message` in the topic as read and request the next message from the topic. /// This method is only used for manual offset management. /// - Parameter message: Last received message that shall be marked as read. @@ -267,13 +310,14 @@ public final class KafkaConsumer { /// This function is used to gracefully shut down a Kafka consumer client. /// /// - Note: Invoking this function is not always needed as the ``KafkaConsumer`` - /// will already shut down when consumption of the ``ConsumerMessagesAsyncSequence`` has ended. + /// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended. public func shutdownGracefully() { let action = self.stateMachine.withLockedValue { $0.finish() } switch action { - case .shutdownGracefully(let client, let subscribedTopicsPointer, let logger): - KafkaConsumer.shutdownGracefully( + case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer, let logger): + self._shutdownGracefullyAndFinishSource( client: client, + source: source, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger ) @@ -281,17 +325,15 @@ public final class KafkaConsumer { return } } -} - -// MARK: - KafkaConsumer + static shutdownGracefully -extension KafkaConsumer { - /// This function is used to gracefully shut down a Kafka consumer client. - fileprivate static func shutdownGracefully( + private func _shutdownGracefullyAndFinishSource( client: KafkaClient, + source: Producer.Source, subscribedTopicsPointer: UnsafeMutablePointer, logger: Logger ) { + source.finish() + let result = client.withKafkaHandlePointer { handle in rd_kafka_consumer_close(handle) } @@ -313,28 +355,35 @@ extension KafkaConsumer { struct StateMachine { /// The state of the ``StateMachine``. enum State { + /// The state machine has been initialized with init() but is not yet Initialized + /// using `func initialize()` (required). + case uninitialized /// We are in the process of initializing the ``KafkaConsumer``, /// though ``subscribe()`` / ``assign()`` have not been invoked. /// - /// - Parameter pollTimeout: Poll timeout in millieseconds. + /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. /// - Parameter logger: A logger. case initializing( - pollTimeout: Int32, + pollInterval: Duration, client: KafkaClient, + source: Producer.Source, subscribedTopicsPointer: UnsafeMutablePointer, logger: Logger ) /// The ``KafkaConsumer`` is consuming messages. /// - /// - Parameter pollTimeout: Poll timeout in millieseconds. + /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. /// - Parameter logger: A logger. case consuming( - pollTimeout: Int32, + pollInterval: Duration, client: KafkaClient, + source: Producer.Source, subscribedTopicsPointer: UnsafeMutablePointer, logger: Logger ) @@ -343,33 +392,61 @@ extension KafkaConsumer { } /// The current state of the StateMachine. - var state: State + var state: State = .uninitialized + + /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are + /// not yet available when the normal initialization occurs. + mutating func initialize( + pollInterval: Duration, + client: KafkaClient, + source: Producer.Source, + subscribedTopicsPointer: UnsafeMutablePointer, + logger: Logger + ) { + guard case .uninitialized = self.state else { + fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") + } + self.state = .initializing( + pollInterval: pollInterval, + client: client, + source: source, + subscribedTopicsPointer: subscribedTopicsPointer, + logger: logger + ) + } /// Action to be taken when wanting to poll for a new message. - enum PollAction { + enum PollLoopAction { /// Poll for a new ``KafkaConsumerMessage``. /// + /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter logger: A logger. - case pollForMessage( - pollTimeout: Int32, + case pollForAndYieldMessage( + pollInterval: Duration, client: KafkaClient, + source: Producer.Source, logger: Logger ) + /// Kill the poll loop. + case killPollLoop } /// Returns the next action to be taken when wanting to poll. /// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken. /// /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. - func nextPollAction() -> PollAction? { + func nextPollLoopAction() -> PollLoopAction { switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let pollTimeout, let client, _, let logger): - return .pollForMessage(pollTimeout: pollTimeout, client: client, logger: logger) + case .consuming(let pollInterval, let client, let source, _, let logger): + return .pollForAndYieldMessage(pollInterval: pollInterval, client: client, source: source, logger: logger) case .finished: - return nil + return .killPollLoop } } @@ -382,13 +459,23 @@ extension KafkaConsumer { } /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. + /// + /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. + /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. + /// - Parameter logger: A logger. + /// /// - Returns: The action to be taken. mutating func setUpConnection() -> SetUpConnectionAction { switch self.state { - case .initializing(let pollTimeout, let client, let subscribedTopicsPointer, let logger): + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") + case .initializing(let pollInterval, let client, let source, let subscribedTopicsPointer, let logger): self.state = .consuming( - pollTimeout: pollTimeout, + pollInterval: pollInterval, client: client, + source: source, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger ) @@ -416,9 +503,11 @@ extension KafkaConsumer { /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. func commitSync() -> CommitSyncAction { switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consuming(_, let client, _, _): + case .consuming(_, let client, _, _, _): return .commitSync(client: client) case .finished: return .throwClosedError @@ -427,13 +516,15 @@ extension KafkaConsumer { /// Action to be taken when wanting to do close the consumer. enum FinishAction { - /// Shut down the ``KafkaConsumer``. + /// Shut down the ``KafkaConsumer`` and finish the given `source` object. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. + /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. /// - Parameter logger: A logger. - case shutdownGracefully( + case shutdownGracefullyAndFinishSource( client: KafkaClient, + source: Producer.Source, subscribedTopicsPointer: UnsafeMutablePointer, logger: Logger ) @@ -445,11 +536,18 @@ extension KafkaConsumer { /// - Important: This function throws a `fatalError` if called while in the `.initializing` state. mutating func finish() -> FinishAction? { switch self.state { + case .uninitialized: + fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(_, let client, let subscribedTopicsPointer, let logger): + case .consuming(_, let client, let source, let subscribedTopicsPointer, let logger): self.state = .finished - return .shutdownGracefully(client: client, subscribedTopicsPointer: subscribedTopicsPointer, logger: logger) + return .shutdownGracefullyAndFinishSource( + client: client, + source: source, + subscribedTopicsPointer: subscribedTopicsPointer, + logger: logger + ) case .finished: return nil } diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index f72ba92f..aee3b303 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -104,6 +104,11 @@ final class SwiftKafkaTests: XCTestCase { await producer.shutdownGracefully() } + // Consumer Run Task + group.addTask { + try await consumer.run() + } + // Consumer Task group.addTask { var consumedMessages = [KafkaConsumerMessage]() @@ -165,6 +170,11 @@ final class SwiftKafkaTests: XCTestCase { await producer.shutdownGracefully() } + // Consumer Run Task + group.addTask { + try await consumer.run() + } + // Consumer Task group.addTask { var consumedMessages = [KafkaConsumerMessage]() @@ -223,6 +233,11 @@ final class SwiftKafkaTests: XCTestCase { await producer.shutdownGracefully() } + // Consumer Run Task + group.addTask { + try await consumer.run() + } + // Consumer Task group.addTask { var consumedMessages = [KafkaConsumerMessage]() From fa9ebf46967508a775ef73176a8c399d0bdac4ad Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 26 Jun 2023 11:59:57 +0100 Subject: [PATCH 08/12] Review Franz Modifications: * `KafkaClient.consumerPoll`: remove `timeout` as this method should only be used with `timeout = 0` * `KafkaClient.consumerPoll`: remove continuation as it has no benefit --- Sources/SwiftKafka/KafkaClient.swift | 41 ++++++++++---------------- Sources/SwiftKafka/KafkaConsumer.swift | 2 +- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index 8a2fed56..564de191 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -58,39 +58,28 @@ final class KafkaClient { /// Request a new message from the Kafka cluster. /// - /// This method blocks for a maximum of `timeout` milliseconds. - /// /// - Important: This method should only be invoked from ``KafkaConsumer``. /// - /// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message. /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. - func consumerPoll(timeout: Int32) async throws -> KafkaConsumerMessage? { - try await withCheckedThrowingContinuation { continuation in - guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, timeout) else { - // No error, there might be no more messages - continuation.resume(returning: nil) - return - } - - defer { - // Destroy message otherwise poll() will block forever - rd_kafka_message_destroy(messagePointer) - } + func consumerPoll() throws -> KafkaConsumerMessage? { + guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else { + // No error, there might be no more messages + return nil + } - // Reached the end of the topic+partition queue on the broker - if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF { - continuation.resume(returning: nil) - return - } + defer { + // Destroy message otherwise poll() will block forever + rd_kafka_message_destroy(messagePointer) + } - do { - let message = try KafkaConsumerMessage(messagePointer: messagePointer) - continuation.resume(returning: message) - } catch { - continuation.resume(throwing: error) - } + // Reached the end of the topic+partition queue on the broker + if messagePointer.pointee.err == RD_KAFKA_RESP_ERR__PARTITION_EOF { + return nil } + + let message = try KafkaConsumerMessage(messagePointer: messagePointer) + return message } /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index ae1742c9..b0f8a46d 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -235,7 +235,7 @@ public final class KafkaConsumer { case .pollForAndYieldMessage(let pollInterval, let client, let source, let logger): let messageResult: Result do { - guard let message = try await client.consumerPoll(timeout: 0) else { + guard let message = try client.consumerPoll() else { break } messageResult = .success(message) From b20ef1593e7c026862104ea557ab3a9852f78d2f Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 26 Jun 2023 15:30:13 +0100 Subject: [PATCH 09/12] Review Franz Modifications: * make `KafkaConsumer.shutdownGracefully()` `private` * `KafkaConsumer`: create `librdkafka` wrapping methods in `KafkaClient` instead of using `KafkaClient.withKafkaHandlePointer` * `KafkaConsumer`: rename `.killPollLoop` -> `.terminatePollLoop` * `KafkaConsumer.StateMachine`: move `logger` out of `State` * `KafkaConsumer`: move `pollInterval` out of `StateMachine` --- Sources/SwiftKafka/KafkaClient.swift | 44 +++++++ Sources/SwiftKafka/KafkaConsumer.swift | 152 ++++++++++--------------- 2 files changed, 103 insertions(+), 93 deletions(-) diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index 564de191..fe28a3ee 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -15,6 +15,8 @@ import Crdkafka import Logging +// TODO: move to RD namespace + rename? + /// Base class for ``KafkaProducer`` and ``KafkaConsumer``, /// which is used to handle the connection to the Kafka ecosystem. final class KafkaClient { @@ -56,6 +58,20 @@ final class KafkaClient { return rd_kafka_poll(self.kafkaHandle, timeout) } + /// Redirect the main ``KafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s + /// queue (``KafkaClient/consumerPoll``). + /// + /// Events that would be triggered by ``KafkaClient/poll(timeout:)`` + /// are now triggered by ``KafkaClient/consumerPoll``. + /// + /// - Warning: It is not allowed to call ``KafkaClient/poll(timeout:)`` after ``KafkaClient/pollSetConsumer``. + func pollSetConsumer() throws { + let result = rd_kafka_poll_set_consumer(self.kafkaHandle) + if result != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: result) + } + } + /// Request a new message from the Kafka cluster. /// /// - Important: This method should only be invoked from ``KafkaConsumer``. @@ -82,6 +98,34 @@ final class KafkaClient { return message } + // TODO: subscribed topics pointer live inside of client? + + /// Subscribe to topic set using balanced consumer groups. + /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. + func subscribe(subscribedTopicsPointer: UnsafeMutablePointer) throws { + let result = rd_kafka_subscribe(self.kafkaHandle, subscribedTopicsPointer) + if result != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: result) + } + } + + /// Atomic assignment of partitions to consume. + /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. + func assign(subscribedTopicsPointer: UnsafeMutablePointer) throws { + let result = rd_kafka_assign(self.kafkaHandle, subscribedTopicsPointer) + if result != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: result) + } + } + + /// Close the consumer. + func consumerClose() throws { + let result = rd_kafka_consumer_close(self.kafkaHandle) + if result != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: result) + } + } + /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. /// - Warning: Do not escape the pointer from the closure for later use. /// - Parameter body: The closure will use the Kafka handle pointer. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index b0f8a46d..be471629 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -43,20 +43,22 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { // Duplicate of _shutdownGracefully let action = self.stateMachine.withLockedValue { $0.finish() } switch action { - case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer, let logger): + case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer): source.finish() - let result = client.withKafkaHandlePointer { handle in - rd_kafka_consumer_close(handle) + do { + try client.consumerClose() + } catch { + self.stateMachine.withLockedValue { + if let error = error as? KafkaError { + $0.logger.error("Closing KafkaConsumer failed: \(error.description)") + } else { + $0.logger.error("Caught unknown error: \(error)") + } + } } rd_kafka_topic_partition_list_destroy(subscribedTopicsPointer) - - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - let error = KafkaError.rdKafkaError(wrapping: result) - logger.error("Closing KafkaConsumer failed: \(error.description)") - return - } case .none: return } @@ -94,8 +96,12 @@ public final class KafkaConsumer { NoBackPressure, ShutdownOnTerminate > + /// Time between two consecutive polls. + private var pollInterval: Duration /// The configuration object of the consumer client. private var config: KafkaConsumerConfiguration + /// A logger. + private let logger: Logger /// State of the `KafkaConsumer`. private let stateMachine: NIOLockedValueBox @@ -113,7 +119,9 @@ public final class KafkaConsumer { config: KafkaConsumerConfiguration, logger: Logger ) throws { + self.pollInterval = pollInterval self.config = config + self.logger = logger let client = try RDKafka.createClient(type: .consumer, configDictionary: config.dictionary, logger: logger) @@ -121,7 +129,7 @@ public final class KafkaConsumer { throw KafkaError.client(reason: "Failed to allocate Topic+Partition list.") } - self.stateMachine = NIOLockedValueBox(StateMachine()) + self.stateMachine = NIOLockedValueBox(StateMachine(logger: self.logger)) let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( elementType: Result.self, @@ -135,22 +143,15 @@ public final class KafkaConsumer { self.stateMachine.withLockedValue { $0.initialize( - pollInterval: pollInterval, client: client, source: sourceAndSequence.source, - subscribedTopicsPointer: subscribedTopicsPointer, - logger: logger + subscribedTopicsPointer: subscribedTopicsPointer ) } - // Events that would be triggered by rd_kafka_poll - // will now be also triggered by rd_kafka_consumer_poll - let result = client.withKafkaHandlePointer { handle in - rd_kafka_poll_set_consumer(handle) - } - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: result) - } + // Events that would be triggered by ``KafkaClient/poll(timeout:)`` + // are now triggered by ``KafkaClient/consumerPoll``. + try client.pollSetConsumer() switch config.consumptionStrategy._internal { case .partition(topic: let topic, partition: let partition, offset: let offset): @@ -161,7 +162,6 @@ public final class KafkaConsumer { } deinit { - // This occurs e.g. when for await loop is exited through break self.shutdownGracefully() } @@ -180,14 +180,7 @@ public final class KafkaConsumer { KafkaPartition.unassigned.rawValue ) } - - let result = client.withKafkaHandlePointer { handle in - rd_kafka_subscribe(handle, subscribedTopicsPointer) - } - - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: result) - } + try client.subscribe(subscribedTopicsPointer: subscribedTopicsPointer) } } @@ -211,16 +204,8 @@ public final class KafkaConsumer { ) else { fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") } - partitionPointer.pointee.offset = Int64(offset) - - let result = client.withKafkaHandlePointer { handle in - rd_kafka_assign(handle, subscribedTopicsPointer) - } - - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: result) - } + try client.assign(subscribedTopicsPointer: subscribedTopicsPointer) } } @@ -232,7 +217,7 @@ public final class KafkaConsumer { while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } switch nextAction { - case .pollForAndYieldMessage(let pollInterval, let client, let source, let logger): + case .pollForAndYieldMessage(let client, let source): let messageResult: Result do { guard let message = try client.consumerPoll() else { @@ -242,13 +227,13 @@ public final class KafkaConsumer { } catch let kafkaError as KafkaError { messageResult = .failure(kafkaError) } catch { - logger.error("KafkaConsumer caught error: \(error)") + self.logger.error("KafkaConsumer caught error: \(error)") break } // We support no back pressure, we can ignore the yield result _ = source.yield(messageResult) - try await Task.sleep(for: pollInterval) - case .killPollLoop: + try await Task.sleep(for: self.pollInterval) + case .terminatePollLoop: return } } @@ -300,6 +285,7 @@ public final class KafkaConsumer { changesList, 0 ) // Blocks until commiting the offset is done + // -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68 } guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { throw KafkaError.rdKafkaError(wrapping: result) @@ -311,15 +297,15 @@ public final class KafkaConsumer { /// /// - Note: Invoking this function is not always needed as the ``KafkaConsumer`` /// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended. - public func shutdownGracefully() { + private func shutdownGracefully() { let action = self.stateMachine.withLockedValue { $0.finish() } switch action { - case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer, let logger): + case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer): self._shutdownGracefullyAndFinishSource( client: client, source: source, subscribedTopicsPointer: subscribedTopicsPointer, - logger: logger + logger: self.logger ) case .none: return @@ -334,17 +320,17 @@ public final class KafkaConsumer { ) { source.finish() - let result = client.withKafkaHandlePointer { handle in - rd_kafka_consumer_close(handle) + do { + try client.consumerClose() + } catch { + if let error = error as? KafkaError { + logger.error("Closing KafkaConsumer failed: \(error.description)") + } else { + logger.error("Caught unknown error: \(error)") + } } rd_kafka_topic_partition_list_destroy(subscribedTopicsPointer) - - guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { - let error = KafkaError.rdKafkaError(wrapping: result) - logger.error("Closing KafkaConsumer failed: \(error.description)") - return - } } } @@ -353,6 +339,9 @@ public final class KafkaConsumer { extension KafkaConsumer { /// State machine representing the state of the ``KafkaConsumer``. struct StateMachine { + /// A logger. + let logger: Logger + /// The state of the ``StateMachine``. enum State { /// The state machine has been initialized with init() but is not yet Initialized @@ -361,31 +350,23 @@ extension KafkaConsumer { /// We are in the process of initializing the ``KafkaConsumer``, /// though ``subscribe()`` / ``assign()`` have not been invoked. /// - /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - /// - Parameter logger: A logger. case initializing( - pollInterval: Duration, client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer, - logger: Logger + subscribedTopicsPointer: UnsafeMutablePointer ) /// The ``KafkaConsumer`` is consuming messages. /// - /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - /// - Parameter logger: A logger. case consuming( - pollInterval: Duration, client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer, - logger: Logger + subscribedTopicsPointer: UnsafeMutablePointer ) /// The ``KafkaConsumer`` has been closed. case finished @@ -397,21 +378,17 @@ extension KafkaConsumer { /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are /// not yet available when the normal initialization occurs. mutating func initialize( - pollInterval: Duration, client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer, - logger: Logger + subscribedTopicsPointer: UnsafeMutablePointer ) { guard case .uninitialized = self.state else { fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") } self.state = .initializing( - pollInterval: pollInterval, client: client, source: source, - subscribedTopicsPointer: subscribedTopicsPointer, - logger: logger + subscribedTopicsPointer: subscribedTopicsPointer ) } @@ -419,18 +396,14 @@ extension KafkaConsumer { enum PollLoopAction { /// Poll for a new ``KafkaConsumerMessage``. /// - /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - /// - Parameter logger: A logger. case pollForAndYieldMessage( - pollInterval: Duration, client: KafkaClient, - source: Producer.Source, - logger: Logger + source: Producer.Source ) - /// Kill the poll loop. - case killPollLoop + /// Terminate the poll loop. + case terminatePollLoop } /// Returns the next action to be taken when wanting to poll. @@ -443,10 +416,10 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let pollInterval, let client, let source, _, let logger): - return .pollForAndYieldMessage(pollInterval: pollInterval, client: client, source: source, logger: logger) + case .consuming(let client, let source, _): + return .pollForAndYieldMessage(client: client, source: source) case .finished: - return .killPollLoop + return .terminatePollLoop } } @@ -460,24 +433,20 @@ extension KafkaConsumer { /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. /// - /// - Parameter pollInterval: Amount of time between two subsequent polls invocations. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - /// - Parameter logger: A logger. /// /// - Returns: The action to be taken. mutating func setUpConnection() -> SetUpConnectionAction { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let pollInterval, let client, let source, let subscribedTopicsPointer, let logger): + case .initializing(let client, let source, let subscribedTopicsPointer): self.state = .consuming( - pollInterval: pollInterval, client: client, source: source, - subscribedTopicsPointer: subscribedTopicsPointer, - logger: logger + subscribedTopicsPointer: subscribedTopicsPointer ) return .setUpConnection(client: client, subscribedTopicsPointer: subscribedTopicsPointer) case .consuming, .finished: @@ -507,7 +476,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consuming(_, let client, _, _, _): + case .consuming(let client, _, _): return .commitSync(client: client) case .finished: return .throwClosedError @@ -521,12 +490,10 @@ extension KafkaConsumer { /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - /// - Parameter logger: A logger. case shutdownGracefullyAndFinishSource( client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer, - logger: Logger + subscribedTopicsPointer: UnsafeMutablePointer ) } @@ -540,13 +507,12 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(_, let client, let source, let subscribedTopicsPointer, let logger): + case .consuming(let client, let source, let subscribedTopicsPointer): self.state = .finished return .shutdownGracefullyAndFinishSource( client: client, source: source, - subscribedTopicsPointer: subscribedTopicsPointer, - logger: logger + subscribedTopicsPointer: subscribedTopicsPointer ) case .finished: return nil From 4b2dce6241340092f60e63a182fd5f25fc80d119 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 26 Jun 2023 17:54:28 +0100 Subject: [PATCH 10/12] Wrapper type for rd_kafka_topic_partition_list_t Modifications: * create new class `RDKafkaTopicPartitionList` wrapping a `rd_kafka_topic_partition_list_t` --- Sources/SwiftKafka/KafkaClient.swift | 24 +++--- Sources/SwiftKafka/KafkaConsumer.swift | 81 +++++++------------ .../RDKafka/RDKafkaTopicPartitionList.swift | 60 ++++++++++++++ 3 files changed, 101 insertions(+), 64 deletions(-) create mode 100644 Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index fe28a3ee..d5b09033 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -15,8 +15,6 @@ import Crdkafka import Logging -// TODO: move to RD namespace + rename? - /// Base class for ``KafkaProducer`` and ``KafkaConsumer``, /// which is used to handle the connection to the Kafka ecosystem. final class KafkaClient { @@ -98,23 +96,25 @@ final class KafkaClient { return message } - // TODO: subscribed topics pointer live inside of client? - /// Subscribe to topic set using balanced consumer groups. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - func subscribe(subscribedTopicsPointer: UnsafeMutablePointer) throws { - let result = rd_kafka_subscribe(self.kafkaHandle, subscribedTopicsPointer) - if result != RD_KAFKA_RESP_ERR_NO_ERROR { - throw KafkaError.rdKafkaError(wrapping: result) + func subscribe(subscribedTopicsPointer: RDKafkaTopicPartitionList) throws { + try subscribedTopicsPointer.withListPointer { pointer in + let result = rd_kafka_subscribe(self.kafkaHandle, pointer) + if result != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: result) + } } } /// Atomic assignment of partitions to consume. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - func assign(subscribedTopicsPointer: UnsafeMutablePointer) throws { - let result = rd_kafka_assign(self.kafkaHandle, subscribedTopicsPointer) - if result != RD_KAFKA_RESP_ERR_NO_ERROR { - throw KafkaError.rdKafkaError(wrapping: result) + func assign(subscribedTopicsPointer: RDKafkaTopicPartitionList) throws { + try subscribedTopicsPointer.withListPointer { pointer in + let result = rd_kafka_assign(self.kafkaHandle, pointer) + if result != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: result) + } } } diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index be471629..4ac07911 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -43,7 +43,7 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { // Duplicate of _shutdownGracefully let action = self.stateMachine.withLockedValue { $0.finish() } switch action { - case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer): + case .shutdownGracefullyAndFinishSource(let client, let source): source.finish() do { @@ -57,8 +57,6 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { } } } - - rd_kafka_topic_partition_list_destroy(subscribedTopicsPointer) case .none: return } @@ -125,10 +123,6 @@ public final class KafkaConsumer { let client = try RDKafka.createClient(type: .consumer, configDictionary: config.dictionary, logger: logger) - guard let subscribedTopicsPointer = rd_kafka_topic_partition_list_new(1) else { - throw KafkaError.client(reason: "Failed to allocate Topic+Partition list.") - } - self.stateMachine = NIOLockedValueBox(StateMachine(logger: self.logger)) let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( @@ -145,7 +139,7 @@ public final class KafkaConsumer { $0.initialize( client: client, source: sourceAndSequence.source, - subscribedTopicsPointer: subscribedTopicsPointer + subscribedTopicsPointer: RDKafkaTopicPartitionList() ) } @@ -174,10 +168,9 @@ public final class KafkaConsumer { switch action { case .setUpConnection(let client, let subscribedTopicsPointer): for topic in topics { - rd_kafka_topic_partition_list_add( - subscribedTopicsPointer, - topic, - KafkaPartition.unassigned.rawValue + subscribedTopicsPointer.add( + topic: topic, + partition: KafkaPartition.unassigned ) } try client.subscribe(subscribedTopicsPointer: subscribedTopicsPointer) @@ -197,14 +190,7 @@ public final class KafkaConsumer { let action = self.stateMachine.withLockedValue { $0.setUpConnection() } switch action { case .setUpConnection(let client, let subscribedTopicsPointer): - guard let partitionPointer = rd_kafka_topic_partition_list_add( - subscribedTopicsPointer, - topic, - partition.rawValue - ) else { - fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") - } - partitionPointer.pointee.offset = Int64(offset) + subscribedTopicsPointer.setOffset(topic: topic, partition: partition, offset: Int64(offset)) try client.assign(subscribedTopicsPointer: subscribedTopicsPointer) } } @@ -265,27 +251,25 @@ public final class KafkaConsumer { throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false") } - let changesList = rd_kafka_topic_partition_list_new(1) - defer { rd_kafka_topic_partition_list_destroy(changesList) } - guard let partitionPointer = rd_kafka_topic_partition_list_add( - changesList, - message.topic, - message.partition.rawValue - ) else { - fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") - } - // The offset committed is always the offset of the next requested message. // Thus, we increase the offset of the current message by one before committing it. // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 - partitionPointer.pointee.offset = Int64(message.offset + 1) + let changesList = RDKafkaTopicPartitionList() + changesList.setOffset( + topic: message.topic, + partition: message.partition, + offset: Int64(message.offset + 1) + ) + let result = client.withKafkaHandlePointer { handle in - rd_kafka_commit( - handle, - changesList, - 0 - ) // Blocks until commiting the offset is done - // -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68 + changesList.withListPointer { listPointer in + rd_kafka_commit( + handle, + listPointer, + 0 + ) // Blocks until commiting the offset is done + // -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68 + } } guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { throw KafkaError.rdKafkaError(wrapping: result) @@ -300,11 +284,10 @@ public final class KafkaConsumer { private func shutdownGracefully() { let action = self.stateMachine.withLockedValue { $0.finish() } switch action { - case .shutdownGracefullyAndFinishSource(let client, let source, let subscribedTopicsPointer): + case .shutdownGracefullyAndFinishSource(let client, let source): self._shutdownGracefullyAndFinishSource( client: client, source: source, - subscribedTopicsPointer: subscribedTopicsPointer, logger: self.logger ) case .none: @@ -315,7 +298,6 @@ public final class KafkaConsumer { private func _shutdownGracefullyAndFinishSource( client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer, logger: Logger ) { source.finish() @@ -329,8 +311,6 @@ public final class KafkaConsumer { logger.error("Caught unknown error: \(error)") } } - - rd_kafka_topic_partition_list_destroy(subscribedTopicsPointer) } } @@ -356,7 +336,7 @@ extension KafkaConsumer { case initializing( client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer + subscribedTopicsPointer: RDKafkaTopicPartitionList ) /// The ``KafkaConsumer`` is consuming messages. /// @@ -366,7 +346,7 @@ extension KafkaConsumer { case consuming( client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer + subscribedTopicsPointer: RDKafkaTopicPartitionList ) /// The ``KafkaConsumer`` has been closed. case finished @@ -380,7 +360,7 @@ extension KafkaConsumer { mutating func initialize( client: KafkaClient, source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer + subscribedTopicsPointer: RDKafkaTopicPartitionList ) { guard case .uninitialized = self.state else { fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") @@ -428,7 +408,7 @@ extension KafkaConsumer { /// Set up the connection through ``subscribe()`` or ``assign()``. /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - case setUpConnection(client: KafkaClient, subscribedTopicsPointer: UnsafeMutablePointer) + case setUpConnection(client: KafkaClient, subscribedTopicsPointer: RDKafkaTopicPartitionList) } /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. @@ -489,11 +469,9 @@ extension KafkaConsumer { /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. case shutdownGracefullyAndFinishSource( client: KafkaClient, - source: Producer.Source, - subscribedTopicsPointer: UnsafeMutablePointer + source: Producer.Source ) } @@ -507,12 +485,11 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(let client, let source, let subscribedTopicsPointer): + case .consuming(let client, let source, _): self.state = .finished return .shutdownGracefullyAndFinishSource( client: client, - source: source, - subscribedTopicsPointer: subscribedTopicsPointer + source: source ) case .finished: return nil diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift new file mode 100644 index 00000000..ede8c3e2 --- /dev/null +++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift @@ -0,0 +1,60 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka + +/// Swift wrapper type for `rd_kafka_topic_partition_list_t`. +class RDKafkaTopicPartitionList { + private let _internal: UnsafeMutablePointer + + /// Create a new topic+partition list. + /// + /// - Parameter size: Initial allocated size used when the number of allocated elements can be estimated. + init(size: Int32 = 1) { + self._internal = rd_kafka_topic_partition_list_new(size) + } + + deinit { + rd_kafka_topic_partition_list_destroy(self._internal) + } + + /// Add topic+partition pair to list. + func add(topic: String, partition: KafkaPartition) { + rd_kafka_topic_partition_list_add( + self._internal, + topic, + partition.rawValue + ) + } + + /// Manually set read offset for a given topic+partition pair. + func setOffset(topic: String, partition: KafkaPartition, offset: Int64) { + guard let partitionPointer = rd_kafka_topic_partition_list_add( + self._internal, + topic, + partition.rawValue + ) else { + fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") + } + partitionPointer.pointee.offset = offset + } + + /// Scoped accessor that enables safe access to the pointer of the underlying `rd_kafka_topic_partition_t`. + /// - Warning: Do not escape the pointer from the closure for later use. + /// - Parameter body: The closure will use the pointer. + @discardableResult + func withListPointer(_ body: (UnsafeMutablePointer) throws -> T) rethrows -> T { + return try body(self._internal) + } +} From a7ba78afc84c6a2b5f18caf971f66c230ef05f57 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Tue, 27 Jun 2023 14:23:10 +0100 Subject: [PATCH 11/12] KafkaConsumer: Remove unneccesary references to subscribedTopicsPointer --- Sources/SwiftKafka/KafkaClient.swift | 12 +++--- Sources/SwiftKafka/KafkaConsumer.swift | 51 ++++++++++---------------- 2 files changed, 26 insertions(+), 37 deletions(-) diff --git a/Sources/SwiftKafka/KafkaClient.swift b/Sources/SwiftKafka/KafkaClient.swift index d5b09033..ff1b84d5 100644 --- a/Sources/SwiftKafka/KafkaClient.swift +++ b/Sources/SwiftKafka/KafkaClient.swift @@ -97,9 +97,9 @@ final class KafkaClient { } /// Subscribe to topic set using balanced consumer groups. - /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - func subscribe(subscribedTopicsPointer: RDKafkaTopicPartitionList) throws { - try subscribedTopicsPointer.withListPointer { pointer in + /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs. + func subscribe(topicPartitionList: RDKafkaTopicPartitionList) throws { + try topicPartitionList.withListPointer { pointer in let result = rd_kafka_subscribe(self.kafkaHandle, pointer) if result != RD_KAFKA_RESP_ERR_NO_ERROR { throw KafkaError.rdKafkaError(wrapping: result) @@ -108,9 +108,9 @@ final class KafkaClient { } /// Atomic assignment of partitions to consume. - /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - func assign(subscribedTopicsPointer: RDKafkaTopicPartitionList) throws { - try subscribedTopicsPointer.withListPointer { pointer in + /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs. + func assign(topicPartitionList: RDKafkaTopicPartitionList) throws { + try topicPartitionList.withListPointer { pointer in let result = rd_kafka_assign(self.kafkaHandle, pointer) if result != RD_KAFKA_RESP_ERR_NO_ERROR { throw KafkaError.rdKafkaError(wrapping: result) diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index 4ac07911..9b6abed5 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -138,8 +138,7 @@ public final class KafkaConsumer { self.stateMachine.withLockedValue { $0.initialize( client: client, - source: sourceAndSequence.source, - subscribedTopicsPointer: RDKafkaTopicPartitionList() + source: sourceAndSequence.source ) } @@ -166,14 +165,15 @@ public final class KafkaConsumer { private func subscribe(topics: [String]) throws { let action = self.stateMachine.withLockedValue { $0.setUpConnection() } switch action { - case .setUpConnection(let client, let subscribedTopicsPointer): + case .setUpConnection(let client): + let subscription = RDKafkaTopicPartitionList() for topic in topics { - subscribedTopicsPointer.add( + subscription.add( topic: topic, partition: KafkaPartition.unassigned ) } - try client.subscribe(subscribedTopicsPointer: subscribedTopicsPointer) + try client.subscribe(topicPartitionList: subscription) } } @@ -189,9 +189,10 @@ public final class KafkaConsumer { ) throws { let action = self.stateMachine.withLockedValue { $0.setUpConnection() } switch action { - case .setUpConnection(let client, let subscribedTopicsPointer): - subscribedTopicsPointer.setOffset(topic: topic, partition: partition, offset: Int64(offset)) - try client.assign(subscribedTopicsPointer: subscribedTopicsPointer) + case .setUpConnection(let client): + let assignment = RDKafkaTopicPartitionList() + assignment.setOffset(topic: topic, partition: partition, offset: Int64(offset)) + try client.assign(topicPartitionList: assignment) } } @@ -332,21 +333,17 @@ extension KafkaConsumer { /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. case initializing( client: KafkaClient, - source: Producer.Source, - subscribedTopicsPointer: RDKafkaTopicPartitionList + source: Producer.Source ) /// The ``KafkaConsumer`` is consuming messages. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. case consuming( client: KafkaClient, - source: Producer.Source, - subscribedTopicsPointer: RDKafkaTopicPartitionList + source: Producer.Source ) /// The ``KafkaConsumer`` has been closed. case finished @@ -359,16 +356,14 @@ extension KafkaConsumer { /// not yet available when the normal initialization occurs. mutating func initialize( client: KafkaClient, - source: Producer.Source, - subscribedTopicsPointer: RDKafkaTopicPartitionList + source: Producer.Source ) { guard case .uninitialized = self.state else { fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") } self.state = .initializing( client: client, - source: source, - subscribedTopicsPointer: subscribedTopicsPointer + source: source ) } @@ -396,7 +391,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let client, let source, _): + case .consuming(let client, let source): return .pollForAndYieldMessage(client: client, source: source) case .finished: return .terminatePollLoop @@ -407,28 +402,22 @@ extension KafkaConsumer { enum SetUpConnectionAction { /// Set up the connection through ``subscribe()`` or ``assign()``. /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - case setUpConnection(client: KafkaClient, subscribedTopicsPointer: RDKafkaTopicPartitionList) + case setUpConnection(client: KafkaClient) } /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. - /// /// - Returns: The action to be taken. mutating func setUpConnection() -> SetUpConnectionAction { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let client, let source, let subscribedTopicsPointer): + case .initializing(let client, let source): self.state = .consuming( client: client, - source: source, - subscribedTopicsPointer: subscribedTopicsPointer + source: source ) - return .setUpConnection(client: client, subscribedTopicsPointer: subscribedTopicsPointer) + return .setUpConnection(client: client) case .consuming, .finished: fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer") } @@ -456,7 +445,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") - case .consuming(let client, _, _): + case .consuming(let client, _): return .commitSync(client: client) case .finished: return .throwClosedError @@ -485,7 +474,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(let client, let source, _): + case .consuming(let client, let source): self.state = .finished return .shutdownGracefullyAndFinishSource( client: client, From 5b268e758293ed76dbe2ed2c1a31b5aa93590dbe Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Tue, 27 Jun 2023 15:27:41 +0100 Subject: [PATCH 12/12] Review Franz Modifications: * make `RDKafkaTopicPartitionList` a `final class` * make `KafkaConsumer.messages` `AsyncSequence` return `KafkaConsumerMessage` instead of `Result<,>` type * `KafkaConsumer` fails immediately when `rd_kafka_message_t` with error payload is received * move `KafkaConsumer.pollInterval` to `KafkaConsumerConfiguration` * move `KafkaProducer.pollInterval` to `KafkaProducerConfiguration` * update `README` --- README.md | 30 +++++-------------- .../KafkaConsumerConfiguration.swift | 6 ++++ .../KafkaProducerConfiguration.swift | 11 +++++++ Sources/SwiftKafka/KafkaConsumer.swift | 25 +++++----------- Sources/SwiftKafka/KafkaProducer.swift | 18 +++++++---- .../RDKafka/RDKafkaTopicPartitionList.swift | 2 +- Tests/IntegrationTests/SwiftKafkaTests.swift | 6 ++-- 7 files changed, 49 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index 0a5dd953..900c1322 100644 --- a/README.md +++ b/README.md @@ -69,13 +69,8 @@ await withThrowingTaskGroup(of: Void.self) { group in // Task receiving messages group.addTask { - for await messageResult in consumer.messages { - switch messageResult { - case .success(let message): - // Do something with message - case .failure(let error): - // Handle error - } + for await message in consumer.messages { + // Do something with message } } } @@ -105,13 +100,8 @@ await withThrowingTaskGroup(of: Void.self) { group in // Task receiving messages group.addTask { - for await messageResult in consumer.messages { - switch messageResult { - case .success(let message): - // Do something with message - case .failure(let error): - // Handle error - } + for await message in consumer.messages { + // Do something with message } } } @@ -142,14 +132,10 @@ await withThrowingTaskGroup(of: Void.self) { group in // Task receiving messages group.addTask { - for await messageResult in consumer.messages { - switch messageResult { - case .success(let message): - // Do something with message - try await consumer.commitSync(message) - case .failure(let error): - // Handle error - } + for await message in consumer.messages { + // Do something with message + // ... + try await consumer.commitSync(message) } } } diff --git a/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift index 1ca33e64..535d8a42 100644 --- a/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift @@ -18,6 +18,10 @@ import struct Foundation.UUID public struct KafkaConsumerConfiguration: Hashable { // MARK: - SwiftKafka-specific Config properties + /// The time between two consecutive polls. + /// Effectively controls the rate at which incoming events and messages are consumed. + public var pollInterval: Duration + // This backs the consumptionStrategy computed property. private var _consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy @@ -329,6 +333,7 @@ public struct KafkaConsumerConfiguration: Hashable { } public init( + pollInterval: Duration = .milliseconds(100), consumptionStrategy: KafkaSharedConfiguration.ConsumptionStrategy, sessionTimeoutMs: UInt = 45000, heartbeatIntervalMs: UInt = 3000, @@ -374,6 +379,7 @@ public struct KafkaConsumerConfiguration: Hashable { saslUsername: String? = nil, saslPassword: String? = nil ) { + self.pollInterval = pollInterval self._consumptionStrategy = consumptionStrategy self.consumptionStrategy = consumptionStrategy // used to invoke set { } method diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift index 53afca9f..9ff1c3f1 100644 --- a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift @@ -13,6 +13,14 @@ //===----------------------------------------------------------------------===// public struct KafkaProducerConfiguration: Hashable { + // MARK: - SwiftKafka-specific Config properties + + /// The time between two consecutive polls. + /// Effectively controls the rate at which incoming events and acknowledgements are consumed. + public var pollInterval: Duration + + // MARK: - librdkafka Config properties + var dictionary: [String: String] = [:] // MARK: - Producer-specific Config Properties @@ -296,6 +304,7 @@ public struct KafkaProducerConfiguration: Hashable { } public init( + pollInterval: Duration = .milliseconds(100), transactionalID: String = "", transactionalTimeoutMs: UInt = 60000, enableIdempotence: Bool = false, @@ -340,6 +349,8 @@ public struct KafkaProducerConfiguration: Hashable { saslUsername: String? = nil, saslPassword: String? = nil ) { + self.pollInterval = pollInterval + self.transactionalID = transactionalID self.transactionTimeoutMs = transactionalTimeoutMs self.enableIdempotence = enableIdempotence diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index 9b6abed5..ca908380 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -67,7 +67,7 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate { /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct KafkaConsumerMessages: AsyncSequence { - public typealias Element = Result + public typealias Element = KafkaConsumerMessage typealias WrappedSequence = NIOAsyncSequenceProducer let wrappedSequence: WrappedSequence @@ -90,12 +90,10 @@ public struct KafkaConsumerMessages: AsyncSequence { /// Receive messages from the Kafka cluster. public final class KafkaConsumer { typealias Producer = NIOAsyncSequenceProducer< - Result, + KafkaConsumerMessage, NoBackPressure, ShutdownOnTerminate > - /// Time between two consecutive polls. - private var pollInterval: Duration /// The configuration object of the consumer client. private var config: KafkaConsumerConfiguration /// A logger. @@ -113,11 +111,9 @@ public final class KafkaConsumer { /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if the initialization failed. public init( - pollInterval: Duration = .milliseconds(100), // TODO(felix): poll intervals through config in separate PR config: KafkaConsumerConfiguration, logger: Logger ) throws { - self.pollInterval = pollInterval self.config = config self.logger = logger @@ -126,7 +122,7 @@ public final class KafkaConsumer { self.stateMachine = NIOLockedValueBox(StateMachine(logger: self.logger)) let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( - elementType: Result.self, + elementType: KafkaConsumerMessage.self, backPressureStrategy: NoBackPressure(), delegate: ShutdownOnTerminate(stateMachine: self.stateMachine) ) @@ -200,26 +196,21 @@ public final class KafkaConsumer { /// /// - Returns: An awaitable task representing the execution of the poll loop. public func run() async throws { - // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle) while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } switch nextAction { case .pollForAndYieldMessage(let client, let source): - let messageResult: Result do { guard let message = try client.consumerPoll() else { break } - messageResult = .success(message) - } catch let kafkaError as KafkaError { - messageResult = .failure(kafkaError) + // We do not support back pressure, we can ignore the yield result + _ = source.yield(message) } catch { - self.logger.error("KafkaConsumer caught error: \(error)") - break + source.finish() + throw error } - // We support no back pressure, we can ignore the yield result - _ = source.yield(messageResult) - try await Task.sleep(for: self.pollInterval) + try await Task.sleep(for: self.config.pollInterval) case .terminatePollLoop: return } diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 7923fd32..f34f667f 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -57,6 +57,8 @@ public actor KafkaProducer { /// Counter that is used to assign each message a unique ID. /// Every time a new message is sent to the Kafka cluster, the counter is increased by one. private var messageIDCounter: UInt = 0 + /// The configuration object of the producer client. + private var config: KafkaProducerConfiguration /// The ``TopicConfiguration`` used for newly created topics. private let topicConfig: KafkaTopicConfiguration /// A logger. @@ -69,16 +71,20 @@ public actor KafkaProducer { // Private initializer, use factory methods to create KafkaProducer /// Initialize a new ``KafkaProducer``. + /// + /// - Parameter client: The ``KafkaClient`` instance associated with the ``KafkaProducer``. /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if initializing the producer failed. private init( client: KafkaClient, + config: KafkaProducerConfiguration, topicConfig: KafkaTopicConfiguration, logger: Logger ) async throws { self.client = client + self.config = config self.topicConfig = topicConfig self.topicHandles = [:] self.logger = logger @@ -89,8 +95,8 @@ public actor KafkaProducer { /// /// This factory method creates a producer without message acknowledgements. /// - /// - Parameter configuration: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. - /// - Parameter topicConfiguration: The ``KafkaTopicConfiguration`` used for newly created topics. + /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. /// - Parameter logger: A logger. /// - Returns: The newly created ``KafkaProducer``. /// - Throws: A ``KafkaError`` if initializing the producer failed. @@ -110,6 +116,7 @@ public actor KafkaProducer { let producer = try await KafkaProducer( client: client, + config: config, topicConfig: topicConfig, logger: logger ) @@ -156,6 +163,7 @@ public actor KafkaProducer { let producer = try await KafkaProducer( client: client, + config: config, topicConfig: topicConfig, logger: logger ) @@ -203,13 +211,11 @@ public actor KafkaProducer { /// Start polling Kafka for acknowledged messages. /// - /// - Parameter pollInterval: The desired time interval between two consecutive polls. /// - Returns: An awaitable task representing the execution of the poll loop. - public func run(pollInterval: Duration = .milliseconds(100)) async throws { - // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle) + public func run() async throws { while self.state == .started { self.client.poll(timeout: 0) - try await Task.sleep(for: pollInterval) + try await Task.sleep(for: self.config.pollInterval) } } diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift index ede8c3e2..b1a5a813 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicPartitionList.swift @@ -15,7 +15,7 @@ import Crdkafka /// Swift wrapper type for `rd_kafka_topic_partition_list_t`. -class RDKafkaTopicPartitionList { +final class RDKafkaTopicPartitionList { private let _internal: UnsafeMutablePointer /// Create a new topic+partition list. diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index aee3b303..07e7f58f 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -113,7 +113,7 @@ final class SwiftKafkaTests: XCTestCase { group.addTask { var consumedMessages = [KafkaConsumerMessage]() for await messageResult in consumer.messages { - guard case .success(let message) = messageResult else { + guard case let message = messageResult else { continue } consumedMessages.append(message) @@ -179,7 +179,7 @@ final class SwiftKafkaTests: XCTestCase { group.addTask { var consumedMessages = [KafkaConsumerMessage]() for await messageResult in consumer.messages { - guard case .success(let message) = messageResult else { + guard case let message = messageResult else { continue } consumedMessages.append(message) @@ -242,7 +242,7 @@ final class SwiftKafkaTests: XCTestCase { group.addTask { var consumedMessages = [KafkaConsumerMessage]() for await messageResult in consumer.messages { - guard case .success(let message) = messageResult else { + guard case let message = messageResult else { continue } consumedMessages.append(message)