Description
Supersedes:
Background
There are scenarios where it would be helpful to temporarily pause processing without unsubscribing from the topic. In our specific scenario we have a health monitor that needs to be able to pause processing if external resources become unavailable and resume processing once they become healthy again.
Note: One could call pause/resume on the `KafkaConsumer`, but there is no easy way to do this in combination with PC, because once PC is running any external operation on the `KafkaConsumer` becomes unsafe. Calling pause/resume on the `KafkaConsumer` will race against other calls which PC makes on the `KafkaConsumer`, and those calls may fail at any time.
Proposal
I'd be happy to create a PR but I'd like to verify my idea first with you:
- Add two methods `pausePolling`/`resumePolling` to the `AbstractParallelEoSStreamProcessor` which delegate to two new methods `pausePolling`/`resumePolling` on the `BrokerPollSystem`.
- `pausePolling`/`resumePolling` just set an internal boolean on the `BrokerPollSystem`. There is no need to physically pause/resume the subscribed partitions on the underlying `KafkaConsumer`.
- The `controlLoop` evaluates whether or not polling is paused. If paused, it skips polling for new records and instead sleeps (e.g. for 100 ms). However, even in paused mode `maybeDoCommit` will still be called, as there may be work in flight when pause is called which would still be processed.
- State changes to `draining`/`closing` are still processed to allow clean transitions when the system is shut down.
- I'd explicitly not integrate this with the `BrokerPollSystem`'s internal `doPause`/`resumeIfPaused` feature, which has a different purpose.
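
To make the idea above more concrete, here is a minimal sketch of the flag-based approach. It is not the real `BrokerPollSystem`: the class `PollSystemSketch`, the method `controlLoopIteration`, and the two counters are hypothetical stand-ins I made up for illustration. Only the names `pausePolling`/`resumePolling` come from the proposal. The counters simply replace the actual broker poll and the `maybeDoCommit` call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the poll system; NOT the real BrokerPollSystem.
class PollSystemSketch {
    // The "internal boolean" from the proposal; atomic so that a health
    // monitor thread can flip it while the control loop thread reads it.
    private final AtomicBoolean pausedByUser = new AtomicBoolean(false);

    // Counters standing in for real work (assumptions for illustration only).
    final AtomicInteger polls = new AtomicInteger();
    final AtomicInteger commits = new AtomicInteger();

    void pausePolling() { pausedByUser.set(true); }

    void resumePolling() { pausedByUser.set(false); }

    boolean isPollingPaused() { return pausedByUser.get(); }

    // One iteration of a simplified control loop.
    void controlLoopIteration(long pausedSleepMs) {
        if (isPollingPaused()) {
            // Paused: skip polling for new records and back off briefly.
            try {
                Thread.sleep(pausedSleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        } else {
            polls.incrementAndGet(); // stands in for polling the broker
        }
        // Always runs, so in-flight work can still be committed while paused.
        commits.incrementAndGet(); // stands in for maybeDoCommit
    }
}
```

A health monitor would then only call `pausePolling()` when a dependency becomes unavailable and `resumePolling()` once it recovers, without ever touching the underlying `KafkaConsumer`.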
Let me know what you think :).