diff --git a/examples/kafkajs/consumer.js b/examples/kafkajs/consumer.js index 22d5ccc7..cd8df5c7 100644 --- a/examples/kafkajs/consumer.js +++ b/examples/kafkajs/consumer.js @@ -1,4 +1,4 @@ -const { Kafka } = require('../..').KafkaJS +const { Kafka } = require('../..').KafkaJS; //const { Kafka } = require('kafkajs') async function consumerStart() { @@ -32,7 +32,12 @@ async function consumerStart() { } }, rdKafka: { + globalConfig: { 'enable.auto.commit': false + }, + topicConfig: { + 'auto.offset.reset': 'earliest' + }, } }); diff --git a/examples/kafkajs/eos.js b/examples/kafkajs/eos.js index 6c9f85ca..be06485e 100644 --- a/examples/kafkajs/eos.js +++ b/examples/kafkajs/eos.js @@ -15,7 +15,9 @@ async function eosStart() { const consumer = kafka.consumer({ groupId: 'groupId', rdKafka: { - "enable.auto.commit": false, + globalConfig: { + "enable.auto.commit": false, + } }, }); @@ -34,7 +36,8 @@ async function eosStart() { // The run method acts like a consume-transform-produce loop. consumer.run({ eachMessage: async ({ topic, partition, message }) => { - const msgAckString = JSON.stringify({topic, + const msgAckString = JSON.stringify({ + topic, partition, offset: message.offset, key: message.key?.toString(), diff --git a/lib/kafkajs/_common.js b/lib/kafkajs/_common.js index 4ffc3f4f..6d1c7571 100644 --- a/lib/kafkajs/_common.js +++ b/lib/kafkajs/_common.js @@ -1,38 +1,47 @@ +/** + * @function kafkaJSToRdKafkaConfig() + * @param {object} config + * @returns {{globalConfig: import("../../types/config").ConsumerGlobalConfig|import("../../types/config").ProducerTopicConfig, topicConfig: import("../../types/config").ConsumerTopicConfig|import("../../types/config").ProducerTopicConfig}} + */ async function kafkaJSToRdKafkaConfig(config) { - const ret = { - 'allow.auto.create.topics': 'false' - } - ret['bootstrap.servers'] = config['brokers'].join(','); + const globalConfig = { + "allow.auto.create.topics": "false", + }; + const topicConfig = {}; + globalConfig["bootstrap.servers"] = config["brokers"].join(","); let withSASL = false; if (config.sasl) { - const sasl = config.sasl; - if (sasl.mechanism === 'plain' && - typeof sasl.username === 'string' && - typeof sasl.password === 'string') { - ret['sasl.mechanism'] = 'PLAIN'; - ret['sasl.username'] = sasl.username; - ret['sasl.password'] = sasl.password; - withSASL = true; + const sasl = config.sasl; + if ( + sasl.mechanism === "plain" && + typeof sasl.username === "string" && + typeof sasl.password === "string" + ) { + globalConfig["sasl.mechanism"] = "PLAIN"; + globalConfig["sasl.username"] = sasl.username; + globalConfig["sasl.password"] = sasl.password; + withSASL = true; } } if (config.ssl === true && withSASL) { - ret['security.protocol'] = 'sasl_ssl'; + globalConfig["security.protocol"] = "sasl_ssl"; } else if (withSASL) { - ret['security.protocol'] = 'sasl_plaintext'; + globalConfig["security.protocol"] = "sasl_plaintext"; } if (config.rdKafka) { if (config.rdKafka.constructor === Function) { - await config.rdKafka(ret); + await config.rdKafka(globalConfig, topicConfig); } else { - Object.assign(ret, config.rdKafka); + Object.assign(globalConfig, config.rdKafka.globalConfig); + Object.assign(topicConfig, config.rdKafka.topicConfig); } } - return ret; + return { globalConfig, topicConfig }; } function topicPartitionOffsetToRdKafka(tpo) { @@ -40,7 +49,7 @@ function topicPartitionOffsetToRdKafka(tpo) { topic: tpo.topic, partition: tpo.partition, offset: Number(tpo.offset), - } + }; } -module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } +module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka }; diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 9cfdba0c..776d9d4b 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -3,8 +3,8 @@ const RdKafka = require('../rdkafka'); const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require('./_common'); const ConsumerState = Object.freeze({ - INIT: 0, - CONNECTING: 1, + INIT: 0, + CONNECTING: 1, CONNECTED: 2, DISCONNECTING: 3, DISCONNECTED: 4, @@ -17,38 +17,42 @@ class Consumer { #connectPromiseFunc = {}; #state = ConsumerState.INIT; + /** + * @constructor + * @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig + */ constructor(kJSConfig) { this.#kJSConfig = kJSConfig; } - #config() { + async #config() { if (!this.#rdKafkaConfig) - this.#rdKafkaConfig = this.#finalizedConfig(); + this.#rdKafkaConfig = await this.#finalizedConfig(); return this.#rdKafkaConfig; } async #finalizedConfig() { - const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig); + const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig); if (this.#kJSConfig.groupId) { - config['group.id'] = this.#kJSConfig.groupId; + globalConfig['group.id'] = this.#kJSConfig.groupId; } - config['offset_commit_cb'] = true; + globalConfig['offset_commit_cb'] = true; if (this.#kJSConfig.rebalanceListener) { - config['rebalance_cb'] = (err, assignment) => { + globalConfig['rebalance_cb'] = (err, assignment) => { // Create the librdkafka error err = LibrdKafkaError.create(err); let call; - switch(err.code) { + switch (err.code) { case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS: call = (this.#kJSConfig.rebalanceListener.onPartitionsAssigned ? - this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) : - Promise.resolve()).catch(console.error); + this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) : + Promise.resolve()).catch(console.error); break; case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS: call = (this.#kJSConfig.rebalanceListener.onPartitionsRevoked ? - this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) : - Promise.resolve()).catch(console.error); + this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) : + Promise.resolve()).catch(console.error); break; default: call = Promise.reject().catch(() => { @@ -58,46 +62,46 @@ class Consumer { } call - .finally(() => { - // Emit the event - this.#internalClient.emit('rebalance', err, assignment); - - try { - if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { - this.#internalClient.assign(assignment); - } else { - this.#internalClient.unassign(); + .finally(() => { + // Emit the event + this.#internalClient.emit('rebalance', err, assignment); + + try { + if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { + this.#internalClient.assign(assignment); + } else { + this.#internalClient.unassign(); + } + } catch (e) { + // Ignore exceptions if we are not connected + if (this.#internalClient.isConnected()) { + this.#internalClient.emit('rebalance.error', e); + } } - } catch (e) { - // Ignore exceptions if we are not connected - if (this.#internalClient.isConnected()) { - this.#internalClient.emit('rebalance.error', e); - } - } - }); + }); }; } - return config; + return { globalConfig, topicConfig }; } #readyCb(arg) { - if (this.#state !== ConsumerState.CONNECTING) { - // I really don't know how to handle this now. - return; - } - this.#state = ConsumerState.CONNECTED; + if (this.#state !== ConsumerState.CONNECTING) { + // I really don't know how to handle this now. + return; + } + this.#state = ConsumerState.CONNECTED; - // Resolve the promise. - this.#connectPromiseFunc['resolve'](); + // Resolve the promise. + this.#connectPromiseFunc['resolve'](); } #errorCb(args) { - console.log('error', args); - if (this.#state === ConsumerState.CONNECTING) { - this.#connectPromiseFunc['reject'](args); - } else { - // do nothing for now. - } + console.log('error', args); + if (this.#state === ConsumerState.CONNECTING) { + this.#connectPromiseFunc['reject'](args); + } else { + // do nothing for now. + } } #notImplemented() { @@ -111,7 +115,7 @@ class Consumer { } let timestamp = message.timestamp ? new Date(message.timestamp).toISOString() - : ''; + : ''; var headers = undefined; if (message.headers) { @@ -139,14 +143,14 @@ class Consumer { size: message.size, headers }, - heartbeat: async () => {}, - pause: () => {} + heartbeat: async () => { }, + pause: () => { } } } async #consumeSingle() { return new Promise((resolve, reject) => { - this.#internalClient.consume(1, function(err, messages) { + this.#internalClient.consume(1, function (err, messages) { if (err) { reject(`Consume error code ${err.code}`); return; @@ -168,7 +172,7 @@ class Consumer { }); else { for (let partition of topic.partitions) { - ret.push({topic: topic.topic, partition}); + ret.push({ topic: topic.topic, partition }); } } } @@ -180,22 +184,23 @@ class Consumer { } async connect() { - if (this.#state !== ConsumerState.INIT) { - return Promise.reject('Connect has already been called elsewhere.'); - } + if (this.#state !== ConsumerState.INIT) { + return Promise.reject('Connect has already been called elsewhere.'); + } - this.#state = ConsumerState.CONNECTING; - this.#internalClient = new RdKafka.KafkaConsumer(await this.#config()); - this.#internalClient.on('ready', this.#readyCb.bind(this)); - this.#internalClient.on('event.error', this.#errorCb.bind(this)); - this.#internalClient.on('event.log', console.log); - - return new Promise((resolve, reject) => { - this.#connectPromiseFunc = {resolve, reject}; - console.log('Connecting....'); - this.#internalClient.connect(); - console.log('connect() called'); - }); + this.#state = ConsumerState.CONNECTING; + const { globalConfig, topicConfig } = await this.#config(); + this.#internalClient = new RdKafka.KafkaConsumer(globalConfig, topicConfig); + this.#internalClient.on('ready', this.#readyCb.bind(this)); + this.#internalClient.on('event.error', this.#errorCb.bind(this)); + this.#internalClient.on('event.log', console.log); + + return new Promise((resolve, reject) => { + this.#connectPromiseFunc = { resolve, reject }; + console.log('Connecting....'); + this.#internalClient.connect(); + console.log('connect() called'); + }); } async subscribe(subscription) { @@ -208,7 +213,7 @@ class Consumer { async run(config) { if (this.#state !== ConsumerState.CONNECTED) { - throw new Error('Run must be called in state CONNECTED.'); + throw new Error('Run must be called in state CONNECTED.'); } while (this.#state === ConsumerState.CONNECTED) { @@ -240,7 +245,7 @@ class Consumer { seek(topicPartitionOffset) { return new Promise((resolve, reject) => { const rdKafkaTopicPartitionOffset = - topicPartitionOffsetToRdKafka(topicPartitionOffset); + topicPartitionOffsetToRdKafka(topicPartitionOffset); this.#internalClient.seek(rdKafkaTopicPartitionOffset, 0, (err) => { if (err) { reject(new Error(`Seek error code ${err.code}`)); diff --git a/lib/kafkajs/_kafka.js b/lib/kafkajs/_kafka.js index e8849138..86f937d4 100644 --- a/lib/kafkajs/_kafka.js +++ b/lib/kafkajs/_kafka.js @@ -1,36 +1,35 @@ -const { Producer } = require('./_producer'); -const { Consumer } = require('./_consumer'); - +const { Producer } = require("./_producer"); +const { Consumer } = require("./_consumer"); class Kafka { - #commonClientConfig = {}; - - constructor(config) { - this.#commonClientConfig = config; + #commonClientConfig = {}; + + constructor(config) { + this.#commonClientConfig = config; + } + + #mergeConfiguration(config) { + let baseConfig = Object.assign({}, this.#commonClientConfig); + config = Object.assign({}, config); + + let rdKafka = baseConfig.rdKafka; + Object.assign(baseConfig, config); + if (rdKafka && config.rdKafka) { + baseConfig.rdKafka = { + ...rdKafka, + ...config.rdKafka, + }; } + return baseConfig; + } - #mergeConfiguration(config) { - let baseConfig = Object.assign({}, this.#commonClientConfig); - config = Object.assign({}, config); - - let rdKafka = baseConfig.rdKafka; - Object.assign(baseConfig, config); - if (rdKafka && config.rdKafka) { - baseConfig.rdKafka = { - ...rdKafka, - ...config.rdKafka - } - } - return baseConfig; - } + producer(config) { + return new Producer(this.#mergeConfiguration(config)); + } - producer(config) { - return new Producer(this.#mergeConfiguration(config)); - } - - consumer(config) { - return new Consumer(this.#mergeConfiguration(config)); - } + consumer(config) { + return new Consumer(this.#mergeConfiguration(config)); + } } -module.exports = { Kafka } +module.exports = { Kafka }; diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js index acd31932..52616bac 100644 --- a/lib/kafkajs/_producer.js +++ b/lib/kafkajs/_producer.js @@ -3,8 +3,8 @@ const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require('./_co const { Consumer } = require('./_consumer'); const ProducerState = Object.freeze({ - INIT: 0, - CONNECTING: 1, + INIT: 0, + CONNECTING: 1, INITIALIZING_TRANSACTIONS: 2, INITIALIZED_TRANSACTIONS: 3, CONNECTED: 4, @@ -20,25 +20,29 @@ class Producer { #state = ProducerState.INIT; #ongoingTransaction = false; + /** + * @constructor + * @param {import("../../types/kafkajs").ProducerConfig} kJSConfig + */ constructor(kJSConfig) { this.#kJSConfig = kJSConfig; } - #config() { + async #config() { if (!this.#rdKafkaConfig) - this.#rdKafkaConfig = this.#finalizedConfig(); + this.#rdKafkaConfig = await this.#finalizedConfig(); return this.#rdKafkaConfig; } async #finalizedConfig() { - const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig); - config.dr_cb = 'true'; + const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig); + globalConfig.dr_cb = 'true'; if (this.#kJSConfig.hasOwnProperty('transactionalId')) { - config['transactional.id'] = this.#kJSConfig.transactionalId; + globalConfig['transactional.id'] = this.#kJSConfig.transactionalId; } - return config; + return { globalConfig, topicConfig }; } #flattenTopicPartitionOffsets(topics) { @@ -50,101 +54,102 @@ class Producer { } #readyTransactions(err) { - if (err) { - this.#connectPromiseFunc["reject"](err); - return; - } + if (err) { + this.#connectPromiseFunc["reject"](err); + return; + } - if (this.#state !== ProducerState.INITIALIZING_TRANSACTIONS) { - // FSM impossible state. We should add error handling for - // this later. - return; - } + if (this.#state !== ProducerState.INITIALIZING_TRANSACTIONS) { + // FSM impossible state. We should add error handling for + // this later. + return; + } - this.#state = ProducerState.INITIALIZED_TRANSACTIONS; - this.#readyCb(null); + this.#state = ProducerState.INITIALIZED_TRANSACTIONS; + this.#readyCb(null); } async #readyCb(arg) { - if (this.#state !== ProducerState.CONNECTING && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) { - // I really don't know how to handle this now. - return; - } + if (this.#state !== ProducerState.CONNECTING && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) { + // I really don't know how to handle this now. + return; + } - let config = await this.#config(); - if (config.hasOwnProperty('transactional.id') && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) { - this.#state = ProducerState.INITIALIZING_TRANSACTIONS; - this.#internalClient.initTransactions(5000 /* default: 5s */, this.#readyTransactions.bind(this)); - return; + let config = await this.#config(); + if (config.hasOwnProperty('transactional.id') && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) { + this.#state = ProducerState.INITIALIZING_TRANSACTIONS; + this.#internalClient.initTransactions(5000 /* default: 5s */, this.#readyTransactions.bind(this)); + return; + } + + this.#state = ProducerState.CONNECTED; + + // Start a loop to poll. + let pollInterval = setInterval(() => { + if (this.#state >= ProducerState.DISCONNECTING) { + clearInterval(pollInterval); + return; + } + this.#internalClient.poll(); + }, 500); + + this.#internalClient.on('delivery-report', function (err, report) { + //console.log('got delivery report', report, err); + const opaque = report.opaque; + if (!opaque) { + // not sure how to handle this. + return; + } + if (err) { + opaque.reject('err out'); + return; + } + //console.log('delivery-report: ' + JSON.stringify(report)); + delete report['opaque']; + + const recordMetadata = { + topicName: report.topic, + partition: report.partition, + errorCode: 0, + baseOffset: report.offset, + logAppendTime: null, + logStartOffset: null, } - this.#state = ProducerState.CONNECTED; - - // Start a loop to poll. - let pollInterval = setInterval(() => { - if (this.#state >= ProducerState.DISCONNECTING) { - clearInterval(pollInterval); - return; - } - this.#internalClient.poll(); - }, 500); - - this.#internalClient.on('delivery-report', function(err, report) { - //console.log('got delivery report', report, err); - const opaque = report.opaque; - if (!opaque) { - // not sure how to handle this. - return; - } - if (err) { - opaque.reject('err out'); - return; - } - //console.log('delivery-report: ' + JSON.stringify(report)); - delete report['opaque']; - - const recordMetadata = { - topicName: report.topic, - partition: report.partition, - errorCode: 0, - baseOffset: report.offset, - logAppendTime: null, - logStartOffset: null, - } - - opaque.resolve(recordMetadata); - }); + opaque.resolve(recordMetadata); + }); - // Resolve the promise. - this.#connectPromiseFunc["resolve"](); + // Resolve the promise. + this.#connectPromiseFunc["resolve"](); } #errorCb(args) { - console.log('error', args); - if (this.#state === ProducerState.CONNECTING) { - this.#connectPromiseFunc["reject"](args); - } else { - // do nothing for now. - } + console.log('error', args); + if (this.#state === ProducerState.CONNECTING) { + this.#connectPromiseFunc["reject"](args); + } else { + // do nothing for now. + } } async connect() { - if (this.#state !== ProducerState.INIT) { - return Promise.reject("Connect has already been called elsewhere."); - } + if (this.#state !== ProducerState.INIT) { + return Promise.reject("Connect has already been called elsewhere."); + } - this.#state = ProducerState.CONNECTING; - this.#internalClient = new RdKafka.Producer(await this.#config()); - this.#internalClient.on('ready', this.#readyCb.bind(this)); - this.#internalClient.on('event.error', this.#errorCb.bind(this)); - this.#internalClient.on('event.log', console.log); - - return new Promise((resolve, reject) => { - this.#connectPromiseFunc = {resolve, reject}; - console.log("Connecting...."); - this.#internalClient.connect(); - console.log("connect() called"); - }); + this.#state = ProducerState.CONNECTING; + const { globalConfig, topicConfig } = await this.#config(); + this.#internalClient = new RdKafka.Producer(globalConfig, topicConfig); + this.#internalClient.on('ready', this.#readyCb.bind(this)); + this.#internalClient.on('event.error', this.#errorCb.bind(this)); + this.#internalClient.on('event.log', console.log); + + return new Promise((resolve, reject) => { + this.#connectPromiseFunc = { resolve, reject }; + console.log("Connecting...."); + this.#internalClient.connect(); + console.log("connect() called"); + }); } async disconnect() { @@ -276,67 +281,67 @@ class Producer { } async send(sendOptions) { - if (this.#state !== ProducerState.CONNECTED) { - return Promise.reject("Cannot send message without awaiting connect()"); - } + if (this.#state !== ProducerState.CONNECTED) { + return Promise.reject("Cannot send message without awaiting connect()"); + } - if (sendOptions === null || !(sendOptions instanceof Object)) { - return Promise.reject("sendOptions must be set correctly"); - } + if (sendOptions === null || !(sendOptions instanceof Object)) { + return Promise.reject("sendOptions must be set correctly"); + } - // Ignore all properties except topic and messages. - // TODO: log a warning instead of ignoring. - if (!sendOptions.hasOwnProperty("topic") || !sendOptions.hasOwnProperty("messages") || !Array.isArray(sendOptions["messages"])) { - // TODO: add further validations. - return Promise.reject("sendOptions must be of the form {topic: string, messages: Message[]}"); - } + // Ignore all properties except topic and messages. + // TODO: log a warning instead of ignoring. + if (!sendOptions.hasOwnProperty("topic") || !sendOptions.hasOwnProperty("messages") || !Array.isArray(sendOptions["messages"])) { + // TODO: add further validations. + return Promise.reject("sendOptions must be of the form {topic: string, messages: Message[]}"); + } - const msgPromises = []; - for (let i = 0; i < sendOptions.messages.length; i++) { - const msg = sendOptions.messages[i]; + const msgPromises = []; + for (let i = 0; i < sendOptions.messages.length; i++) { + const msg = sendOptions.messages[i]; - if (!msg.hasOwnProperty("partition") || msg.partition === null) { - msg.partition = -1; - } + if (!msg.hasOwnProperty("partition") || msg.partition === null) { + msg.partition = -1; + } - if (typeof msg.value === 'string') { - msg.value = Buffer.from(msg.value); - } + if (typeof msg.value === 'string') { + msg.value = Buffer.from(msg.value); + } - msgPromises.push(new Promise((resolve, reject) => { - const opaque = {resolve, reject}; - this.#internalClient.produce(sendOptions.topic, msg.partition, msg.value, msg.key, msg.timestamp ?? Date.now(), opaque, msg.headers); - })); + msgPromises.push(new Promise((resolve, reject) => { + const opaque = { resolve, reject }; + this.#internalClient.produce(sendOptions.topic, msg.partition, msg.value, msg.key, msg.timestamp ?? Date.now(), opaque, msg.headers); + })); + } + const recordMetadataArr = await Promise.all(msgPromises); + + const topicPartitionRecordMetadata = new Map(); + for (const recordMetadata of recordMetadataArr) { + const key = `${recordMetadata.topicName},${recordMetadata.partition}`; + if (recordMetadata.baseOffset == null || !topicPartitionRecordMetadata.has(key)) { + topicPartitionRecordMetadata.set(key, recordMetadata); + continue; } - const recordMetadataArr = await Promise.all(msgPromises); - - const topicPartitionRecordMetadata = new Map(); - for (const recordMetadata of recordMetadataArr) { - const key = `${recordMetadata.topicName},${recordMetadata.partition}`; - if (recordMetadata.baseOffset == null || !topicPartitionRecordMetadata.has(key)) { - topicPartitionRecordMetadata.set(key, recordMetadata); - continue; - } - const currentRecordMetadata = topicPartitionRecordMetadata.get(key); + const currentRecordMetadata = topicPartitionRecordMetadata.get(key); - // Don't overwrite a null baseOffset - if (currentRecordMetadata.baseOffset == null) { - continue; - } - - if (currentRecordMetadata.baseOffset > recordMetadata.baseOffset) { - topicPartitionRecordMetadata.set(key, recordMetadata); - } + // Don't overwrite a null baseOffset + if (currentRecordMetadata.baseOffset == null) { + continue; } - const ret = []; - for (const [key, value] of topicPartitionRecordMetadata.entries()) { - value.baseOffset = value.baseOffset?.toString(); - ret.push(value); + if (currentRecordMetadata.baseOffset > recordMetadata.baseOffset) { + topicPartitionRecordMetadata.set(key, recordMetadata); } - return ret; + } + + const ret = []; + for (const [key, value] of topicPartitionRecordMetadata.entries()) { + value.baseOffset = value.baseOffset?.toString(); + ret.push(value); + } + return ret; } } diff --git a/lib/kafkajs/index.js b/lib/kafkajs/index.js index a41822d9..beac115f 100644 --- a/lib/kafkajs/index.js +++ b/lib/kafkajs/index.js @@ -1,3 +1,3 @@ -const { Kafka } = require('./_kafka'); +const { Kafka } = require("./_kafka"); -module.exports = { Kafka } +module.exports = { Kafka }; diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 97e5e506..22e04c62 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -1,4 +1,5 @@ import * as tls from 'tls' +import { ConsumerGlobalConfig, ConsumerTopicConfig, ProducerGlobalConfig, ProducerTopicConfig } from './config' export type BrokersFunction = () => string[] | Promise @@ -37,6 +38,7 @@ export interface ProducerConfig { transactionalId?: string transactionTimeout?: number maxInFlightRequests?: number + rdKafka?: Function | { topicConfig?: ProducerTopicConfig, globalConfig?: ProducerGlobalConfig } } export interface IHeaders { @@ -124,6 +126,7 @@ export interface ConsumerConfig { maxInFlightRequests?: number readUncommitted?: boolean rackId?: string + rdKafka?: Function | { topicConfig?: ConsumerTopicConfig, globalConfig?: ConsumerGlobalConfig } } export type ConsumerEvents = { @@ -409,7 +412,7 @@ export type GroupDescription = { export type Consumer = { connect(): Promise disconnect(): Promise - subscribe(subscription: ConsumerSubscribeTopics ): Promise + subscribe(subscription: ConsumerSubscribeTopics): Promise stop(): Promise run(config?: ConsumerRunConfig): Promise commitOffsets(topicPartitions: Array): Promise