Skip to content

Commit 1da4ae1

Browse files
committed
Update comments and and examples
1 parent d959733 commit 1da4ae1

File tree

5 files changed

+195
-2
lines changed

5 files changed

+195
-2
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
2+
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
3+
4+
async function adminFromConsumer() {
5+
const kafka = new Kafka({
6+
kafkaJS: {
7+
brokers: ['localhost:9092'],
8+
}
9+
});
10+
11+
const consumer = kafka.consumer({
12+
kafkaJS: {
13+
groupId: 'test-group',
14+
fromBeginning: true,
15+
}
16+
});
17+
18+
await consumer.connect();
19+
20+
// The consumer can be used as normal
21+
await consumer.subscribe({ topic: 'test-topic' });
22+
consumer.run({
23+
eachMessage: async ({ topic, partition, message }) => {
24+
console.log({
25+
topic,
26+
partition,
27+
offset: message.offset,
28+
key: message.key?.toString(),
29+
value: message.value.toString(),
30+
});
31+
},
32+
});
33+
34+
// And the same consumer can create an admin client - the consumer must have successfully
35+
// been connected before the admin client can be created.
36+
const admin = consumer.dependentAdmin();
37+
await admin.connect();
38+
39+
// The admin client can be used until the consumer is connected.
40+
const listTopicsResult = await admin.listTopics();
41+
console.log(listTopicsResult);
42+
43+
await new Promise(resolve => setTimeout(resolve, 10000));
44+
45+
// Disconnect the consumer and admin clients in the correct order.
46+
await admin.disconnect();
47+
await consumer.disconnect();
48+
}
49+
50+
async function adminFromProducer() {
51+
const kafka = new Kafka({
52+
kafkaJS: {
53+
brokers: ['localhost:9092'],
54+
}
55+
});
56+
57+
const producer = kafka.producer({});
58+
59+
await producer.connect();
60+
61+
// The producer can be used as normal
62+
await producer.send({ topic: 'test-topic', messages: [{ value: 'Hello!' }] });
63+
64+
// And the same producer can create an admin client - the producer must have successfully
65+
// been connected before the admin client can be created.
66+
const admin = producer.dependentAdmin();
67+
await admin.connect();
68+
69+
// The admin client can be used until the producer is connected.
70+
const listTopicsResult = await admin.listTopics();
71+
console.log(listTopicsResult);
72+
73+
// Disconnect the producer and admin clients in the correct order.
74+
await admin.disconnect();
75+
await producer.disconnect();
76+
}
77+
78+
adminFromProducer().then(() => adminFromConsumer()).catch(console.error);
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
const Kafka = require('@confluentinc/kafka-javascript');
2+
const admin = require('../../lib/admin');
3+
4+
const bootstrapServers = 'localhost:9092';
5+
6+
function adminFromProducer() {
7+
const producer = new Kafka.Producer({
8+
'bootstrap.servers': bootstrapServers,
9+
'dr_msg_cb': true,
10+
});
11+
12+
const createAdminAndListTopics = () => {
13+
// Create an admin client from the producer, which must be connected.
14+
// Thus, this is called from the producer's 'ready' event.
15+
const admin = Kafka.AdminClient.createFrom(producer);
16+
17+
// The admin client can be used until the producer is connected.
18+
admin.listTopics((err, topics) => {
19+
if (err) {
20+
console.error(err);
21+
return;
22+
}
23+
console.log("Topics: ", topics);
24+
admin.disconnect();
25+
});
26+
};
27+
28+
producer.connect();
29+
30+
producer.on('ready', () => {
31+
console.log("Producer is ready");
32+
producer.setPollInterval(100);
33+
34+
// After the producer is ready, it can be used to create an admin client.
35+
createAdminAndListTopics();
36+
37+
// It can also be used normally to produce messages.
38+
producer.produce('test-topic', null, Buffer.from('Hello World!'), null, Date.now());
39+
});
40+
41+
producer.on('event.error', (err) => {
42+
console.error(err);
43+
});
44+
45+
producer.on('delivery-report', (err, report) => {
46+
console.log("Delivery report received:", report);
47+
});
48+
49+
setTimeout(() => {
50+
producer.disconnect();
51+
}, 30000);
52+
}
53+
54+
function adminFromConsumer() {
55+
const consumer = new Kafka.KafkaConsumer({
56+
'bootstrap.servers': bootstrapServers,
57+
'group.id': 'test-group',
58+
'auto.offset.reset': 'earliest',
59+
});
60+
61+
const createAdminAndListTopics = () => {
62+
// Create an admin client from the consumer, which must be connected.
63+
// Thus, this is called from the consumer's 'ready' event.
64+
const admin = Kafka.AdminClient.createFrom(consumer);
65+
66+
// The admin client can be used until the consumer is connected.
67+
admin.listTopics((err, topics) => {
68+
if (err) {
69+
console.error(err);
70+
return;
71+
}
72+
console.log("Topics: ", topics);
73+
admin.disconnect();
74+
});
75+
};
76+
77+
consumer.connect();
78+
79+
consumer.on('ready', () => {
80+
console.log("Consumer is ready");
81+
82+
// After the consumer is ready, it can be used to create an admin client.
83+
createAdminAndListTopics();
84+
85+
// It can also be used normally to consume messages.
86+
consumer.subscribe(['test-topic']);
87+
consumer.consume();
88+
});
89+
90+
consumer.on('data', (data) => {
91+
console.log("Consumer:data", data);
92+
});
93+
94+
consumer.on('event.error', (err) => {
95+
console.error("Consumer:error", err);
96+
});
97+
98+
setTimeout(() => {
99+
consumer.disconnect();
100+
}, 30000);
101+
}
102+
103+
adminFromProducer();
104+
setTimeout(() => adminFromConsumer(), 35000);

lib/admin.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,11 @@ function createAdminClient(conf, eventHandlers) {
8484
*
8585
* This is a factory method because it immediately starts an
8686
* active handle with the brokers.
87+
*
88+
* The producer or consumer being used must be connected.
89+
* The client can only be used while the producer or consumer is connected.
90+
* Logging and other events from this client will be emitted on the producer or consumer.
8791
* @param {import('../types/rdkafka').Producer | import('../types/rdkafka').KafkaConsumer} existingClient a producer or consumer to create the admin client from
88-
* It must be connected.
8992
* @param {object} eventHandlers optional key value pairs of event handlers to attach to the client
9093
*/
9194
function createAdminClientFrom(existingClient, eventHandlers) {

lib/kafkajs/_consumer.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ class Consumer {
247247

248248
/**
249249
* Create a new admin client using the underlying connections of the consumer.
250+
*
251+
* The consumer must be connected before connecting the resulting admin client.
252+
* The usage of the admin client is limited to the lifetime of the consumer.
253+
* The consumer's logger is shared with the admin client.
250254
* @returns {import("../../types/kafkajs").Admin}
251255
*/
252256
dependentAdmin() {

lib/kafkajs/_producer.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ class Producer {
9393
}
9494

9595
/**
96-
* Create a new admin client using the underlying connections of the consumer.
96+
* Create a new admin client using the underlying connections of the producer.
97+
*
98+
* The producer must be connected before connecting the resulting admin client.
99+
* The usage of the admin client is limited to the lifetime of the producer.
100+
* The producer's logger is shared with the admin client.
97101
* @returns {import("../../types/kafkajs").Admin}
98102
*/
99103
dependentAdmin() {

0 commit comments

Comments
 (0)