Skip to content

Commit 6da08dd

Browse files
committed
Forward the UMR inside the ControllerEvent thread
1 parent b1718a4 commit 6da08dd

File tree

3 files changed

+54
-53
lines changed

3 files changed

+54
-53
lines changed

core/src/main/scala/kafka/controller/ControllerState.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,14 @@ object ControllerState {
124124
def value = 20
125125
}
126126

127+
/**
 * Controller state reported by the ForwardUpdateMetadataRequest controller event.
 * Its numeric identifier is 21, the next value after the preceding state (20).
 */
case object ForwardUpdateMetadataRequest extends ControllerState {
  def value = 21
}
130+
127131
val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
128132
AlterPartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
129133
LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable, ListPartitionReassignment,
130134
UpdateMetadataResponseReceived,
131-
TopicDeletionFlagChange, PreferredControllerChange, TopicMinInSyncReplicasConfigChange, SkipControlledShutdownSafetyCheck)
135+
TopicDeletionFlagChange, PreferredControllerChange, TopicMinInSyncReplicasConfigChange, SkipControlledShutdownSafetyCheck,
136+
ForwardUpdateMetadataRequest)
132137
}

core/src/main/scala/kafka/controller/KafkaController.scala

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ import kafka.admin.{AdminOperationException, AdminUtils}
2222
import kafka.api._
2323
import kafka.cluster.Broker
2424
import kafka.common._
25-
import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback}
25+
import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ForwardUpdateMetadataCallback, ListReassignmentsCallback}
2626
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
2727
import kafka.server._
2828
import kafka.utils._
2929
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
3030
import kafka.zk._
3131
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
32-
import org.apache.kafka.common.{ElectionType, KafkaException, TopicPartition, Node}
32+
import org.apache.kafka.common.{ElectionType, KafkaException, Node, TopicPartition}
3333
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, NotEnoughReplicasException, PolicyViolationException, StaleBrokerEpochException}
3434
import org.apache.kafka.common.message.UpdateMetadataResponseData
3535
import org.apache.kafka.common.metrics.Metrics
@@ -57,6 +57,7 @@ object KafkaController extends Logging {
5757
type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit
5858
type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit
5959
type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit
60+
type ForwardUpdateMetadataCallback = UpdateMetadataResponse => Unit
6061

6162
def satisfiesLiCreateTopicPolicy(createTopicPolicy : Option[CreateTopicPolicy], zkClient : KafkaZkClient,
6263
topic : String, partitionsAssignment : collection.Map[Int, ReplicaAssignment]): Boolean = {
@@ -419,6 +420,10 @@ class KafkaController(val config: KafkaConfig,
419420
zkClient.updateBrokerInfo(newBrokerInfo)
420421
}
421422

423+
/**
 * Queues an UpdateMetadataRequest for processing by the controller event manager.
 *
 * The request is wrapped in a ForwardUpdateMetadataRequest event and handed to
 * `eventManager`; this method itself does no processing of the request.
 *
 * @param umr      the UpdateMetadataRequest to be forwarded
 * @param callback function that is carried with the event and receives the UpdateMetadataResponse
 */
private[kafka] def forwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback): Unit = {
  val forwardEvent = ForwardUpdateMetadataRequest(umr, callback)
  eventManager.put(forwardEvent)
}
426+
422427
private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
423428
eventManager.put(UncleanLeaderElectionEnable)
424429
}
@@ -1458,6 +1463,26 @@ class KafkaController(val config: KafkaConfig,
14581463
controllerContext.skipShutdownSafetyCheck += (id -> brokerEpoch)
14591464
}
14601465

1466+
/**
 * Handles a ForwardUpdateMetadataRequest event: rewrites an UpdateMetadataRequest received from
 * another cluster, sends it to the live (or shutting down) brokers of this cluster, and then
 * completes the callback with a successful response.
 *
 * @param umr      the remote UpdateMetadataRequest; it is passed through `rewriteRemoteRequest`
 *                 before being sent to the local brokers
 * @param callback invoked with the UpdateMetadataResponse after the request has been handed to
 *                 `sendRemoteUpdateMetadataRequest`
 * @throws ControllerMovedException if this broker is not the active controller; the callback is
 *                                  NOT invoked on that path
 */
def processForwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback): Unit = {
  if (!isActive) {
    // The message names the operation actually being aborted (it previously mentioned
    // "controlled shutdown", which was misleading in logs).
    throw new ControllerMovedException("Controller moved to another broker. Aborting forwarding of UpdateMetadataRequest")
  }

  info(s"controller for clusterId=${clusterId} has received a remote, non-rewritten UpdateMetadataRequest "
    + s"(clusterId=${umr.originClusterId}, routingClusterId=${umr.routingClusterId}): about to validate and rewrite it")

  // Inside KafkaApis, we've already validated that
  // 1. the originClusterId is not equal to my local cluster Id
  // 2. the routingClusterId is null
  umr.rewriteRemoteRequest(clusterId, config.brokerId,
    controllerContext.epoch, controllerContext.maxBrokerEpoch)
  val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
  sendRemoteUpdateMetadataRequest(liveBrokers, umr)

  // For now, we always return a successful UpdateMetadataResponse
  callback(new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code)))
}
1485+
14611486
private def safeToShutdown(id: Int, brokerEpoch: Long): Boolean = {
14621487
// First, check whether or not the broker requesting shutdown has already been told that it is OK to shut down
14631488
// at this epoch.
@@ -2383,6 +2408,8 @@ class KafkaController(val config: KafkaConfig,
23832408
processStartup()
23842409
case SkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) =>
23852410
processSkipControlledShutdownSafetyCheck(id, brokerEpoch, callback)
2411+
case ForwardUpdateMetadataRequest(umr, callback) =>
2412+
processForwardUpdateMetadataRequest(umr, callback)
23862413
}
23872414
} catch {
23882415
case e: ControllerMovedException =>
@@ -2589,6 +2616,10 @@ case class SkipControlledShutdownSafetyCheck(id: Int, brokerEpoch: Long, skipCon
25892616
def state: ControllerState.SkipControlledShutdownSafetyCheck.type = ControllerState.SkipControlledShutdownSafetyCheck
25902617
}
25912618

2619+
/**
 * Controller event carrying an UpdateMetadataRequest that should be processed by the controller,
 * together with the callback that receives the resulting UpdateMetadataResponse.
 *
 * @param umr      the UpdateMetadataRequest to forward
 * @param callback function invoked with the UpdateMetadataResponse
 */
case class ForwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback) extends ControllerEvent {
  def state: ControllerState.ForwardUpdateMetadataRequest.type = ControllerState.ForwardUpdateMetadataRequest
}
2622+
25922623
case class LeaderAndIsrResponseReceived(leaderAndIsrResponse: LeaderAndIsrResponse, brokerId: Int) extends ControllerEvent {
25932624
def state = ControllerState.LeaderAndIsrResponseReceived
25942625
}

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
102102
val quotas: QuotaManagers,
103103
val fetchManager: FetchManager,
104104
brokerTopicStats: BrokerTopicStats,
105-
val clusterId: String, // GRR FIXME: any guarantee this is non-null?
105+
val clusterId: String,
106106
time: Time,
107107
val tokenManager: DelegationTokenManager) extends Logging {
108108

@@ -132,8 +132,6 @@ class KafkaApis(val requestChannel: RequestChannel,
132132
try {
133133
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
134134
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
135-
// info(s"GRR DEBUG (TEMPORARY): Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
136-
// s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
137135
request.header.apiKey match {
138136
case ApiKeys.PRODUCE => handleProduceRequest(request)
139137
case ApiKeys.FETCH => handleFetchRequest(request)
@@ -222,7 +220,6 @@ class KafkaApis(val requestChannel: RequestChannel,
222220
}
223221
}
224222

225-
// FIXME? request arg is NOT USED
226223
private def doHandleLeaderAndIsrRequest(request: RequestChannel.Request, correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest): LeaderAndIsrResponse = {
227224
def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit = {
228225
// for each new leader or follower, call coordinator to handle consumer group migration.
@@ -265,7 +262,6 @@ class KafkaApis(val requestChannel: RequestChannel,
265262
}
266263
}
267264

268-
// FIXME? request arg is NOT USED
269265
private def doHandleStopReplicaRequest(request: RequestChannel.Request, stopReplicaRequest: StopReplicaRequest): StopReplicaResponse = {
270266
val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
271267
// Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
@@ -297,14 +293,16 @@ class KafkaApis(val requestChannel: RequestChannel,
297293
val updateMetadataRequest = request.body[UpdateMetadataRequest]
298294

299295
authorizeClusterOperation(request, CLUSTER_ACTION)
300-
// GRR FIXME: not 100% clear whether staleness criterion should apply to updates coming from other physical
301-
// clusters, but based on KIP-380 description, seems like we probably do need it in order to deal with same
302-
// problems KIP-380 was intended to solve (i.e., "cluster A" controller bounce around same time as "cluster
303-
// B" remote UpdateMetadataRequest); also implies that local controllers must track brokerEpochs of remote
304-
// controllers
305-
// (separate question is why check isn't needed by LI's combined control request, which skips directly to
306-
// doHandleUpdateMetadataRequest(): probable BUG)
307-
if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.maxBrokerEpoch)) {
296+
if (shouldForwardUpdateMetadataRequest(updateMetadataRequest)) {
297+
// The metadata propagation follows an eventual consistency model, which means
298+
// an UpdateMetadataRequest is not specific to a particular broker, or to a particular broker epoch.
299+
// For example, if a newly restarted broker accepts an UpdateMetadataRequest intended for its previous
300+
// epoch, there won't be any correctness violations.
301+
// For UpdateMetadataRequests from foreign clusters, there is no need to check the brokerEpoch or maxBrokerEpoch
302+
controller.forwardUpdateMetadataRequest(updateMetadataRequest, updateMetadataResponse => {
303+
sendResponseExemptThrottle(request, updateMetadataResponse)
304+
})
305+
} else if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.maxBrokerEpoch)) {
308306
// When the broker restarts very quickly, it is possible for this broker to receive request intended
309307
// for its previous generation so the broker should skip the stale request.
310308
info("Received update metadata request with stale broker epoch info " +
@@ -318,24 +316,12 @@ class KafkaApis(val requestChannel: RequestChannel,
318316
}
319317
}
320318

319+
/**
 * Decides whether an incoming UpdateMetadataRequest must be handed to the controller for
 * rewriting and forwarding instead of being processed by this broker directly.
 *
 * @param updateMetadataRequest the request received by this broker
 * @return true only when federation is enabled, the request's originClusterId is not this
 *         cluster's id, and the request carries no routingClusterId
 */
private def shouldForwardUpdateMetadataRequest(updateMetadataRequest: UpdateMetadataRequest): Boolean = {
  // Only requests that originated in a DIFFERENT cluster and have not yet been given a
  // routingClusterId are forwarded. Requests whose originClusterId matches the local cluster
  // fall through to the normal stale-epoch check and broker-side processing.
  config.liFederationEnable &&
    !clusterId.equals(updateMetadataRequest.originClusterId()) &&
    updateMetadataRequest.routingClusterId() == null
}
322+
321323
// [unlike the other "doHandle..." methods, this one DOES use the request arg]
322324
private def doHandleUpdateMetadataRequest(request: RequestChannel.Request, correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): UpdateMetadataResponse = {
323-
324-
// Since handleLiCombinedControlRequest() calls us directly (bypassing handleUpdateMetadataRequest() and its
325-
// stale broker-epoch check), this seems like the most appropriate place for the new federation "router" to
326-
// live: rest of (original) method is the legacy "broker half" logic.
327-
if (!config.liFederationEnable || clusterId.equals(updateMetadataRequest.originClusterId) || clusterId.equals(updateMetadataRequest.routingClusterId)) {
328-
// This is either a local/legacy/non-federated request (from our ZK => originClusterId matches) or one our controller
329-
// has already rewritten (received from a remote controller => routingClusterId matches), so do the normal,
330-
// broker-half processing below.
331-
// info(s"GRR DEBUG: brokerId=${brokerId} received updateMetadataRequest: controllerId=${updateMetadataRequest.controllerId}, originClusterId=${updateMetadataRequest.originClusterId}, routingClusterId=${updateMetadataRequest.routingClusterId}")
332-
if (updateMetadataRequest.originClusterId != null && clusterId.equals(updateMetadataRequest.routingClusterId)) {
333-
info(s"GRR DEBUG: local brokerId=${brokerId} in clusterId=${clusterId} received rewritten updateMetadataRequest from remote clusterId=${updateMetadataRequest.originClusterId}")
334-
}
335-
// [The following block is NOT properly indented in order to simplify upstream merges.]
336-
337-
338-
info(s"GRR DEBUG: brokerId=${brokerId} calling maybeUpdateMetadataCache() with correlationId=${correlationId} and updateMetadataRequest from clusterId=${updateMetadataRequest.originClusterId}")
339325
val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
340326
if (deletedPartitions.nonEmpty)
341327
groupCoordinator.handleDeletedPartitions(deletedPartitions)
@@ -361,17 +347,6 @@ class KafkaApis(val requestChannel: RequestChannel,
361347
}
362348
}
363349
new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code))
364-
365-
366-
367-
} else {
368-
// [Federation only.] This is an incoming remote request (i.e., from another physical cluster in the federation),
369-
// so hand it off to our controller half for validation, rewriting, and rerouting.
370-
info(s"GRR DEBUG: local brokerId=${brokerId} in clusterId=${clusterId} received new updateMetadataRequest from remote controllerId=${updateMetadataRequest.controllerId} in clusterId=${updateMetadataRequest.originClusterId}; sending to controller for validation and rewrite")
371-
controller.rewriteAndForwardRemoteUpdateMetadataRequest(updateMetadataRequest) // modifies UMR in place, returns response
372-
// same method ^^^ stuffs the rewritten UMR into the processing queue, which lives in controller's
373-
// ControllerEventManager (KafkaController's eventManager member var)
374-
}
375350
}
376351

377352
def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = {
@@ -3159,11 +3134,6 @@ class KafkaApis(val requestChannel: RequestChannel,
31593134
val responseData = new LiCombinedControlResponseData()
31603135

31613136
decomposedRequest.leaderAndIsrRequest match {
3162-
// GRR FIXME: this code path skips the "stale broker epoch" check that the main path (through
3163-
// handleLeaderAndIsrRequest()) makes: BUG? (seems like it) (maybe intention was to add single
3164-
// stale-epoch check in handleLiCombinedControlRequest(), but forgot to do so?)
3165-
// [separate question: why was LAIR's top-level BrokerEpoch moved into LCCR's LeaderAndIsrPartitionState
3166-
// struct? why is MaxBrokerEpoch missing? is LCCR out of date?]
31673137
case Some(leaderAndIsrRequest) => {
31683138
val leaderAndIsrResponse = doHandleLeaderAndIsrRequest(request, correlationId, leaderAndIsrRequest)
31693139
responseData.setLeaderAndIsrErrorCode(leaderAndIsrResponse.errorCode())
@@ -3173,9 +3143,6 @@ class KafkaApis(val requestChannel: RequestChannel,
31733143
}
31743144

31753145
decomposedRequest.updateMetadataRequest match {
3176-
// GRR FIXME: this code path skips the "stale broker epoch" check that the main path (through
3177-
// handleUpdateMetadataRequest()) makes: BUG? (seems like it)
3178-
// [separate question: why was UMR's top-level BrokerEpoch not copied to LCCR?]
31793146
case Some(updateMetadataRequest) => {
31803147
val updateMetadataResponse = doHandleUpdateMetadataRequest(request, correlationId, updateMetadataRequest)
31813148
responseData.setUpdateMetadataErrorCode(updateMetadataResponse.errorCode())
@@ -3186,8 +3153,6 @@ class KafkaApis(val requestChannel: RequestChannel,
31863153
val stopReplicaRequests = decomposedRequest.stopReplicaRequests
31873154
val stopReplicaPartitionErrors = new util.ArrayList[StopReplicaPartitionError]()
31883155
stopReplicaRequests.foreach{ stopReplicaRequest => {
3189-
// GRR FIXME: this code path skips the "stale broker epoch" check that the main path (through
3190-
// handleStopReplicaRequest()) makes: BUG? (seems like it)
31913156
val stopReplicaResponse = doHandleStopReplicaRequest(request, stopReplicaRequest)
31923157
responseData.setStopReplicaErrorCode(stopReplicaResponse.errorCode())
31933158
stopReplicaPartitionErrors.addAll(LiCombinedControlTransformer.transformStopReplicaPartitionErrors(stopReplicaResponse.partitionErrors()))

0 commit comments

Comments
 (0)