Skip to content

KafkaJS.Consumer: Pausing topics during rebalance_cb not supported #62

@justjake

Description

@justjake

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: macOS 14.5 (23F79)
  • Node Version [e.g. 8.2.1]: v18.17.0
  • NPM Version [e.g. 5.4.2]: 9.6.7
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: Xcode 14
  • confluent-kafka-javascript version [e.g. 2.3.3]: "@confluentinc/kafka-javascript": "0.1.16-devel"

Steps to Reproduce

const LibrdKafka = require("@confluentinc/kafka-javascript")

/**
 * Reproduction script: creates a consumer, registers a `rebalance_cb` that
 * tries to pause every partition it is handed, subscribes to topics matching
 * a regex, starts consuming, and then keeps the process alive for 50 seconds.
 *
 * The `eachMessage` handler is expected never to fire if the pause issued in
 * `rebalance_cb` took effect; the console output attached to this report
 * shows it firing anyway.
 *
 * @returns {Promise<void>} resolves once the 50-second wait has elapsed.
 */
async function main() {
	// Declared before `config` so the `rebalance_cb` closure below can refer to
	// it; it is only assigned after the Kafka client has been constructed.
	/** @type {import("@confluentinc/kafka-javascript").KafkaJS.Consumer} */
	let consumer

	const config = {
		// Options nested under the `kafkaJS` key.
		// NOTE(review): presumably these are the KafkaJS-compatible settings of
		// the library — confirm against the package documentation.
		kafkaJS: {
			// Each `Date.now()` in this object is evaluated separately, so the
			// numeric suffixes of clientId, groupId and group.instance.id are
			// not guaranteed to be equal.
			clientId: `Repro-Client-${Date.now()}`,
			brokers: ["127.0.0.1:9092"],
			connectionTimeout: 7000,
			requestTimeout: 30000,
			retry: { initialRetryTime: 300, retries: 50, maxRetryTime: 600 },
			logLevel: LibrdKafka.KafkaJS.logLevel.ERROR,
			groupId: `Repro-Client-${Date.now()}-CG`,
			rebalanceTimeout: 60000,
			sessionTimeout: 45000,
			allowAutoTopicCreation: true,
			autoCommit: false,
			partitionAssigners: ["cooperative-sticky"],
		},
		// Top-level keys outside `kafkaJS`.
		// NOTE(review): these look like librdkafka-style properties — confirm.
		"group.instance.id":
			`Repro-Client-${Date.now()}-Instance`,
		// Rebalance callback. `err` is received but never inspected, so the same
		// pause logic runs for every invocation.
		// NOTE(review): presumably `err` distinguishes assign from revoke
		// events — verify against the library's rebalance_cb documentation.
		rebalance_cb: (err, assignments) => {
			// Convert each flat { topic, partition } entry into the
			// { topic, partitions: [...] } shape passed to consumer.pause().
			// One entry is produced per partition; entries that share a topic
			// are not merged.
			const unflattened = []
			for (const { topic, partition } of assignments) {
				unflattened.push({ topic, partitions: [partition] })
			}
			consumer.pause(unflattened)
			// Log what the consumer reports as paused immediately after the call.
			console.log("rebalance_cb: paused during callback:", consumer.paused())
		},
	}

	// The same `config` object is passed both to the Kafka constructor and to
	// kafka.consumer().
	const kafka = new LibrdKafka.KafkaJS.Kafka(config)
	consumer = kafka.consumer(config)

	await consumer.connect()
	// Fill in some topics you have on hand
	await consumer.subscribe({
		// Regex subscription: any topic whose name contains "something".
		topics: [/^.*something.*/],
	})

	await consumer.run({
		// Should stay silent if the pause in rebalance_cb were honoured.
		eachMessage: async message => {
			console.log("Expected to never be called:", message)
		},
	})

	// Wait 50 000 ms before main() resolves, leaving time for the rebalance
	// and any message delivery to be observed.
	await new Promise(resolve => setTimeout(resolve, 50_000))
}

// Start the reproduction. The returned promise is neither awaited nor given a
// rejection handler here.
main()

Here's the console output I get:

rebalance_cb: paused during callback: [
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
    partitions: [ 0 ]
  }
]


INTERNALS: incrementalAssign [
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
    partition: 0
  }
]
Expected to never be called: {
  topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
  partition: 0,
  message: {
    key: <Buffer 7b 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 5f 5f 64 62 7a 5f 5f 70 68 79 73 69 63 61 6c 54 61 62 6c 65 49 64 65 6e 74 ... 96 more bytes>,
    value: <Buffer 7b 22 73 74 61 74 75 73 22 3a 22 42 45 47 49 4e 22 2c 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 65 76 65 6e 74 5f 63 6f ... 56 more bytes>,
    timestamp: '1723065962194',
    attributes: 0,
    offset: '1543',
    size: 106,
    leaderEpoch: 21,
    headers: undefined
  },
  heartbeat: [AsyncFunction: heartbeat],
  pause: [Function: bound pause]
}

Metadata

Metadata

Assignees

Labels

enhancement: New feature or request
fixed-present-in-next-release: Bug or improvement that's done; it is in the development branch but not yet released

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions