
Make messages stream breakable automatically #72


Draft
wants to merge 1 commit into main

Conversation


@lp247 lp247 commented Jul 7, 2025

This change introduces a breakMode option on the MessagesStream class and, likewise, on the options of the consumer's consume method. The option accepts one of two allowed values:

  1. 'manual' -> This mode makes MessagesStream behave as before: the stream does not terminate by itself, and termination has to be done manually, either within the async iteration loop or within the data event handler.
  2. 'after-first-batch' -> This mode makes the MessagesStream class issue only one fetch request per leader of a topic's partition, without requeueing further requests. After all issued requests have completed, the stream ends and yields all the collected messages.

Additionally, the MessagesStream class gets a new collect method, which returns an array of all the messages gathered in mode 2. This method is only supported when the breakMode option is set to 'after-first-batch'.
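
A minimal usage sketch of the proposed option. Everything here is illustrative: the consumer setup, topic name, and message shape are assumptions, not taken from this PR.

```ts
// Sketch only: `consumer` stands for any consumer whose consume() returns a
// MessagesStream-like async iterable; its construction is omitted here.
async function readFirstBatch (consumer: {
  consume (options: object): Promise<AsyncIterable<{ key: unknown, value: unknown }>>
}) {
  // 'after-first-batch': one fetch per partition leader, then the stream ends on its own.
  const stream = await consumer.consume({ topics: ['my-topic'], breakMode: 'after-first-batch' })

  for await (const message of stream) {
    console.log(message.key, message.value)
  }

  // In 'manual' mode (the previous behaviour) this loop would only end when the
  // stream is closed explicitly, e.g. by breaking out and destroying it.
}
```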

Related:

@mcollina mcollina requested a review from ShogunPanda July 7, 2025 15:21

@mcollina mcollina left a comment


Can you add it to the docs?

throw new UserError('Cannot collect messages when the stream is in MANUAL break mode.')
}
return Array.fromAsync(this)
}
Member


I'm not sure I would add this. As you noted, a user could just do Array.fromAsync(), which would do the trick. It would be good to document it in the docs.

The place to add this would be the consumer, as a collectNBatch() method, which would be easier.
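
For reference, a sketch of the Array.fromAsync() alternative mentioned above, assuming a stream that terminates on its own (as in 'after-first-batch' mode). collectNBatch() is only a suggested name and is not implemented in this PR.

```ts
// Sketch only: when the stream ends by itself, the built-in Array.fromAsync
// already gathers every message into an array (Node.js 22+ or a polyfill).
async function collectAll<T> (stream: AsyncIterable<T>): Promise<T[]> {
  return Array.fromAsync(stream)
}
```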

@@ -402,6 +413,10 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
}

this.#pushRecords(metadata, topicIds, response, requestedOffsets)

if (this.#breakMode === MessagesStreamBreakModes.AFTER_FIRST_BATCH && this.#inflightNodes.size === 0) {
Member


I think this could be generalized to collecting the first n batches.

Contributor


I agree.

@lp247 lp247 marked this pull request as draft July 7, 2025 15:37

@ShogunPanda ShogunPanda left a comment


I agree with @mcollina that the method should be made generic for n batches.
This would imply accepting a numeric option (0 by default) rather than an enum.

Anyway, this PR is fantastic, thanks for your contribution!
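
A hedged sketch of what that generalized check might look like; maxBatches, completedBatches, and inflightRequests are assumed names for illustration, not identifiers from this PR.

```ts
// Sketch only: with a numeric option, the break condition from this PR could
// become "end the stream once n fetch rounds have completed".
// maxBatches === 0 keeps the current behaviour of never ending automatically.
function shouldEndStream (maxBatches: number, completedBatches: number, inflightRequests: number): boolean {
  return maxBatches > 0 && completedBatches >= maxBatches && inflightRequests === 0
}
```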

