-
Notifications
You must be signed in to change notification settings - Fork 9
wip: single active consumer offset tracking #247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
wip: single active consumer offset tracking #247
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for single active consumer offset tracking by incorporating a consumer update listener in the consumer, client, and connection modules, and by updating end-to-end tests to cover the new behavior.
- Introduces a consumerUpdateListener and singleActive flag in StreamConsumer.
- Updates the client logic to handle consumer update queries and offset resumption.
- Adds an end-to-end test that verifies the offset tracking mechanism when switching the active consumer.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
test/e2e/declare_consumer.test.ts | Added a new test case for validating offset tracking with two single active consumers. |
src/consumer.ts | Integrated consumerUpdateListener and singleActive flag into consumer initialization. |
src/connection.ts | Introduced a debug console.log in storeOffset (likely for debugging purposes). |
src/client.ts | Updated consumer declaration and added getConsumerOrServerSavedOffset logic with debug logs. |
Comments suppressed due to low confidence (1)
test/e2e/declare_consumer.test.ts:220
- Remove or replace 'it.only' to ensure that the full test suite runs during CI, unless this is an intentional temporary measure.
it.only(
0891281
to
5728d8b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements a consumer update listener for single active consumers so that upon activation the consumer can resume message consumption from a previously saved offset. Key changes include:
- Adding a new consumer update listener callback in both the test and production code.
- Updating the consumer and client code to support offset updates and single active consumer behavior.
- Providing an example demonstrating how to resume consumption from a designated offset.
Reviewed Changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
test/e2e/declare_consumer.test.ts | Added a new end-to-end test case for single active consumer offset tracking. |
src/consumer.ts | Introduced ConsumerUpdateListener type and implementation for updating offset. |
src/client.ts | Added logic to call consumerUpdateListener and update consumer offset accordingly. |
example/src/single_active_consumer_update_example.js | Provided a usage example for the new consumer update behavior. |
Files not reviewed (1)
- example/package-lock.json: Language not supported
Comments suppressed due to low confidence (1)
test/e2e/declare_consumer.test.ts:246
- [nitpick] Using fixed wait times in tests may lead to flaky test behavior; consider replacing fixed delays with more deterministic synchronization mechanisms (such as polling or event-based triggers) to ensure consumer state readiness.
await wait(500)
This pr adds a consumer update listener in the consumer. In this way, the end user can customize the behaviour of a single active consumer when it becomes active. An example is also added to show how a single active consumer can resume the consuming of messages from a specific offset, upon activation.