Skip to content

Commit de66afd

Browse files
committed
KafkaConsumer: change locality of message error
Motivation: If the `KafkaConsumer` receives an errorneous message, it should not throw in the `func run()` method but rather in the `KafkaConsumer.messages` `ThrowingSequence`. Modifications: * make `KafkaConsumerMessages` `AsyncSequence` wrap a sequence that yields a `Result<>` type -> `KafkaConsumerMessages` unwraps this result type and either yields or throws
1 parent d86522b commit de66afd

File tree

1 file changed

+21
-21
lines changed

1 file changed

+21
-21
lines changed

Sources/Kafka/KafkaConsumer.swift

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
6767
public typealias Element = KafkaConsumerMessage
6868
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
6969
typealias WrappedSequence = NIOThrowingAsyncSequenceProducer<
70-
Element,
70+
Result<KafkaConsumerMessage, Error>,
7171
Error,
7272
BackPressureStrategy,
7373
KafkaConsumerCloseOnTerminate
@@ -80,22 +80,28 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
8080
var wrappedIterator: WrappedSequence.AsyncIterator?
8181

8282
public mutating func next() async throws -> Element? {
83-
guard let element = try await self.wrappedIterator?.next() else {
83+
guard let result = try await self.wrappedIterator?.next() else {
8484
self.deallocateIterator()
8585
return nil
8686
}
8787

88-
let action = self.stateMachine.withLockedValue { $0.storeOffset() }
89-
switch action {
90-
case .storeOffset(let client):
91-
do {
92-
try client.storeMessageOffset(element)
93-
} catch {
94-
self.deallocateIterator()
95-
throw error
88+
switch result {
89+
case .success(let message):
90+
let action = self.stateMachine.withLockedValue { $0.storeOffset() }
91+
switch action {
92+
case .storeOffset(let client):
93+
do {
94+
try client.storeMessageOffset(message)
95+
} catch {
96+
self.deallocateIterator()
97+
throw error
98+
}
9699
}
100+
return message
101+
case .failure(let error):
102+
self.deallocateIterator()
103+
throw error
97104
}
98-
return element
99105
}
100106

101107
private mutating func deallocateIterator() {
@@ -116,7 +122,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
116122
/// A ``KafkaConsumer `` can be used to consume messages from a Kafka cluster.
117123
public final class KafkaConsumer: Sendable, Service {
118124
typealias Producer = NIOThrowingAsyncSequenceProducer<
119-
KafkaConsumerMessage,
125+
Result<KafkaConsumerMessage, Error>,
120126
Error,
121127
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
122128
KafkaConsumerCloseOnTerminate
@@ -153,7 +159,7 @@ public final class KafkaConsumer: Sendable, Service {
153159
self.logger = logger
154160

155161
let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
156-
elementType: KafkaConsumerMessage.self,
162+
elementType: Result<KafkaConsumerMessage, Error>.self,
157163
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
158164
delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine)
159165
)
@@ -330,14 +336,8 @@ public final class KafkaConsumer: Sendable, Service {
330336
for event in events {
331337
switch event {
332338
case .consumerMessages(let result):
333-
switch result {
334-
case .success(let message):
335-
// We do not support back pressure, we can ignore the yield result
336-
_ = source.yield(message)
337-
case .failure(let error):
338-
source.finish()
339-
throw error
340-
}
339+
// We do not support back pressure, we can ignore the yield result
340+
_ = source.yield(result)
341341
default:
342342
break // Ignore
343343
}

0 commit comments

Comments
 (0)