Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# confluent-kafka-javascript v0.3.1

v0.3.1 is a limited availability maintenance release. It is supported for all usage.

## Fixes

1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130).


# confluent-kafka-javascript v0.3.0

v0.3.0 is a limited availability feature release. It is supported for all usage.
Expand Down
50 changes: 23 additions & 27 deletions lib/kafkajs/_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -645,22 +645,15 @@ class Consumer {
}

/**
* Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback.
* @param {import("../..").Message} message
* @returns {import("../../types/kafkajs").EachMessagePayload}
* Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback.
* @param {import("../..").MessageHeader[] | undefined} messageHeaders
* @returns {import("../../types/kafkajs").IHeaders}
*/
#createPayload(message) {
let key = message.key;
if (typeof key === 'string') {
key = Buffer.from(key);
}

let timestamp = message.timestamp ? String(message.timestamp) : '';

#createHeaders(messageHeaders) {
let headers;
if (message.headers) {
if (messageHeaders) {
headers = {};
for (const header of message.headers) {
for (const header of messageHeaders) {
for (const [key, value] of Object.entries(header)) {
if (!Object.hasOwn(headers, key)) {
headers[key] = value;
Expand All @@ -672,6 +665,22 @@ class Consumer {
}
}
}
return headers;
}

/**
* Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback.
* @param {import("../..").Message} message
* @returns {import("../../types/kafkajs").EachMessagePayload}
*/
#createPayload(message) {
let key = message.key;
if (typeof key === 'string') {
key = Buffer.from(key);
}

let timestamp = message.timestamp ? String(message.timestamp) : '';
const headers = this.#createHeaders(message.headers);

return {
topic: message.topic,
Expand Down Expand Up @@ -788,20 +797,7 @@ class Consumer {
}

let timestamp = message.timestamp ? String(message.timestamp) : '';

let headers;
if (message.headers) {
headers = {};
for (const [key, value] of Object.entries(message.headers)) {
if (!Object.hasOwn(headers, key)) {
headers[key] = value;
} else if (headers[key].constructor === Array) {
headers[key].push(value);
} else {
headers[key] = [headers[key], value];
}
}
}
const headers = this.#createHeaders(message.headers);

const messageConverted = {
key,
Expand Down
48 changes: 48 additions & 0 deletions test/promisified/consumer/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,54 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
);
});

it('consume batch of messages with headers', async () => {
      await consumer.connect();
      await producer.connect();
      await consumer.subscribe({ topic: topicName });

      // Collect every batch payload delivered to eachBatch.
      const consumedEvents = [];
      consumer.run({
        partitionsConsumedConcurrently,
        eachBatch: async event => consumedEvents.push(event)
      });

      // A single message carrying string, array and binary header values,
      // pinned to partition 0 so the expected batch shape is deterministic.
      const binaryHeaderValue = Buffer.from([1, 0, 1, 0, 1]);
      const producedMessages = [{
        value: `value-${secureRandom}`,
        headers: {
          'header-1': 'value-1',
          'header-2': 'value-2',
          'header-3': ['value-3-1', 'value-3-2', binaryHeaderValue],
          'header-4': binaryHeaderValue,
        },
        partition: 0,
      }];

      await producer.send({ topic: topicName, messages: producedMessages });
      await waitForMessages(consumedEvents, { number: producedMessages.length });

      // Headers are always returned as Buffers from the broker.
      const expectedHeaders = {
        'header-1': Buffer.from('value-1'),
        'header-2': Buffer.from('value-2'),
        'header-3': [Buffer.from('value-3-1'), Buffer.from('value-3-2'), binaryHeaderValue],
        'header-4': binaryHeaderValue,
      };

      expect(consumedEvents[0]).toEqual(
        expect.objectContaining({
          batch: expect.objectContaining({
            topic: topicName,
            partition: 0,
            messages: [
              expect.objectContaining({
                value: Buffer.from(producedMessages[0].value),
                offset: '0',
                headers: expectedHeaders,
              }),
            ]
          }),
        })
      );
    });

it.each([[true], [false]])('consumes messages using eachBatch - isAutoResolve: %s', async (isAutoResolve) => {
await consumer.connect();
await producer.connect();
Expand Down