- 
                Notifications
    You must be signed in to change notification settings 
- Fork 30
          KafkaConsumer Refactoring
          #66
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
  
    KafkaConsumer Refactoring
  
  #66
              Conversation
b6aa400    to
    0677dcf      
    Compare
  
            
          
                Sources/SwiftKafka/KafkaClient.swift
              
                Outdated
          
        
      | /// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message. | ||
| /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. | ||
| /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. | ||
| func consumerPoll(timeout: Int32) async throws -> KafkaConsumerMessage? { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove the timeout here otherwise people might actually block. Additionally, we should remove the async from this method. I don't see a reason why we are creating a continuation here. We should be able to just call rd_kafka_consumer_poll and bridge the message into a Swift type, destroy the pointer and return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally makes sense!
Modifications: * `KafkaConsumer`: remove `NIOAsyncSequenceProducer` and use own implementation of `AsyncSequence` instead * write async wrapper method `KafkaClient.consumerPoll(timeout:)` * update memory leak tests for `KafkaConsumer` and `KafkaProducer` * update tests
Modifications: * replace `serialQueue` in `KafkaConsumer` with `StateMachine` that encapsulates all variables and can also be accessed from the `ConsumerMessagesAsyncSequence` * close `KafkaConsumer` when `for await` loop of `AsyncSequence` is exited * make `ConsumerMessagesAsyncIterator` a class-backed `struct` * make `KafkaConsumer.shutdownGracefully` `public`
Modifications: * add `run()` method to `KafkaConsumer` * rename `ConsumerMessagesAsyncSequence` to `KafkaConsumerMessages` * `KafkaConsumer`: put back `NIOAsyncSequenceProducer` * update `README` * use `Duration` type for `pollInterval`
Modifications: * `KafkaClient.consumerPoll`: remove `timeout` as this method should only be used with `timeout = 0` * `KafkaClient.consumerPoll`: remove continuation as it has no benefit
0677dcf    to
    fa9ebf4      
    Compare
  
    | /// | ||
| /// - Note: Invoking this function is not always needed as the ``KafkaConsumer`` | ||
| /// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended. | ||
| public func shutdownGracefully() { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make this method private and since we intend to use service lifecycle to do this in the run() method.
| let result = client.withKafkaHandlePointer { handle in | ||
| rd_kafka_subscribe(handle, subscribedTopicsPointer) | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For all these places where we use withKafkaHandlePointer can we just add a method on the KafkaClient instead? At best that method also already does the conversion to the correct error.
| } | ||
| messageResult = .success(message) | ||
| } catch let kafkaError as KafkaError { | ||
| messageResult = .failure(kafkaError) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we fail polling shouldn't we shutdown everything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So consumerPoll throws when there is an error with the received message. However, I think we shouldn't close the entire consumer because of, e.g. a message with a faulty payload.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I think this is pretty bad. If we get a bad message something seriously went wrong. At least that is what I expect. Can you investigate when this could happen and document it here. IMO it is weird that our sequence contains a Result<>, I would like to get rid of that.
| try await withCheckedThrowingContinuation { continuation in | ||
| do { | ||
| try self._commitSync(message) | ||
| try self._commitSync(message) // Blocks until commiting the offset is done | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that is problematic. We need to change that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| client: KafkaClient, | ||
| source: Producer.Source, | ||
| subscribedTopicsPointer: UnsafeMutablePointer<rd_kafka_topic_partition_list_t>, | ||
| logger: Logger | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not store the logger in here. We can just keep it in a separate prop since it is not state.
| pollInterval: Duration, | ||
| client: KafkaClient, | ||
| source: Producer.Source, | ||
| subscribedTopicsPointer: UnsafeMutablePointer<rd_kafka_topic_partition_list_t>, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should create an abstraction for this so we don't hold UnsafeMutablePointers all the time.
| /// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs. | ||
| /// - Parameter logger: A logger. | ||
| case initializing( | ||
| pollInterval: Duration, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't need to be in the state machine since it is a fixed configuration.
| /// Kill the poll loop. | ||
| case killPollLoop | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kill is a bit harsh. Can we call this terminate instead?
Modifications: * make `KafkaConsumer.shutdownGracefully()` `private` * `KafkaConsumer`: create `librdkafka` wrapping methods in `KafkaClient` instead of using `KafkaClient.withKafkaHandlePointer` * `KafkaConsumer`: rename `.killPollLoop` -> `.terminatePollLoop` * `KafkaConsumer.StateMachine`: move `logger` out of `State` * `KafkaConsumer`: move `pollInterval` out of `StateMachine`
Modifications: * create new class `RDKafkaTopicPartitionList` wrapping a `rd_kafka_topic_partition_list_t`
| ShutdownOnTerminate | ||
| > | ||
| /// Time between two consecutive polls. | ||
| private var pollInterval: Duration | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this not part of the KafkaConsumerConfiguration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wanted to do that in a separate PR as this should also be done for KafkaProducer, but yeah I can do it in this one already
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Separate PR is fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Too late now 😅
| } | ||
| messageResult = .success(message) | ||
| } catch let kafkaError as KafkaError { | ||
| messageResult = .failure(kafkaError) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I think this is pretty bad. If we get a bad message something seriously went wrong. At least that is what I expect. Can you investigate when this could happen and document it here. IMO it is weird that our sequence contains a Result<>, I would like to get rid of that.
| import Crdkafka | ||
|  | ||
| /// Swift wrapper type for `rd_kafka_topic_partition_list_t`. | ||
| class RDKafkaTopicPartitionList { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| class RDKafkaTopicPartitionList { | |
| final class RDKafkaTopicPartitionList { | 
Modifications: * make `RDKafkaTopicPartitionList` a `final class` * make `KafkaConsumer.messages` `AsyncSequence` return `KafkaConsumerMessage` instead of `Result<,>` type * `KafkaConsumer` fails immediately when `rd_kafka_message_t` with error payload is received * move `KafkaConsumer.pollInterval` to `KafkaConsumerConfiguration` * move `KafkaProducer.pollInterval` to `KafkaProducerConfiguration` * update `README`
Motivation
Our
KafkaConsumershould expose arun()method that serves the consumer queue and pollslibrdkafkafor new messages and any other queued callbacks. This aligns with the new implementation ofKafkaProducer.Additionally
weakreference toselfinKafkaConsumerDispatchQueueinKafkaConsumerModifications
KafkaConsumer.run()KafkaClient.consumerPoll(timeout:)serialQueueinKafkaConsumerwithStateMachinethatencapsulates all variables and can also be accessed from the
ConsumerMessagesAsyncSequenceKafkaConsumerwhenfor awaitloop ofAsyncSequenceisexited
KafkaConsumer.shutdownGracefullypublicBackPressureStrategyfromKafkaConsumerConfiguration