Skip to content

Commit 2f84999

Browse files
authored
Configurable batch size (#393)
* Configurable batch size * Keep 1.5s of data in cache * Remove minimum 1024 cache size for faster rebalances in case of long processing time * Changed property name to `js.consumer.max.batch.size` * Use only 1.5 seconds cache size estimation aligned at batch size * concurrency * Fix for at-least-once guarantee not ensured in case a seek happens on one partition and there are messages being fetched for other partitions * Configurable cache size in milliseconds * Add worker identifier to the payload for better debugging * Fix for test flakyness * Make `is cleared before rebalance` less flaky in case of increased time before first assignment * Reduce flakyness of 'times out if messages are pending' * FIXME about KIP-848 autocommit issue * v1.7.0
1 parent f3fb2ec commit 2f84999

File tree

14 files changed

+253
-179
lines changed

14 files changed

+253
-179
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
# confluent-kafka-javascript 1.7.0
2+
3+
v1.7.0 is a feature release. It is supported for all usage.
4+
5+
### Enhancements
6+
7+
1. Configurable batch size through the `js.consumer.max.batch.size` property
8+
and cache size through the `js.consumer.max.cache.size.per.worker.ms`
9+
property (#393).
10+
2. Fix for at-least-once guarantee not ensured in case a seek happens on one
11+
partition and there are messages being fetched about other partitions (#393).
12+
13+
114
# confluent-kafka-javascript 1.6.0
215

316
v1.6.0 is a feature release. It is supported for all usage.

MIGRATION.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,14 @@ producerRun().then(consumerRun).catch(console.error);
303303
- The `heartbeat()` no longer needs to be called by the user in the `eachMessage/eachBatch` callback.
304304
Heartbeats are automatically managed by librdkafka.
305305
- The `partitionsConsumedConcurrently` is supported by both `eachMessage` and `eachBatch`.
306-
- An API compatible version of `eachBatch` is available, but the batch size calculation is not
307-
as per configured parameters, rather, a constant maximum size is configured internally. This is subject
308-
to change.
306+
- An API compatible version of `eachBatch` is available, maximum batch size
307+
can be configured through the `js.consumer.max.batch.size` configuration property
308+
and defaults to 32. It is not dependent on the size of the produced batches
309+
present on the broker, as these are constructed client-side. Similar to
310+
the Java client configuration `max.poll.records`.
311+
`js.consumer.max.cache.size.per.worker.ms` allows to
312+
configure the cache size estimated based on consumption rate and defaults
313+
to the cache being sized to 1.5s worth of messages.
309314
The property `eachBatchAutoResolve` is supported.
310315
Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported,
311316
and within the returned batch, `offsetLag` and `offsetLagLow` are supported.

ci/update-version.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ function getPackageVersion(tag, branch) {
8989

9090
// publish with a -devel suffix for EA and RC releases.
9191
if (tag.prerelease.length > 0) {
92-
baseVersion += '-' + tag.prerelease.join('-');
92+
baseVersion += '-' + tag.prerelease.join('.');
9393
}
9494

9595
console.log(`Package version is "${baseVersion}"`);

0 commit comments

Comments
 (0)