diff --git a/CHANGELOG.md b/CHANGELOG.md
index dfbf4087..52d2ce76 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ v0.5.0 is a limited availability feature release. It is supported for all usage.
 1. Add support for an Admin API to delete records (#141).
 2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151).
 3. Add support for an Admin API to describe topics (#155).
+4. Add support for creating a dependent Admin client from an existing producer or consumer (#153).
 
 
 # confluent-kafka-javascript v0.4.0
diff --git a/e2e/admin-dependent.spec.js b/e2e/admin-dependent.spec.js
new file mode 100644
index 00000000..f3cd5c6c
--- /dev/null
+++ b/e2e/admin-dependent.spec.js
@@ -0,0 +1,83 @@
+/*
+ * confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
+ *
+ * Copyright (c) 2024 Confluent, Inc.
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license.  See the LICENSE.txt file for details.
+ */
+
+var Kafka = require('../');
+var t = require('assert');
+
+var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
+var time = Date.now();
+
+describe('Dependent Admin', function () {
+  describe('from Producer', function () {
+    let producer;
+
+    this.beforeEach(function (done) {
+      producer = new Kafka.Producer({
+        'metadata.broker.list': kafkaBrokerList,
+      });
+      done();
+    });
+
+    it('should be created and usable from a connected producer', function (done) {
+      producer.on('ready', function () {
+        let admin = Kafka.AdminClient.createFrom(producer);
+        admin.listTopics(null, function (err, res) {
+          t.ifError(err);
+          t.ok(res);
+          producer.disconnect(done);
+          admin = null;
+        });
+        t.ok(admin);
+      });
+      producer.connect();
+    });
+
+    it('should fail to be created from an unconnected producer', function (done) {
+      t.throws(function () {
+        Kafka.AdminClient.createFrom(producer);
+      }, /Existing client must be connected before creating a new client from it/);
+      done();
+    });
+
+  });
+
+  describe('from Consumer', function () {
+    let consumer;
+
+    this.beforeEach(function (done) {
+      consumer = new Kafka.KafkaConsumer({
+        'metadata.broker.list': kafkaBrokerList,
+        'group.id': 'kafka-mocha-grp-' + time,
+      });
+      done();
+    });
+
+    it('should be created and usable from a connected consumer', function (done) {
+      consumer.on('ready', function () {
+        let admin = Kafka.AdminClient.createFrom(consumer);
+        admin.listTopics(null, function (err, res) {
+          t.ifError(err);
+          t.ok(res);
+          consumer.disconnect(done);
+          admin = null;
+        });
+        t.ok(admin);
+      });
+      consumer.connect();
+    });
+
+    it('should fail to be created from an unconnected consumer', function (done) {
+      t.throws(function () {
+        Kafka.AdminClient.createFrom(consumer);
+      }, /Existing client must be connected before creating a new client from it/);
+      done();
+    });
+
+  });
+});
diff --git a/examples/kafkajs/admin/dependent-admin.js b/examples/kafkajs/admin/dependent-admin.js
new file mode 100644
index 00000000..91875821
--- /dev/null
+++ b/examples/kafkajs/admin/dependent-admin.js
@@ -0,0 +1,94 @@
+// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
+const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
+
+async function adminFromConsumer() {
+    const kafka = new Kafka({
+        kafkaJS: {
+            brokers: ['localhost:9092'],
+        }
+    });
+
+    const consumer = kafka.consumer({
+        kafkaJS: {
+            groupId: 'test-group',
+            fromBeginning: true,
+        }
+    });
+
+    await consumer.connect();
+
+    // The consumer can be used as normal
+    await consumer.subscribe({ topic: 'test-topic' });
+    consumer.run({
+        eachMessage: async ({ topic, partition, message }) => {
+            console.log({
+                topic,
+                partition,
+                offset: message.offset,
+                key: message.key?.toString(),
+                value: message.value.toString(),
+            });
+        },
+    });
+
+    // The same consumer can also create an admin client. The consumer must have
+    // connected successfully before the admin client can be created.
+    const admin = consumer.dependentAdmin();
+    await admin.connect();
+
+    // The admin client can be used for as long as the consumer remains connected.
+    const listTopicsResult = await admin.listTopics();
+    console.log(listTopicsResult);
+
+    await new Promise(resolve => setTimeout(resolve, 10000)); // let the consumer run for a while
+
+    // Disconnect in the correct order: the dependent admin client first, then the consumer.
+    await admin.disconnect();
+    await consumer.disconnect();
+}
+
+async function adminFromProducer() {
+    const kafka = new Kafka({
+        kafkaJS: {
+            brokers: ['localhost:9092'],
+        }
+    });
+
+    const producer = kafka.producer({
+        'metadata.max.age.ms': 900000, /* This is set to the default value. */
+    });
+
+    await producer.connect();
+
+    // The same producer can also create an admin client. The producer must have
+    // connected successfully before the admin client can be created.
+    const admin = producer.dependentAdmin();
+    await admin.connect();
+
+    // The admin client can be used for as long as the producer remains connected.
+    const listTopicsResult = await admin.listTopics();
+    console.log(listTopicsResult);
+
+    // A common use case for the dependent admin client is to make sure the topic
+    // is cached before producing to it. This avoids delay in sending the first
+    // message to any topic. Using the admin client linked to the producer allows
+    // us to do this, by calling `fetchTopicMetadata` before we produce.
+    // Here, we cache all possible topics, but it's advisable to only cache the
+    // topics you are going to produce to (if you know them in advance),
+    // and avoid calling listTopics().
+    // Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
+    // which is 15 minutes by default, after which it will be removed if
+    // it has not been produced to.
+    await admin.fetchTopicMetadata({ topics: listTopicsResult }).catch(e => {
+        console.error('Error caching topics: ', e);
+    });
+
+    // The producer can be used as usual.
+    await producer.send({ topic: 'test-topic', messages: [{ value: 'Hello!' }] });
+
+    // Disconnect in the correct order: the dependent admin client first, then the producer.
+    await admin.disconnect();
+    await producer.disconnect();
+}
+
+adminFromProducer().then(() => adminFromConsumer()).catch(console.error);
diff --git a/examples/node-rdkafka/dependent-admin.js b/examples/node-rdkafka/dependent-admin.js
new file mode 100644
index 00000000..4aee994a
--- /dev/null
+++ b/examples/node-rdkafka/dependent-admin.js
@@ -0,0 +1,120 @@
+const Kafka = require('@confluentinc/kafka-javascript');
+
+const bootstrapServers = 'localhost:9092';
+
+function adminFromProducer(callback) {
+    const producer = new Kafka.Producer({
+        'bootstrap.servers': bootstrapServers,
+        'dr_msg_cb': true,
+    });
+
+    const createAdminAndListAndDescribeTopics = (done) => {
+        // Create an admin client from the producer, which must be connected.
+        // Thus, this is called from the producer's 'ready' event.
+        const admin = Kafka.AdminClient.createFrom(producer);
+
+        // The admin client can be used for as long as the producer remains connected.
+        admin.listTopics((err, topics) => {
+            if (err) {
+                console.error(err);
+                return;
+            }
+            console.log("Topics: ", topics);
+
+            // A common use case for the dependent admin client is to make sure the topic
+            // is cached before producing to it. This avoids delay in sending the first
+            // message to any topic. Using the admin client linked to the producer allows
+            // us to do this, by calling `describeTopics` before we produce.
+            // Here, we cache all possible topics, but it's advisable to only cache the
+            // topics you are going to produce to (if you know them in advance),
+            // and avoid calling listTopics().
+            // Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
+            // which is 15 minutes by default, after which it will be removed if
+            // it has not been produced to.
+            admin.describeTopics(topics, null, (err, topicDescriptions) => {
+                if (err) {
+                    console.error(err);
+                    return;
+                }
+                console.log("Topic descriptions fetched successfully");
+                admin.disconnect();
+                done();
+            });
+        });
+    };
+
+    producer.connect();
+
+    producer.on('ready', () => {
+        console.log("Producer is ready");
+        producer.setPollInterval(100);
+
+        // After the producer is ready, it can be used to create an admin client.
+        createAdminAndListAndDescribeTopics(() => {
+            // The producer can also be used normally to produce messages.
+            producer.produce('test-topic', null, Buffer.from('Hello World!'), null, Date.now());
+        });
+
+    });
+
+    producer.on('event.error', (err) => {
+        console.error(err);
+        producer.disconnect(callback);
+    });
+
+    producer.on('delivery-report', (err, report) => {
+        console.log("Delivery report received:", report);
+        producer.disconnect(callback);
+    });
+}
+
+function adminFromConsumer() {
+    const consumer = new Kafka.KafkaConsumer({
+        'bootstrap.servers': bootstrapServers,
+        'group.id': 'test-group',
+        'auto.offset.reset': 'earliest',
+    });
+
+    const createAdminAndListTopics = () => {
+        // Create an admin client from the consumer, which must be connected.
+        // Thus, this is called from the consumer's 'ready' event.
+        const admin = Kafka.AdminClient.createFrom(consumer);
+
+        // The admin client can be used for as long as the consumer remains connected.
+        admin.listTopics((err, topics) => {
+            if (err) {
+                console.error(err);
+                return;
+            }
+            console.log("Topics: ", topics);
+            admin.disconnect();
+        });
+    };
+
+    consumer.connect();
+
+    consumer.on('ready', () => {
+        console.log("Consumer is ready");
+
+        // After the consumer is ready, it can be used to create an admin client.
+        createAdminAndListTopics();
+
+        // It can also be used normally to consume messages.
+        consumer.subscribe(['test-topic']);
+        consumer.consume();
+    });
+
+    consumer.on('data', (data) => {
+        // Quit after receiving a message.
+        console.log("Consumer:data", data);
+        consumer.disconnect();
+    });
+
+    consumer.on('event.error', (err) => {
+        console.error("Consumer:error", err);
+        consumer.disconnect();
+    });
+}
+
+adminFromProducer(() => adminFromConsumer());
diff --git a/lib/admin.js b/lib/admin.js
index a180add7..24e38f21 100644
--- a/lib/admin.js
+++ b/lib/admin.js
@@ -40,6 +40,7 @@ const AclOperationTypes = Object.seal({
 
 module.exports = {
   create: createAdminClient,
+  createFrom: createAdminClientFrom,
   ConsumerGroupStates,
   AclOperationTypes,
 };
@@ -78,6 +79,33 @@ function createAdminClient(conf, eventHandlers) {
   return client;
 }
 
+/**
+ * Create a new AdminClient from an existing producer or consumer.
+ *
+ * This is a factory method because the returned client immediately
+ * operates on the existing client's active handle to the brokers.
+ *
+ * The producer or consumer being used must be connected.
+ * The client can only be used while the producer or consumer is connected.
+ * Logging and other events from this client will be emitted on the producer or consumer.
+ * @param {import('../types/rdkafka').Producer | import('../types/rdkafka').KafkaConsumer} existingClient a producer or consumer to create the admin client from
+ * @param {object} eventHandlers optional key value pairs of event handlers to attach to the client
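+ *
+ * @example
+ * // Minimal usage sketch - assumes `Kafka = require('@confluentinc/kafka-javascript')`
+ * // and a broker on localhost:9092.
+ * var producer = new Kafka.Producer({ 'metadata.broker.list': 'localhost:9092' });
+ * producer.on('ready', function () {
+ *   var admin = Kafka.AdminClient.createFrom(producer);
+ *   admin.listTopics(function (err, topics) {
+ *     admin.disconnect(); // a no-op for dependent admin clients
+ *   });
+ * });
+ * producer.connect();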
+ */
+function createAdminClientFrom(existingClient, eventHandlers) {
+  var client = new AdminClient(null, existingClient);
+
+  if (eventHandlers && typeof eventHandlers === 'object') {
+    for (const key in eventHandlers) {
+      client.on(key, eventHandlers[key]);
+    }
+  }
+
+  LibrdKafkaError.wrap(client.connect(), true);
+
+  // Return the client if we succeeded
+  return client;
+}
+
 /**
  * AdminClient class for administering Kafka
  *
@@ -86,7 +114,8 @@ function createAdminClient(conf, eventHandlers) {
  * should be made using the factory method.
  *
  * <code>
- * var client = AdminClient.create({ ... });
+ * var client = AdminClient.create({ ... }); // From configuration
+ * var client = AdminClient.createFrom(existingClient); // From existing producer or consumer
  * </code>
  *
  * Once you instantiate this object, it will have a handle to the kafka broker.
@@ -96,31 +125,27 @@ function createAdminClient(conf, eventHandlers) {
  *
  * @param {object} conf - Key value pairs to configure the admin client
  * topic configuration
+ * @param {import('../types/rdkafka').Producer | import('../types/rdkafka').KafkaConsumer | null} existingClient - an existing client to derive this admin client from, if any
  * @constructor
  */
-function AdminClient(conf) {
+function AdminClient(conf, existingClient) {
   if (!(this instanceof AdminClient)) {
     return new AdminClient(conf);
   }
 
-  conf = shallowCopy(conf);
+  if (conf) {
+    conf = shallowCopy(conf);
+  }
 
-  /**
-   * NewTopic model.
-   *
-   * This is the representation of a new message that is requested to be made
-   * using the Admin client.
-   *
-   * @typedef {object} AdminClient~NewTopic
-   * @property {string} topic - the topic name to create
-   * @property {number} num_partitions - the number of partitions to give the topic
-   * @property {number} replication_factor - the replication factor of the topic
-   * @property {object} config - a list of key values to be passed as configuration
-   * for the topic.
-   */
+  Client.call(this, conf, Kafka.AdminClient, null, existingClient);
 
-  Client.call(this, conf, Kafka.AdminClient);
-  this._isConnected = false;
+  if (existingClient) {
+    this._isConnected = true;
+    this._hasUnderlyingClient = true;
+  } else {
+    this._isConnected = false;
+    this._hasUnderlyingClient = false;
+  }
   this.globalConfig = conf;
 }
 
@@ -133,8 +158,12 @@ function AdminClient(conf) {
  * Unlike the other connect methods, this one is synchronous.
  */
 AdminClient.prototype.connect = function () {
-  this._client.configureCallbacks(true, this._cb_configs);
-  LibrdKafkaError.wrap(this._client.connect(), true);
+  if (!this._hasUnderlyingClient) {
+    this._client.configureCallbacks(true, this._cb_configs);
+    LibrdKafkaError.wrap(this._client.connect(), true);
+  }
+  // While this could be a no-op for an existing client, we still emit the event
+  // to have a consistent API.
   this._isConnected = true;
   this.emit('ready', { name: this._client.name() });
 };
@@ -146,11 +175,16 @@ AdminClient.prototype.connect = function () {
  * some memory and shut some threads down
  */
 AdminClient.prototype.disconnect = function () {
+  if (this._hasUnderlyingClient) {
+    // No-op if this client was created from an existing client; we're just reusing its handle.
+    return;
+  }
+
   LibrdKafkaError.wrap(this._client.disconnect(), true);
-  this._isConnected = false;
   // The AdminClient doesn't provide a callback. So we can't
   // wait for completion.
   this._client.configureCallbacks(false, this._cb_configs);
+  this._isConnected = false;
 };
 
 /**
diff --git a/lib/client.js b/lib/client.js
index 84f976cf..2ce92c07 100644
--- a/lib/client.js
+++ b/lib/client.js
@@ -31,12 +31,43 @@ util.inherits(Client, Emitter);
  * @param {function} SubClientType - The function representing the subclient
  * type. In C++ land this needs to be a class that inherits from Connection.
  * @param {object} topicConf - Topic configuration in key value pairs
+ * @param {object} existingClient - a producer or a consumer to derive this client from.
+ *                                  Only used by the AdminClient; must be connected.
  * @constructor
  * @extends Emitter
  */
-function Client(globalConf, SubClientType, topicConf) {
+function Client(globalConf, SubClientType, topicConf, existingClient) {
   if (!(this instanceof Client)) {
-    return new Client(globalConf, SubClientType, topicConf);
+    return new Client(globalConf, SubClientType, topicConf, existingClient);
+  }
+
+  // Throw errors early - this lets us default both confs to {} below and
+  // avoid null-checking everywhere when existingClient is set.
+  if (!existingClient && !globalConf) {
+    throw new Error('Global configuration data must be specified');
+  }
+
+  if (existingClient && globalConf) {
+    throw new Error('Global configuration data must not be specified when creating a client from an existing client');
+  }
+
+  if (existingClient && topicConf) {
+    throw new Error('Topic configuration data must not be specified when creating a client from an existing client');
+  }
+
+  if (existingClient && !(existingClient._client instanceof Kafka.Producer) && !(existingClient._client instanceof Kafka.KafkaConsumer)) {
+    throw new Error('Existing client must be a Producer or Consumer instance');
+  }
+
+  if (existingClient && !existingClient._isConnected) {
+    throw new Error('Existing client must be connected before creating a new client from it');
+  }
+
+  let existingInternalClient;
+  if (existingClient) {
+    globalConf = {};
+    topicConf = {};
+    existingInternalClient = existingClient.getClient();
   }
 
   Emitter.call(this);
@@ -58,7 +89,7 @@ function Client(globalConf, SubClientType, topicConf) {
   globalConf['client.software.name'] = 'confluent-kafka-javascript';
   globalConf['client.software.version'] = `${bindingVersion}-librdkafka-${Kafka.librdkafkaVersion}`;
 
-  this._client = new SubClientType(globalConf, topicConf);
+  this._client = new SubClientType(globalConf, topicConf, existingInternalClient);
 
   // We should not modify the globalConf object. We have cloned it already.
   delete globalConf['client.software.name'];
@@ -80,7 +111,7 @@ function Client(globalConf, SubClientType, topicConf) {
     event: {},
   };
 
-  if (!no_event_cb) {
+  if (!existingClient && !no_event_cb) {
     this._cb_configs.event.event_cb = function(eventType, eventData) {
       switch (eventType) {
         case 'error':
@@ -193,6 +224,13 @@ function Client(globalConf, SubClientType, topicConf) {
  * @return {Client} - Returns itself.
  */
 Client.prototype.connect = function(metadataOptions, cb) {
+  if (this._hasUnderlyingClient) {
+    // This is a derived client. We don't want to connect it, it's already connected.
+    // No one should be reaching this method in the first place if they use the
+    // API correctly, but it is possible to do so accidentally.
+    throw new Error('Cannot connect an existing client');
+  }
+
   var self = this;
 
   var next = function(err, data) {
@@ -346,6 +384,14 @@ Client.prototype.getLastError = function() {
  * @return {function} - Callback to call when disconnection is complete.
  */
 Client.prototype.disconnect = function(cb) {
+  if (this._hasUnderlyingClient) {
+    // This is a derived client.
+    // We don't want to disconnect it as it's controlled by the underlying client.
+    // No one should be reaching this method in the first place if they use the
+    // API correctly, but it is possible to do so accidentally.
+    throw new Error('Cannot disconnect an existing client');
+  }
+
   var self = this;
 
   if (!this._isDisconnecting && this._client) {
diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js
index 78c8b64f..99527c53 100644
--- a/lib/kafkajs/_admin.js
+++ b/lib/kafkajs/_admin.js
@@ -67,6 +67,11 @@ class Admin {
    */
   #clientName = undefined;
 
+  /**
+   * The existing client to use as the basis for this admin client, if one is provided.
+   */
+  #existingClient = null;
+
   /**
    * Convenience function to create the metadata object needed for logging.
    */
@@ -76,10 +81,12 @@ class Admin {
 
   /**
    * @constructor
-   * @param {import("../../types/kafkajs").AdminConstructorConfig} config
+   * @param {import("../../types/kafkajs").AdminConstructorConfig?} config
+   * @param {import("../../types/kafkajs").Client?} existingClient
    */
-  constructor(config) {
+  constructor(config, existingClient) {
     this.#userConfig = config;
+    this.#existingClient = existingClient;
   }
 
   #config() {
@@ -178,17 +185,27 @@ class Admin {
 
     this.#state = AdminState.CONNECTING;
 
-    const config = this.#config();
-
     return new Promise((resolve, reject) => {
       try {
         /* AdminClient creation is a synchronous operation for node-rdkafka */
         this.#connectPromiseFunc = { resolve, reject };
-        this.#internalClient = RdKafka.AdminClient.create(config, {
-          'error': this.#errorCb.bind(this),
-          'ready': this.#readyCb.bind(this),
-          'event.log': (msg) => loggerTrampoline(msg, this.#logger),
-        });
+        if (!this.#existingClient) {
+          const config = this.#config();
+          this.#internalClient = RdKafka.AdminClient.create(config, {
+            'error': this.#errorCb.bind(this),
+            'ready': this.#readyCb.bind(this),
+            'event.log': (msg) => loggerTrampoline(msg, this.#logger),
+          });
+        } else {
+          const underlyingClient = this.#existingClient._getInternalClient();
+          if (!underlyingClient) {
+            throw new error.KafkaJSError("Underlying client is not connected.", { code: error.ErrorCodes.ERR__STATE });
+          }
+          this.#logger = this.#existingClient.logger();
+          this.#internalClient = RdKafka.AdminClient.createFrom(underlyingClient, {
+            'ready': this.#readyCb.bind(this),
+          });
+        }
 
         this.#clientName = this.#internalClient.name;
         this.#logger.info("Admin client connected", this.#createAdminBindingMessageMetadata());
diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js
index b772a104..eb7c678c 100644
--- a/lib/kafkajs/_consumer.js
+++ b/lib/kafkajs/_consumer.js
@@ -1,4 +1,5 @@
 const LibrdKafkaError = require('../error');
+const { Admin } = require('./_admin');
 const error = require('./_error');
 const RdKafka = require('../rdkafka');
 const {
@@ -242,6 +243,26 @@ class Consumer {
     this.#userConfig = kJSConfig;
   }
 
+  /**
+   * @returns {import("../rdkafka").Consumer | null} the internal node-rdkafka client.
+   * @note only for internal use and subject to API changes.
+   */
+  _getInternalClient() {
+    return this.#internalClient;
+  }
+
+  /**
+   * Create a new admin client using the underlying connections of the consumer.
+   *
+   * The consumer must be connected before connecting the resulting admin client.
+   * The usage of the admin client is limited to the lifetime of the consumer.
+   * The consumer's logger is shared with the admin client.
+   * @returns {import("../../types/kafkajs").Admin}
+   */
+  dependentAdmin() {
+    return new Admin(null, this);
+  }
+
   #config() {
     if (!this.#internalConfig)
       this.#internalConfig = this.#finalizedConfig();
@@ -985,13 +1006,6 @@ class Consumer {
     return ret;
   }
 
-  /**
-   * @returns {import("../rdkafka").Consumer} the internal node-rdkafka client.
-   */
-  _getInternalConsumer() {
-    return this.#internalClient;
-  }
-
   /**
    * Set up the client and connect to the bootstrap brokers.
    * @returns {Promise} a promise that resolves when the consumer is connected.
diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js
index f2542f39..89d7eeed 100644
--- a/lib/kafkajs/_producer.js
+++ b/lib/kafkajs/_producer.js
@@ -1,4 +1,5 @@
 const RdKafka = require('../rdkafka');
+const { Admin } = require('./_admin');
 const { kafkaJSToRdKafkaConfig,
   topicPartitionOffsetToRdKafka,
   createKafkaJsErrorFromLibRdKafkaError,
@@ -83,6 +84,26 @@ class Producer {
     this.#userConfig = kJSConfig;
   }
 
+  /**
+   * @returns {import("../rdkafka").Producer | null} the internal node-rdkafka client.
+   * @note only for internal use and subject to API changes.
+   */
+  _getInternalClient() {
+    return this.#internalClient;
+  }
+
+  /**
+   * Create a new admin client using the underlying connections of the producer.
+   *
+   * The producer must be connected before connecting the resulting admin client.
+   * The usage of the admin client is limited to the lifetime of the producer.
+   * The producer's logger is shared with the admin client.
+   * @returns {import("../../types/kafkajs").Admin}
+   */
+  dependentAdmin() {
+    return new Admin(null, this);
+  }
+
   /**
    * The client name used by the producer for logging - determined by librdkafka
    * using a combination of clientId and an integer.
diff --git a/src/admin.cc b/src/admin.cc
index 082d8b1a..51eadbb2 100644
--- a/src/admin.cc
+++ b/src/admin.cc
@@ -23,17 +23,16 @@ namespace NodeKafka {
 /**
  * @brief AdminClient v8 wrapped object.
  *
- * Specializes the connection to wrap a consumer object through compositional
+ * Specializes the connection to wrap a producer object through compositional
  * inheritance. Establishes its prototype in node through `Init`
  *
  * @sa RdKafka::Handle
  * @sa NodeKafka::Client
  */
 
-AdminClient::AdminClient(Conf* gconfig):
-  Connection(gconfig, NULL) {
-    rkqu = NULL;
-}
+AdminClient::AdminClient(Conf *gconfig) : Connection(gconfig, NULL) {}
+
+AdminClient::AdminClient(Connection *connection) : Connection(connection) {}
 
 AdminClient::~AdminClient() {
   Disconnect();
@@ -44,6 +43,14 @@ Baton AdminClient::Connect() {
     return Baton(RdKafka::ERR_NO_ERROR);
   }
 
+  /* We should never fail the IsConnected check when we have an underlying
+   * client, as it should always be connected. */
+  if (m_has_underlying) {
+    return Baton(RdKafka::ERR__STATE,
+                 "Existing client is not connected, and dependent client "
+                 "cannot initiate connection.");
+  }
+
   Baton baton = setupSaslOAuthBearerConfig();
   if (baton.err() != RdKafka::ERR_NO_ERROR) {
     return baton;
@@ -68,10 +75,6 @@ Baton AdminClient::Connect() {
   /* Set the client name at the first possible opportunity for logging. */
   m_event_cb.dispatcher.SetClientName(m_client->name());
 
-  if (rkqu == NULL) {
-    rkqu = rd_kafka_queue_new(m_client->c_ptr());
-  }
-
   baton = setupSaslOAuthBearerBackgroundQueue();
   if (baton.err() != RdKafka::ERR_NO_ERROR) {
     DeactivateDispatchers();
@@ -81,14 +84,16 @@ Baton AdminClient::Connect() {
 }
 
 Baton AdminClient::Disconnect() {
+  /* Dependent AdminClients don't need to do anything. We block the call to
+   * disconnect in JavaScript, but the destructor of AdminClient might trigger
+   * this call. */
+  if (m_has_underlying) {
+    return Baton(RdKafka::ERR_NO_ERROR);
+  }
+
   if (IsConnected()) {
     scoped_shared_write_lock lock(m_connection_lock);
 
-    if (rkqu != NULL) {
-      rd_kafka_queue_destroy(rkqu);
-      rkqu = NULL;
-    }
-
     DeactivateDispatchers();
 
     delete m_client;
@@ -145,25 +150,39 @@ void AdminClient::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
   }
 
   if (info.Length() < 1) {
-    return Nan::ThrowError("You must supply a global configuration");
+    return Nan::ThrowError("You must supply a global configuration or a preexisting client"); // NOLINT
   }
 
-  if (!info[0]->IsObject()) {
-    return Nan::ThrowError("Global configuration data must be specified");
-  }
+  Connection *connection = NULL;
+  Conf *gconfig = NULL;
+  AdminClient *client = NULL;
 
-  std::string errstr;
+  if (info.Length() >= 3 && !info[2]->IsNull() && !info[2]->IsUndefined()) {
+    if (!info[2]->IsObject()) {
+      return Nan::ThrowError("Third argument, if provided, must be a client object"); // NOLINT
+    }
+    // We check whether this is a wrapped object within the calling JavaScript
+    // code, so it's safe to unwrap it here. We Unwrap it directly into a
+    // Connection object, since it's OK to unwrap into the parent class.
+    connection = ObjectWrap::Unwrap<Connection>(
+        info[2]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
+    client = new AdminClient(connection);
+  } else {
+    if (!info[0]->IsObject()) {
+      return Nan::ThrowError("Global configuration data must be specified");
+    }
 
-  Conf* gconfig =
-    Conf::create(RdKafka::Conf::CONF_GLOBAL,
-      (info[0]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr);
+    std::string errstr;
+    gconfig = Conf::create(
+        RdKafka::Conf::CONF_GLOBAL,
+        (info[0]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr);
 
-  if (!gconfig) {
-    return Nan::ThrowError(errstr.c_str());
+    if (!gconfig) {
+      return Nan::ThrowError(errstr.c_str());
+    }
+    client = new AdminClient(gconfig);
   }
 
-  AdminClient* client = new AdminClient(gconfig);
-
   // Wrap it
   client->Wrap(info.This());
 
diff --git a/src/admin.h b/src/admin.h
index 4a498067..12103cdc 100644
--- a/src/admin.h
+++ b/src/admin.h
@@ -77,9 +77,10 @@ class AdminClient : public Connection {
   static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
 
   explicit AdminClient(Conf* globalConfig);
+  explicit AdminClient(Connection* existingConnection);
   ~AdminClient();
 
-  rd_kafka_queue_t* rkqu;
+  bool is_derived = false;
 
  private:
   // Node methods
diff --git a/src/connection.cc b/src/connection.cc
index 833c34f2..189c10f1 100644
--- a/src/connection.cc
+++ b/src/connection.cc
@@ -57,9 +57,32 @@ Connection::Connection(Conf* gconfig, Conf* tconfig):
     m_gconfig->set("event_cb", &m_event_cb, errstr);
   }
 
+/* Use an existing Connection object as the underlying connection for this object.
+ * At this point, the underlying connection is assumed to be connected with
+ * the m_client set. */
+Connection::Connection(Connection *existing):
+  m_event_cb() {
+    m_client = existing->m_client;
+
+    m_gconfig = existing->m_gconfig;
+    m_tconfig = existing->m_tconfig;
+
+    m_is_closing = false;
+    m_has_underlying = true;
+
+    // We must share the same connection lock as the existing connection to
+    // avoid getting disconnected while the existing connection is still in use.
+    m_connection_lock = existing->m_connection_lock;
+  }
+
+
 Connection::~Connection() {
-  uv_rwlock_destroy(&m_connection_lock);
+  // The underlying connection will take care of cleanup.
+  if (m_has_underlying) {
+    return;
+  }
 
+  uv_rwlock_destroy(&m_connection_lock);
   if (m_tconfig) {
     delete m_tconfig;
   }
@@ -254,6 +277,9 @@ Baton Connection::GetMetadata(
     err = RdKafka::ERR__STATE;
   }
 
+  /* Clean up the temporary topic handle to avoid leaking it. */
+  if (topic != NULL)
+    delete topic;
+
   if (err == RdKafka::ERR_NO_ERROR) {
     return Baton(metadata);
   } else {
diff --git a/src/connection.h b/src/connection.h
index c798814b..532468fe 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -79,6 +79,7 @@ class Connection : public Nan::ObjectWrap {
 
  protected:
   Connection(Conf*, Conf*);
+  explicit Connection(Connection *);
   ~Connection();
 
   static Nan::Persistent<v8::Function> constructor;
@@ -88,7 +89,6 @@ class Connection : public Nan::ObjectWrap {
   Baton setupSaslOAuthBearerConfig();
   Baton setupSaslOAuthBearerBackgroundQueue();
 
-  bool m_has_been_disconnected;
   bool m_is_closing;
 
   Conf* m_gconfig;
@@ -96,6 +96,7 @@ class Connection : public Nan::ObjectWrap {
   std::string m_errstr;
 
   uv_rwlock_t m_connection_lock;
+  bool m_has_underlying = false;
 
   RdKafka::Handle* m_client;
 
diff --git a/test/promisified/admin/dependent_client.spec.js b/test/promisified/admin/dependent_client.spec.js
new file mode 100644
index 00000000..85039eeb
--- /dev/null
+++ b/test/promisified/admin/dependent_client.spec.js
@@ -0,0 +1,51 @@
+jest.setTimeout(30000);
+
+const {
+    secureRandom,
+    createProducer,
+    createConsumer,
+} = require('../testhelpers');
+
+describe.each(["producer", "consumer"])('Dependent admin client (%s)', (dependentOn) => {
+    let admin, underlyingClient;
+
+    beforeEach(async () => {
+        if (dependentOn === "producer") {
+            underlyingClient = createProducer({});
+        } else {
+            underlyingClient = createConsumer({ groupId: `test-group-${secureRandom()}` });
+        }
+        admin = underlyingClient.dependentAdmin();
+    });
+
+    afterEach(async () => {
+        admin && (await admin.disconnect());
+        underlyingClient && (await underlyingClient.disconnect());
+    });
+
+    it('should connect and work for connected underlying client', async () => {
+        await underlyingClient.connect();
+        await admin.connect();
+
+        const listTopicsResult = await admin.listTopics();
+        expect(listTopicsResult).toBeInstanceOf(Array);
+    });
+
+    it('should not connect for unconnected underlying client', async () => {
+        await expect(admin.connect()).rejects.toHaveProperty('message', 'Underlying client is not connected.');
+
+        underlyingClient = null; // prevents disconnect call
+        admin = null; // prevents disconnect call
+    });
+
+    it('should not connect for disconnected underlying client', async () => {
+        await underlyingClient.connect();
+        await underlyingClient.disconnect();
+
+        await expect(admin.connect()).rejects.toHaveProperty('message', 'Existing client must be connected before creating a new client from it');
+
+        underlyingClient = null; // prevents disconnect call
+        admin = null; // prevents disconnect call
+    });
+});
+
diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts
index 0e7fb44c..4a129926 100644
--- a/types/kafkajs.d.ts
+++ b/types/kafkajs.d.ts
@@ -99,6 +99,7 @@ type Client = {
   disconnect(): Promise<void>
   logger(): Logger
   setSaslCredentialProvider(authInfo: { username: string, password: string }): void
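+  /**
+   * Create an admin client that reuses this client's underlying connection.
+   * This client must be connected before the admin client can be connected,
+   * and the admin client is usable only for the lifetime of this client.
+   */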
+  dependentAdmin(): Admin
 }
 
 export enum CompressionTypes {
@@ -331,7 +332,7 @@ export interface OffsetsByTopicPartition {
   topics: TopicOffsets[]
 }
 
-export type FetchOffsetsPartition = PartitionOffset & { metadata: string | null, leaderEpoch: number | null, error?: LibrdKafkaError };  
+export type FetchOffsetsPartition = PartitionOffset & { metadata: string | null, leaderEpoch: number | null, error?: LibrdKafkaError };
 
 export type TopicInput = string[] | { topic: string; partitions: number[] }[]
 
@@ -403,11 +404,11 @@ export type Admin = {
     groups: string[],
     options?: { timeout?: number, includeAuthorizedOperations?: boolean }): Promise<GroupDescriptions>
   deleteGroups(groupIds: string[], options?: { timeout?: number }): Promise<DeleteGroupsResult[]>
-  fetchOffsets(options: { 
+  fetchOffsets(options: {
     groupId: string,
     topics?: TopicInput,
     timeout?: number,
-    requireStableOffsets?: boolean }): 
+    requireStableOffsets?: boolean }):
     Promise<Array<{ topic: string; partitions: FetchOffsetsPartition[] }>>
   deleteTopicRecords(options: {
     topic: string; partitions: SeekEntry[];
diff --git a/types/rdkafka.d.ts b/types/rdkafka.d.ts
index 88d5f0eb..165786ea 100644
--- a/types/rdkafka.d.ts
+++ b/types/rdkafka.d.ts
@@ -503,6 +503,7 @@ export type EventHandlers = {
 
 export abstract class AdminClient {
     static create(conf: GlobalConfig, eventHandlers?: EventHandlers): IAdminClient;
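+    /**
+     * Create an admin client from an existing, connected Producer or KafkaConsumer,
+     * reusing its underlying connection. The returned client is usable only while
+     * the existing client remains connected, and its disconnect() is a no-op:
+     * the existing client owns the connection's lifecycle.
+     */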
+    static createFrom(existingClient: Producer | KafkaConsumer, eventHandlers?: EventHandlers): IAdminClient;
 }
 
 export type RdKafka = {