Skip to content

Commit d8fe46d

Browse files
committed
Add topic caching to dependent examples
1 parent ae9fc20 commit d8fe46d

File tree

2 files changed

+51
-15
lines changed

2 files changed

+51
-15
lines changed

examples/kafkajs/admin/dependent_admin.js renamed to examples/kafkajs/admin/dependent-admin.js

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
33

44
async function adminFromConsumer() {
55
const kafka = new Kafka({
6-
kafkaJS: {
7-
brokers: ['localhost:9092'],
8-
}
6+
kafkaJS: {
7+
brokers: ['localhost:9092'],
8+
}
99
});
1010

1111
const consumer = kafka.consumer({
@@ -49,18 +49,17 @@ async function adminFromConsumer() {
4949

5050
async function adminFromProducer() {
5151
const kafka = new Kafka({
52-
kafkaJS: {
53-
brokers: ['localhost:9092'],
54-
}
52+
kafkaJS: {
53+
brokers: ['localhost:9092'],
54+
}
5555
});
5656

57-
const producer = kafka.producer({});
57+
const producer = kafka.producer({
58+
'metadata.max.age.ms': 900000, /* This is set to the default value. */
59+
});
5860

5961
await producer.connect();
6062

61-
// The producer can be used as normal
62-
await producer.send({ topic: 'test-topic', messages: [{ value: 'Hello!' }] });
63-
6463
// And the same producer can create an admin client - the producer must have successfully
6564
// been connected before the admin client can be created.
6665
const admin = producer.dependentAdmin();
@@ -70,6 +69,23 @@ async function adminFromProducer() {
7069
const listTopicsResult = await admin.listTopics();
7170
console.log(listTopicsResult);
7271

72+
// A common use case for the dependent admin client is to make sure the topic
73+
// is cached before producing to it. This avoids delay in sending the first
74+
// message to any topic. Using the admin client linked to the producer allows
75+
// us to do this, by calling `fetchTopicMetadata` before we produce.
76+
// Here, we cache all possible topics, but it's advisable to only cache the
77+
// topics you are going to produce to (if you know it in advance),
78+
// and avoid calling listTopics().
79+
// Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
80+
// which is 15 minutes by default, after which it will be removed if
81+
// it has not been produced to.
82+
await admin.fetchTopicMetadata({ topics: listTopicsResult }).catch(e => {
83+
console.error('Error caching topics: ', e);
84+
})
85+
86+
// The producer can be used as usual.
87+
await producer.send({ topic: 'test-topic', messages: [{ value: 'Hello!' }] });
88+
7389
// Disconnect the producer and admin clients in the correct order.
7490
await admin.disconnect();
7591
await producer.disconnect();

examples/node-rdkafka/dependent_admin.js renamed to examples/node-rdkafka/dependent-admin.js

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ function adminFromProducer() {
99
'dr_msg_cb': true,
1010
});
1111

12-
const createAdminAndListTopics = () => {
12+
const createAdminAndListAndDescribeTopics = (done) => {
1313
// Create an admin client from the producer, which must be connected.
1414
// Thus, this is called from the producer's 'ready' event.
1515
const admin = Kafka.AdminClient.createFrom(producer);
@@ -21,7 +21,26 @@ function adminFromProducer() {
2121
return;
2222
}
2323
console.log("Topics: ", topics);
24-
admin.disconnect();
24+
25+
// A common use case for the dependent admin client is to make sure the topic
26+
// is cached before producing to it. This avoids delay in sending the first
27+
// message to any topic. Using the admin client linked to the producer allows
28+
// us to do this, by calling `describeTopics` before we produce.
29+
// Here, we cache all possible topics, but it's advisable to only cache the
30+
// topics you are going to produce to (if you know it in advance),
31+
// and avoid calling listTopics().
32+
// Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
33+
// which is 15 minutes by default, after which it will be removed if
34+
// it has not been produced to.
35+
admin.describeTopics(topics, null, (err, topicDescriptions) => {
36+
if (err) {
37+
console.error(err);
38+
return;
39+
}
40+
console.log("Topic descriptions fetched successfully");
41+
admin.disconnect();
42+
done();
43+
});
2544
});
2645
};
2746

@@ -32,10 +51,11 @@ function adminFromProducer() {
3251
producer.setPollInterval(100);
3352

3453
// After the producer is ready, it can be used to create an admin client.
35-
createAdminAndListTopics();
54+
createAdminAndListAndDescribeTopics(() => {
55+
// The producer can also be used normally to produce messages.
56+
producer.produce('test-topic', null, Buffer.from('Hello World!'), null, Date.now());
57+
});
3658

37-
// It can also be used normally to produce messages.
38-
producer.produce('test-topic', null, Buffer.from('Hello World!'), null, Date.now());
3959
});
4060

4161
producer.on('event.error', (err) => {

0 commit comments

Comments
 (0)