From 9f90a5bcf1fb4a075db10bbf8c1721f423a9bf53 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Wed, 23 Aug 2023 22:28:35 +0100 Subject: [PATCH] `KafkaConsumer`: change locality of message error Motivation: If the `KafkaConsumer` receives an errorneous message, it should not throw in the `func run()` method but rather in the `KafkaConsumer.messages` `ThrowingSequence`. Modifications: * make `KafkaConsumerMessages` `AsyncSequence` wrap a sequence that yields a `Result<>` type -> `KafkaConsumerMessages` unwraps this result type and either yields or throws --- Sources/Kafka/KafkaConsumer.swift | 44 +++++++++++++++---------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index cf4daec9..12fe0dca 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -67,7 +67,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { public typealias Element = KafkaConsumerMessage typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure typealias WrappedSequence = NIOThrowingAsyncSequenceProducer< - Element, + Result, Error, BackPressureStrategy, KafkaConsumerCloseOnTerminate @@ -80,24 +80,30 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { var wrappedIterator: WrappedSequence.AsyncIterator? public mutating func next() async throws -> Element? { - guard let element = try await self.wrappedIterator?.next() else { + guard let result = try await self.wrappedIterator?.next() else { self.deallocateIterator() return nil } - let action = self.stateMachine.withLockedValue { $0.storeOffset() } - switch action { - case .storeOffset(let client): - do { - try client.storeMessageOffset(element) - } catch { + switch result { + case .success(let message): + let action = self.stateMachine.withLockedValue { $0.storeOffset() } + switch action { + case .storeOffset(let client): + do { + try client.storeMessageOffset(message) + } catch { + self.deallocateIterator() + throw error + } + return message + case .terminateConsumerSequence: self.deallocateIterator() - throw error + return nil } - return element - case .terminateConsumerSequence: + case .failure(let error): self.deallocateIterator() - return nil + throw error } } @@ -119,7 +125,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { /// A ``KafkaConsumer `` can be used to consume messages from a Kafka cluster. public final class KafkaConsumer: Sendable, Service { typealias Producer = NIOThrowingAsyncSequenceProducer< - KafkaConsumerMessage, + Result, Error, NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure, KafkaConsumerCloseOnTerminate @@ -156,7 +162,7 @@ public final class KafkaConsumer: Sendable, Service { self.logger = logger let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( - elementType: KafkaConsumerMessage.self, + elementType: Result.self, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine) ) @@ -333,14 +339,8 @@ public final class KafkaConsumer: Sendable, Service { for event in events { switch event { case .consumerMessages(let result): - switch result { - case .success(let message): - // We do not support back pressure, we can ignore the yield result - _ = source.yield(message) - case .failure(let error): - source.finish() - throw error - } + // We do not support back pressure, we can ignore the yield result + _ = source.yield(result) default: break // Ignore }