@@ -27,7 +27,7 @@ import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
2727import org .apache .kafka .common .{Cluster , MetricName }
2828import org .apache .kafka .common .metrics ._
2929import org .apache .kafka .common .metrics .Metrics
30- import org .apache .kafka .common .metrics .stats .{Avg , CumulativeSum , Rate }
30+ import org .apache .kafka .common .metrics .stats .{Avg , CumulativeSum , Rate , Value }
3131import org .apache .kafka .common .security .auth .KafkaPrincipal
3232import org .apache .kafka .common .utils .{Sanitizer , Time }
3333import org .apache .kafka .server .quota .{ClientQuotaCallback , ClientQuotaEntity , ClientQuotaType }
@@ -40,7 +40,8 @@ import scala.collection.JavaConverters._
4040 * @param quotaSensor @Sensor that tracks the quota
4141 * @param throttleTimeSensor @Sensor that tracks the throttle time
4242 */
43- case class ClientSensors (metricTags : Map [String , String ], quotaSensor : Sensor , throttleTimeSensor : Sensor )
43+ case class ClientSensors (metricTags : Map [String , String ], quotaSensor : Sensor , throttleTimeSensor : Sensor ,
44+ quotaBoundSensor : Sensor , quotaUtilizationSensor : Sensor )
4445
4546/**
4647 * Configuration settings for quota management
@@ -287,12 +288,23 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
287288 def recordAndGetThrottleTimeMs (session : Session , clientId : String , value : Double , timeMs : Long ): Int = {
288289 var throttleTimeMs = 0
289290 val clientSensors = getOrCreateQuotaSensors(session, clientId)
291+ val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
292+
293+ // Quotas may 1) not be configured or 2) not be applicable to all metrics,
294+ // so it's necessary to check for the bound's validity beforehand.
295+ Option (clientMetric.config).flatMap(cfg => Option (cfg.quota)).map(_.bound) match {
296+ case Some (quotaBoundVal) =>
297+ clientSensors.quotaBoundSensor.record(quotaBoundVal)
298+ clientSensors.quotaUtilizationSensor.record(
299+ if (quotaBoundVal.isNaN || quotaBoundVal == 0.0 ) 0.0 else value / quotaBoundVal
300+ )
301+ }
302+
290303 try {
291304 clientSensors.quotaSensor.record(value, timeMs)
292305 } catch {
293306 case _ : QuotaViolationException =>
294307 // Compute the delay
295- val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
296308 throttleTimeMs = throttleTime(clientMetric).toInt
297309 info(" Quota violated for sensor (%s). Delay time: (%d)" .format(clientSensors.quotaSensor.name(), throttleTimeMs))
298310 }
@@ -416,6 +428,20 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
416428 throttleMetricName(metricTags),
417429 None ,
418430 new Avg
431+ ),
432+ sensorAccessor.getOrCreate(
433+ getQuotaBoundSensorName(metricTags),
434+ ClientQuotaManagerConfig .InactiveSensorExpirationTimeSeconds ,
435+ clientRateQuotaBoundMetricName(metricTags),
436+ None ,
437+ new Value
438+ ),
439+ sensorAccessor.getOrCreate(
440+ getQuotaUtilizationSensorName(metricTags),
441+ ClientQuotaManagerConfig .InactiveSensorExpirationTimeSeconds ,
442+ clientRateQuotaUtilizationMetricName(metricTags),
443+ None ,
444+ new Value
419445 )
420446 )
421447 if (quotaCallback.quotaResetRequired(clientQuotaType))
@@ -429,6 +455,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
429455 private def getThrottleTimeSensorName (metricTags : Map [String , String ]): String =
430456 s " ${quotaType}ThrottleTime- ${metricTagsToSensorSuffix(metricTags)}"
431457
458+ private def getQuotaBoundSensorName (metricTags : Map [String , String ]): String =
459+ s " ${quotaType}QuotaBound- ${metricTagsToSensorSuffix(metricTags)}"
460+
461+ private def getQuotaUtilizationSensorName (metricTags : Map [String , String ]): String =
462+ s " ${quotaType}QuotaUtilization- ${metricTagsToSensorSuffix(metricTags)}"
463+
432464 private def getQuotaSensorName (metricTags : Map [String , String ]): String =
433465 s " $quotaType- ${metricTagsToSensorSuffix(metricTags)}"
434466
@@ -561,6 +593,24 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
561593 quotaMetricTags.asJava)
562594 }
563595
596+ private def clientRateQuotaBoundMetricName (quotaMetricTags : Map [String , String ]): MetricName = {
597+ metrics.metricName(
598+ " byte-rate-quota-bound" ,
599+ quotaType.toString,
600+ " Tracking the byte-rate quota bound per user/client-id" ,
601+ quotaMetricTags.asJava
602+ )
603+ }
604+
605+ private def clientRateQuotaUtilizationMetricName (quotaMetricTags : Map [String , String ]): MetricName = {
606+ metrics.metricName(
607+ " byte-rate-quota-utilization" ,
608+ quotaType.toString,
609+ " Tracking the utilization rate of byte-rate quota bound per user/client-id" ,
610+ quotaMetricTags.asJava
611+ )
612+ }
613+
564614 private def throttleMetricName (quotaMetricTags : Map [String , String ]): MetricName = {
565615 metrics.metricName(" throttle-time" ,
566616 quotaType.toString,
0 commit comments