-
Notifications
You must be signed in to change notification settings - Fork 7
Make messages stream breakable automatically #72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add it to the docs?
throw new UserError('Cannot collect messages when the stream is in MANUAL break mode.') | ||
} | ||
return Array.fromAsync(this) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I would add this. As you noted, a user could just do Array.fromAsync()
, which would do the trick. It would be good to document it in the docs.
The place where we should add this would be in the consumer
as collectNBatch()
, which would be easier.
@@ -402,6 +413,10 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable | |||
} | |||
|
|||
this.#pushRecords(metadata, topicIds, response, requestedOffsets) | |||
|
|||
if (this.#breakMode === MessagesStreamBreakModes.AFTER_FIRST_BATCH && this.#inflightNodes.size === 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be generalized to collecting the first n batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @mcollina to make the method generic for n-batches.
This would imply to accept a numeric option (0 by default) rather than an enum.
Anyway, this PR is fantastic, thanks for your contribution!
@@ -402,6 +413,10 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable | |||
} | |||
|
|||
this.#pushRecords(metadata, topicIds, response, requestedOffsets) | |||
|
|||
if (this.#breakMode === MessagesStreamBreakModes.AFTER_FIRST_BATCH && this.#inflightNodes.size === 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree.
This change introduces an option
breakMode
to theMessagesStream
class and likewise to the options of the consumer'sconsume
method. This option accepts one of two allowed values:data
event handler.MessageStream
class issue only one fetch request per leader of a topic's partition without requeueing more requests. After all issued requests are completed, the stream ends and returns all the collected messages.Additionally, the
MessagesStream
class gets a new methodcollect
which returns an array of all the messages that are collectable via 2. This method is only supported if thebreakMode
option is set to 'after-first-batch'.Related: