-
Notifications
You must be signed in to change notification settings - Fork 23
Closed
Labels
enhancementNew feature or requestNew feature or requestfixed-present-in-next-releaseBug or improvement that's done, it is in the development branch but yet unreleasedBug or improvement that's done, it is in the development branch but yet unreleased
Description
Environment Information
- OS [e.g. Mac, Arch, Windows 10]: macOS 14.5 (23F79)
- Node Version [e.g. 8.2.1]: v18.17.0
- NPM Version [e.g. 5.4.2]: 9.6.7
- C++ Toolchain [e.g. Visual Studio, llvm, g++]: Xcode 14
- confluent-kafka-javascript version [e.g. 2.3.3]:
"@confluentinc/kafka-javascript": "0.1.16-devel"
Steps to Reproduce
const LibrdKafka = require("@confluentinc/kafka-javascript")
async function main() {
/** @type {import("@confluentinc/kafka-javascript").KafkaJS.Consumer} */
let consumer
const config = {
kafkaJS: {
clientId: `Repro-Client-${Date.now()}`,
brokers: ["127.0.0.1:9092"],
connectionTimeout: 7000,
requestTimeout: 30000,
retry: { initialRetryTime: 300, retries: 50, maxRetryTime: 600 },
logLevel: LibrdKafka.KafkaJS.logLevel.ERROR,
groupId: `Repro-Client-${Date.now()}-CG`,
rebalanceTimeout: 60000,
sessionTimeout: 45000,
allowAutoTopicCreation: true,
autoCommit: false,
partitionAssigners: ["cooperative-sticky"],
},
"group.instance.id":
`Repro-Client-${Date.now()}-Instance`,
rebalance_cb: (err, assignments) => {
const unflattened = []
for (const { topic, partition } of assignments) {
unflattened.push({ topic, partitions: [partition] })
}
consumer.pause(unflattened)
console.log("rebalance_cb: paused during callback:", consumer.paused())
},
}
const kafka = new LibrdKafka.KafkaJS.Kafka(config)
consumer = kafka.consumer(config)
await consumer.connect()
// Fill in some topics you have on hand
await consumer.subscribe({
topics: [/^.*something.*/],
})
await consumer.run({
eachMessage: async message => {
console.log("Expected to never be called:", message)
},
})
await new Promise(resolve => setTimeout(resolve, 50_000))
}
main()Here's the console output I get:
rebalance_cb: paused during callback: [
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
partitions: [ 0 ]
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
partitions: [ 0 ]
}
]
INTERNALS: incrementalAssign [
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
partition: 0
},
{
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
partition: 0
}
]
Expected to never be called: {
topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
partition: 0,
message: {
key: <Buffer 7b 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 5f 5f 64 62 7a 5f 5f 70 68 79 73 69 63 61 6c 54 61 62 6c 65 49 64 65 6e 74 ... 96 more bytes>,
value: <Buffer 7b 22 73 74 61 74 75 73 22 3a 22 42 45 47 49 4e 22 2c 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 65 76 65 6e 74 5f 63 6f ... 56 more bytes>,
timestamp: '1723065962194',
attributes: 0,
offset: '1543',
size: 106,
leaderEpoch: 21,
headers: undefined
},
heartbeat: [AsyncFunction: heartbeat],
pause: [Function: bound pause]
}
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or requestfixed-present-in-next-releaseBug or improvement that's done, it is in the development branch but yet unreleasedBug or improvement that's done, it is in the development branch but yet unreleased