diff --git a/CHANGELOG.md b/CHANGELOG.md index 721e99e8..b651bdc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# confluent-kafka-javascript v0.3.1 + +v0.3.1 is a limited availability maintenance release. It is supported for all usage. + +## Enhancements + +1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130). + + # confluent-kafka-javascript v0.3.0 v0.3.0 is a limited availability feature release. It is supported for all usage. diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 1f6ac01e..c6a1c503 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -645,22 +645,15 @@ class Consumer { } /** - * Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback. - * @param {import("../..").Message} message - * @returns {import("../../types/kafkajs").EachMessagePayload} + * Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback. + * @param {import("../..").MessageHeader[] | undefined} messageHeaders + * @returns {import("../../types/kafkajs").IHeaders} */ - #createPayload(message) { - let key = message.key; - if (typeof key === 'string') { - key = Buffer.from(key); - } - - let timestamp = message.timestamp ? String(message.timestamp) : ''; - + #createHeaders(messageHeaders) { let headers; - if (message.headers) { + if (messageHeaders) { headers = {}; - for (const header of message.headers) { + for (const header of messageHeaders) { for (const [key, value] of Object.entries(header)) { if (!Object.hasOwn(headers, key)) { headers[key] = value; @@ -672,6 +665,22 @@ class Consumer { } } } + return headers; + } + + /** + * Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback. + * @param {import("../..").Message} message + * @returns {import("../../types/kafkajs").EachMessagePayload} + */ + #createPayload(message) { + let key = message.key; + if (typeof key === 'string') { + key = Buffer.from(key); + } + + let timestamp = message.timestamp ? String(message.timestamp) : ''; + const headers = this.#createHeaders(message.headers); return { topic: message.topic, @@ -788,20 +797,7 @@ class Consumer { } let timestamp = message.timestamp ? String(message.timestamp) : ''; - - let headers; - if (message.headers) { - headers = {}; - for (const [key, value] of Object.entries(message.headers)) { - if (!Object.hasOwn(headers, key)) { - headers[key] = value; - } else if (headers[key].constructor === Array) { - headers[key].push(value); - } else { - headers[key] = [headers[key], value]; - } - } - } + const headers = this.#createHeaders(message.headers); const messageConverted = { key, diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index b4556999..de66c35d 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -134,6 +134,54 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit ); }); + it('consume batch of messages with headers', async () => { + await consumer.connect(); + await producer.connect(); + await consumer.subscribe({ topic: topicName }); + + const messagesConsumed = []; + consumer.run({ + partitionsConsumedConcurrently, + eachBatch: async event => messagesConsumed.push(event) + }); + + const messages = [{ + value: `value-${secureRandom}`, + headers: { + 'header-1': 'value-1', + 'header-2': 'value-2', + 'header-3': ['value-3-1', 'value-3-2', Buffer.from([1, 0, 1, 0, 1])], + 'header-4': Buffer.from([1, 0, 1, 0, 1]), + }, + partition: 0, + }]; + + await producer.send({ topic: topicName, messages }); + await waitForMessages(messagesConsumed, { number: messages.length }); + + expect(messagesConsumed[0]).toEqual( + expect.objectContaining({ + batch: expect.objectContaining({ + topic: topicName, + partition: 0, + messages: [ + expect.objectContaining({ + value: Buffer.from(messages[0].value), + offset: '0', + headers: { + // Headers are always returned as Buffers from the broker. + 'header-1': Buffer.from('value-1'), + 'header-2': Buffer.from('value-2'), + 'header-3': [Buffer.from('value-3-1'), Buffer.from('value-3-2'), Buffer.from([1, 0, 1, 0, 1])], + 'header-4': Buffer.from([1, 0, 1, 0, 1]), + } + }), + ] + }), + }) + ); + }); + it.each([[true], [false]])('consumes messages using eachBatch - isAutoResolve: %s', async (isAutoResolve) => { await consumer.connect(); await producer.connect();