diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index f191659bae746..cacb8175fbfd6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -315,7 +315,7 @@ public synchronized void update(int requestVersion, MetadataResponse response, l String newClusterId = cache.cluster().clusterResource().clusterId(); if (!Objects.equals(previousClusterId, newClusterId)) { - log.info("Cluster ID: {}", newClusterId); + log.info("Cluster ID = {}", newClusterId); } clusterResourceListeners.onUpdate(cache.cluster().clusterResource()); diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 3c86c6f000e79..fb4d055326a2f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -1232,6 +1232,7 @@ private void handleCompletedReceives(List<ClientResponse> responses, long now) { InFlightRequest req = inFlightRequests.completeNext(source); Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now); + // FIXME: probable perf concern: cache isTraceEnabled() outside the loop and use the cached boolean here if (log.isTraceEnabled()) { log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination, req.header.apiKey(), req.header.correlationId(), responseStruct); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 2795cfc25eb5e..d0c3abfd81a1d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.LinkedList; import java.util.Map; import static java.util.Collections.singletonList; @@ -50,16 +51,19 @@ public class UpdateMetadataRequest extends AbstractControlRequest { public static class Builder extends AbstractControlRequest.Builder<UpdateMetadataRequest> { private final List<UpdateMetadataPartitionState> partitionStates; private final List<UpdateMetadataBroker> liveBrokers; + private final String originClusterId; private Lock buildLock = new ReentrantLock(); // LIKAFKA-18349 - Cache the UpdateMetadataRequest Objects to reduce memory usage private final Map<Short, UpdateMetadataRequest> requestCache = new HashMap<>(); public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, long maxBrokerEpoch, - List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers) { + List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers, + String originClusterId) { super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, maxBrokerEpoch); this.partitionStates = partitionStates; this.liveBrokers = liveBrokers; + this.originClusterId = originClusterId; } @Override @@ -104,6 +108,12 @@ public UpdateMetadataRequest build(short version) { data.setUngroupedPartitionStates(partitionStates); } + // originClusterId == null implies federation is not enabled (though the reverse may not be true); will be + // ignored during serialization (data.toStruct()) + if (version >= 7) { + data.setOriginClusterId(originClusterId); + } + updateMetadataRequest = new UpdateMetadataRequest(data, version); requestCache.put(version, 
updateMetadataRequest); } @@ -158,6 +168,32 @@ public List liveBrokers() { } } + /** + * Dummy "builder" that simply wraps an already-built UpdateMetadataRequest. This is needed in order to + * support submission of rewritten remote requests (i.e., from controllers in other physical clusters in + * a federated setup) to the broker-queues in this controller's cluster. + */ + public static class WrappingBuilder extends Builder { + private final UpdateMetadataRequest updateMetadataRequest; + + public WrappingBuilder(UpdateMetadataRequest umr) { + super(umr.version(), umr.controllerId(), umr.controllerEpoch(), umr.brokerEpoch(), umr.maxBrokerEpoch(), + toList(umr.partitionStates()), umr.liveBrokers(), umr.originClusterId()); + this.updateMetadataRequest = umr; + } + + @Override + public UpdateMetadataRequest build(short version) { + return updateMetadataRequest; + } + + private static List toList(Iterable iterable) { + List list = new LinkedList<>(); + iterable.forEach(list::add); + return list; + } + } + private final UpdateMetadataRequestData data; // LIKAFKA-18349 - Cache the UpdateMetadataRequest struct to reduce memory usage private Struct struct = null; @@ -214,6 +250,16 @@ public UpdateMetadataRequest(Struct struct, short version) { this(new UpdateMetadataRequestData(struct, version), version); } + // federation + public String originClusterId() { + return data.originClusterId(); + } + + // federation + public String routingClusterId() { + return data.routingClusterId(); + } + @Override public int controllerId() { return data.controllerId(); @@ -283,6 +329,26 @@ protected Struct toStruct() { } } + // federation + public void rewriteRemoteRequest(String routingClusterId, int controllerId, int controllerEpoch, long maxBrokerEpoch) { + // FIXME? should we add a version check for 7+? federation should not be enabled with less than that... + structLock.lock(); + try { + data.setRoutingClusterId(routingClusterId); + data.setControllerId(controllerId); + data.setControllerEpoch(controllerEpoch); + // brokerEpoch apparently gets rewritten by the controller for every receiving broker (somewhere...) + // before sending it to them: shouldn't need to mess with it here, right? or should we remove it in + // the version >= 6 case? FIXME? + if (version() >= 6) { + data.setMaxBrokerEpoch(maxBrokerEpoch); + } + struct = null; // invalidate cache (in case it's there) + } finally { + structLock.unlock(); + } + } + // Visible for testing UpdateMetadataRequestData data() { return data; diff --git a/clients/src/main/resources/common/message/UpdateMetadataRequest.json b/clients/src/main/resources/common/message/UpdateMetadataRequest.json index 19ed06184b98d..1c04c84ab059e 100644 --- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json +++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json @@ -33,6 +33,12 @@ "validVersions": "0-7", "flexibleVersions": "7+", "fields": [ + { "name": "OriginClusterId", "type": "string", "versions": "7+", "nullableVersions": "7+", "default": "null", + "taggedVersions": "7+", "tag": 0, "ignorable": true, + "about": "The clusterId if known. In federated clusters, this is the ID of the originating physical cluster, i.e., it matches the included broker info." }, + { "name": "RoutingClusterId", "type": "string", "versions": "7+", "nullableVersions": "7+", "default": "null", + "taggedVersions": "7+", "tag": 1, "ignorable": true, + "about": "The 'effective' or rewritten clusterId; for routing purposes, has precedence over OriginClusterId if present. 
In federated clusters, updates from other physical clusters must be modified by the local controller and then forwarded to local brokers, including to the broker half of the controller itself. This field allows the local controller to avoid infinite loops." }, { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The controller id." }, { "name": "ControllerEpoch", "type": "int32", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 0b2012e160585..36434ac124b2e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1355,7 +1355,7 @@ private UpdateMetadataRequest createUpdateMetadataRequest(int version, String ra .setRack(rack) ); return new UpdateMetadataRequest.Builder((short) version, 1, 10, 0, 0, partitionStates, - liveBrokers).build(); + liveBrokers, "dummyClusterId").build(); } private UpdateMetadataResponse createUpdateMetadataResponse() { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java index 4758936bfd97c..cac849036e598 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java @@ -53,7 +53,7 @@ public class UpdateMetadataRequestTest { public void testUnsupportedVersion() { UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder( (short) (UPDATE_METADATA.latestVersion() + 1), 0, 0, 0, 0, - Collections.emptyList(), Collections.emptyList()); + Collections.emptyList(), Collections.emptyList(), "dummyClusterId"); assertThrows(UnsupportedVersionException.class, builder::build); } @@ -61,7 +61,7 @@ public void testUnsupportedVersion() { public void testGetErrorResponse() { for (short version = UPDATE_METADATA.oldestVersion(); version < UPDATE_METADATA.latestVersion(); version++) { UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder( - version, 0, 0, 0, 0, Collections.emptyList(), Collections.emptyList()); + version, 0, 0, 0, 0, Collections.emptyList(), Collections.emptyList(), "dummyClusterId"); UpdateMetadataRequest request = builder.build(); UpdateMetadataResponse response = request.getErrorResponse(0, new ClusterAuthorizationException("Not authorized")); @@ -149,7 +149,7 @@ public void testVersionLogic() { ); UpdateMetadataRequest request = new UpdateMetadataRequest.Builder(version, 1, 2, 3, 3, - partitionStates, liveBrokers).build(); + partitionStates, liveBrokers, "dummyClusterId").build(); assertEquals(new HashSet<>(partitionStates), iterableToSet(request.partitionStates())); assertEquals(liveBrokers, request.liveBrokers()); @@ -201,7 +201,7 @@ public void testTopicPartitionGroupingSizeReduction() { .setPartitionIndex(tp.partition())); } UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder((short) 5, 0, 0, 0, 0, - partitionStates, Collections.emptyList()); + partitionStates, Collections.emptyList(), "dummyClusterId"); assertTrue(MessageTestUtil.messageSize(builder.build((short) 5).data(), (short) 5) < MessageTestUtil.messageSize(builder.build((short) 4).data(), (short) 4)); diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 31174ef259330..86c205750d7d1 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition} import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, ListBuffer} +import scala.collection.mutable.{HashMap, HashSet, ListBuffer} import scala.collection.{Seq, Map, Set, mutable} object ControllerChannelManager { @@ -55,6 +55,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, import ControllerChannelManager._ protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] + protected val remoteControllerIds = new HashSet[Int] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " val brokerResponseSensors: mutable.Map[ApiKeys, BrokerResponseTimeStats] = mutable.HashMap.empty @@ -71,6 +72,9 @@ class ControllerChannelManager(controllerContext: ControllerContext, controllerContext.liveOrShuttingDownBrokers.foreach(addNewBroker) brokerLock synchronized { + info(s"About to iterate brokerStateInfo to start RequestSendThreads " + + s"(${brokerStateInfo.size - remoteControllerIds.size} local brokers, ${remoteControllerIds.size} " + + "remote controllers)") brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1)) } initBrokerResponseSensors() @@ -109,6 +113,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, } } + // [non-federation only] def addBroker(broker: Broker): Unit = { // be careful here. Maybe the startup() API has already started the request send thread brokerLock synchronized { @@ -119,19 +124,64 @@ class ControllerChannelManager(controllerContext: ControllerContext, } } + // [non-federation only] def removeBroker(brokerId: Int): Unit = { brokerLock synchronized { removeExistingBroker(brokerStateInfo(brokerId)) } } + /** + * [Federation only] Get the Node struct (basic connection details) for the specified local broker ID. + * This is sent to a remote controller so it can, in turn, send its cluster updates to us. + */ + // [might be a test-only method; will depend on how "real" configuration/discovery/recovery works out] + private[controller] def getBrokerNode(brokerId: Int): Option[Node] = { + brokerLock synchronized { + val stateInfoOpt = brokerStateInfo.get(brokerId) + stateInfoOpt match { + case Some(stateInfo) => + val node = stateInfo.brokerNode + info(s"Controller ${config.brokerId}'s Node info for brokerId=${brokerId} = ${node}") + Some(node) + case None => + info(s"ControllerBrokerStateInfo on controllerId=${config.brokerId} for brokerId=${brokerId} DOES NOT EXIST ('offline'?)") + None + } + } + } + + /** + * [Federation only] Add the specified broker as a remote controller, i.e., a target for local + * metadata updates but not for rewritten remote ones. Loosely speaking, this is the other side + * of getBrokerNode(), i.e., this is what the other side does when it receives the getBrokerNode() + * info from another controller. 
+ */ + // [might be a test-only method; will depend on how "real" configuration/discovery/recovery works out] + private[controller] def addRemoteController(remoteBroker: Broker): Unit = { + info(s"ControllerId=${config.brokerId} adding remote controller [${remoteBroker}] for FEDERATION INTER-CLUSTER REQUESTS and starting its RequestSendThread") + brokerLock synchronized { + if (!remoteControllerIds.contains(remoteBroker.id)) { + addNewBroker(remoteBroker, false) + startRequestSendThread(remoteBroker.id) + } + } + } + + // called under brokerLock except at startup() private def addNewBroker(broker: Broker): Unit = { + addNewBroker(broker, true) + } + + // called under brokerLock except at startup() + private def addNewBroker(broker: Broker, isLocal: Boolean): Unit = { val messageQueue = new LinkedBlockingQueue[QueueItem] debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}") val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) val brokerNode = broker.node(controllerToBrokerListenerName) - val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ") + val idType = if (isLocal) "targetBrokerId" else "remoteControllerId" + val logContext = new LogContext(s"[Controller id=${config.brokerId}, ${idType}=${brokerNode.idString}] ") val (networkClient, reconfigurableChannelBuilder) = { val channelBuilder = ChannelBuilders.clientChannelBuilder( controllerToBrokerSecurityProtocol, @@ -198,6 +248,10 @@ class ControllerChannelManager(controllerContext: ControllerContext, brokerMetricTags(broker.id) ) + if (!isLocal) { + info(s"Adding remote ${brokerNode} info to brokerStateInfo map for federation inter-cluster requests") + remoteControllerIds.add(broker.id) + } brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder)) } @@ -247,7 +301,7 @@ class RequestSendThread(val controllerId: Int, val controllerChannelManager: ControllerChannelManager) extends ShutdownableThread(name = name) with KafkaMetricsGroup { - logIdent = s"[RequestSendThread controllerId=$controllerId] " + logIdent = s"[RequestSendThread controllerId=$controllerId -> brokerId=${brokerNode.id}] " private val MaxRequestAgeMetricName = "maxRequestAge" @@ -446,11 +500,12 @@ extends ShutdownableThread(name = name) with KafkaMetricsGroup { } class ControllerBrokerRequestBatch(config: KafkaConfig, + clusterId: String, controllerChannelManager: ControllerChannelManager, controllerEventManager: ControllerEventManager, controllerContext: ControllerContext, stateChangeLogger: StateChangeLogger) - extends AbstractControllerBrokerRequestBatch(config, controllerContext, stateChangeLogger) { + extends AbstractControllerBrokerRequestBatch(config, clusterId, controllerContext, stateChangeLogger) { def sendEvent(event: ControllerEvent): Unit = { controllerEventManager.put(event) @@ -467,6 +522,7 @@ class ControllerBrokerRequestBatch(config: KafkaConfig, case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean) abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, + val originClusterId: String, controllerContext: ControllerContext, stateChangeLogger: StateChangeLogger) extends Logging { val controllerId: Int = config.brokerId 
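[Reviewer sketch, not part of the patch] To make the intended pairing of getBrokerNode() and addRemoteController() concrete, here is a rough two-cluster wiring example in the spirit of an integration test. Only getBrokerNode(), addRemoteController(), and KafkaServer.config come from this patch / the existing codebase; wireFederation(), brokerFromNode(), the serverA/serverB values, and the PLAINTEXT listener choice are assumptions for illustration, and each server is assumed to be the active controller of its own single-broker physical cluster.

```scala
import kafka.cluster.{Broker, EndPoint}
import kafka.server.KafkaServer
import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol

// Hypothetical helper: wrap a controller's Node in a Broker with a single endpoint so it can be
// handed to addRemoteController(). PLAINTEXT is an assumption; the listener must match whatever
// the local controller uses for controller-to-broker connections (control-plane or inter-broker).
def brokerFromNode(node: Node): Broker =
  Broker(node.id,
    Seq(EndPoint(node.host, node.port, new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT)),
    rack = None)

// Hypothetical two-cluster wiring: each side looks up the Node of the other side's controller
// broker and registers it as a remote controller, so that locally generated UpdateMetadataRequests
// also fan out to the remote controller for rewriting and re-propagation in its own cluster.
def wireFederation(serverA: KafkaServer, serverB: KafkaServer): Unit = {
  val nodeA = serverA.getBrokerNode(serverA.config.brokerId).getOrElse(
    throw new IllegalStateException("controller A has no Node info for its own broker"))
  val nodeB = serverB.getBrokerNode(serverB.config.brokerId).getOrElse(
    throw new IllegalStateException("controller B has no Node info for its own broker"))

  serverA.addRemoteController(brokerFromNode(nodeB))
  serverB.addRemoteController(brokerFromNode(nodeA))
}
```

Once a real remote-ZK watcher exists, this manual wiring would presumably be replaced by discovery, but the registration path through addRemoteController() should look the same.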
@@ -653,11 +709,14 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, }.toBuffer if (updateMetadataRequestVersion >= 6) { - // We should only create one copy UpdateMetadataRequest that should apply to all brokers. + // NOTE: new flexible versions thing is for 7+ (which we don't check here), but UpdateMetadataRequest.Builder + // does check for it before attempting to call data.setClusterId(clusterId) + // We should create only one copy of UpdateMetadataRequest[.Builder] that should apply to all brokers. // The goal is to reduce memory footprint on the controller. val maxBrokerEpoch = controllerContext.maxBrokerEpoch val updateMetadataRequest = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, - AbstractControlRequest.UNKNOWN_BROKER_EPOCH, maxBrokerEpoch, partitionStates.asJava, liveBrokers.asJava) + AbstractControlRequest.UNKNOWN_BROKER_EPOCH, maxBrokerEpoch, partitionStates.asJava, liveBrokers.asJava, + originClusterId) updateMetadataRequestBrokerSet.intersect(controllerContext.liveOrShuttingDownBrokerIds).foreach { broker => sendRequest(broker, updateMetadataRequest, (r: AbstractResponse) => { @@ -665,17 +724,44 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker)) }) } + + // if we're part of a multi-cluster federation, we need to send our (local) updates to controllers in the + // other physical clusters + if (config.liFederationEnable) { + // [note confusing variable names: "broker" = brokerId, "updateMetadataRequest" = updateMetadataRequestBuilder] + // FIXME: need to keep list of remote (active) controllers up to date + // - implies some kind of configuration pointing at the remote ZKs (or all ZKs, from which we subtract + // our own) + // - implies some kind of ZK-watcher setup + callback to maintain the list in realtime (potentially like + // updateMetadataRequestBrokerSet above, which filters out IDs < 0, but could also tweak state info + // to include "isRemoteController" and "isActive" states and filter on latter) + // FIXME: the sendRequest() calls to remote controllers below need some kind of reasonable timeout/retry setup + // (since we probably don't know about shutting-down states, etc., of remote controllers...or would our + // ZK-watcher get that for free?): what's reasonable here? and if we exhaust retries (or avoid retrying), + // do we have some kind of "deferred update" list like elsewhere in the code, or ...? 
+ // (all of this could be wrapped up in a method call to elsewhere, but not clear where would be best) + controllerContext.getLiveOrShuttingDownRemoteControllerIds.foreach { remoteControllerId => + info(s"Local controllerId=${config.brokerId} sending updateMetadataRequest to remote controllerId=${remoteControllerId}") + sendRequest(remoteControllerId, updateMetadataRequest, (r: AbstractResponse) => { + val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse] + sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, remoteControllerId)) + }) + } + } + } else { updateMetadataRequestBrokerSet.intersect(controllerContext.liveOrShuttingDownBrokerIds).foreach { broker => val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker) val updateMetadataRequest = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, - brokerEpoch, AbstractControlRequest.UNKNOWN_BROKER_EPOCH, partitionStates.asJava, liveBrokers.asJava) + brokerEpoch, AbstractControlRequest.UNKNOWN_BROKER_EPOCH, partitionStates.asJava, liveBrokers.asJava, + originClusterId) sendRequest(broker, updateMetadataRequest, (r: AbstractResponse) => { val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse] sendEvent(UpdateMetadataResponseReceived(updateMetadataResponse, broker)) }) } } + updateMetadataRequestBrokerSet.clear() updateMetadataRequestPartitionInfoMap.clear() } @@ -747,7 +833,33 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, throw new IllegalStateException(e) } } -} + + /** + * [Federation only] Send the topic-partition metadata from a remote physical cluster to the specified local + * brokers (only) so they can correctly respond to metadata requests for the entire federation. + * + * @param brokers the brokers that the update metadata request should be sent to + * @param umr the (rewritten) remote update metadata request itself + */ + def sendRemoteRequestToBrokers(brokerIds: Seq[Int], umr: UpdateMetadataRequest): Unit = { + try { + // note that our caller already updated umr's controllerEpoch field (as well as others), so no need for that here + val updateMetadataRequestBuilder = new UpdateMetadataRequest.WrappingBuilder(umr) + brokerIds.foreach { + broker => sendRequest(broker, updateMetadataRequestBuilder) + } + } catch { + case e: Throwable => + if (brokerIds.nonEmpty) { + // GRR FIXME: do we need any kind of detailed "current state" info from umr here (as in + // sendRequestsToBrokers() above)? + error(s"Haven't been able to forward remote metadata update requests to brokers " + + s"$brokerIds. 
Exception message: $e") + } + throw new IllegalStateException(e) + } + } +} // end of abstract class AbstractControllerBrokerRequestBatch case class ControllerBrokerStateInfo(networkClient: NetworkClient, brokerNode: Node, diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 3b548fd23e519..f2bbe7ee76b96 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -105,6 +105,9 @@ class ControllerContext { @volatile var livePreferredControllerIds: Set[Int] = Set.empty + // [Federation only] + val liveOrShuttingDownRemoteControllerIds = mutable.Set.empty[Int] + private def clearTopicsState(): Unit = { allTopics = Set.empty partitionAssignments.clear() @@ -204,6 +207,11 @@ class ControllerContext { livePreferredControllerIds = preferredControllerIds } + // [Federation only] + def addRemoteControllers(remoteControllerIds: Set[Int]): Unit = { + liveOrShuttingDownRemoteControllerIds ++= remoteControllerIds + } + // getter def liveBrokerIds: Set[Int] = liveBrokerEpochs.filter(b => b._2 > (shuttingDownBrokerIds.getOrElse(b._1, -1L))).keySet def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerEpochs.keySet @@ -214,6 +222,9 @@ class ControllerContext { def getLivePreferredControllerIds : Set[Int] = livePreferredControllerIds + // [Federation only] + def getLiveOrShuttingDownRemoteControllerIds : Set[Int] = liveOrShuttingDownRemoteControllerIds + def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = { partitionAssignments.flatMap { case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter { diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala index e83ee07093690..0f1a952a55073 100644 --- a/core/src/main/scala/kafka/controller/ControllerState.scala +++ b/core/src/main/scala/kafka/controller/ControllerState.scala @@ -124,9 +124,14 @@ object ControllerState { def value = 20 } + case object ForwardUpdateMetadataRequest extends ControllerState { + def value = 21 + } + val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion, AlterPartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived, LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable, ListPartitionReassignment, UpdateMetadataResponseReceived, - TopicDeletionFlagChange, PreferredControllerChange, TopicMinInSyncReplicasConfigChange, SkipControlledShutdownSafetyCheck) + TopicDeletionFlagChange, PreferredControllerChange, TopicMinInSyncReplicasConfigChange, SkipControlledShutdownSafetyCheck, + ForwardUpdateMetadataRequest) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 27e108687ba26..c8ca5155d6b3d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -20,21 +20,21 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import kafka.admin.{AdminOperationException, AdminUtils} import kafka.api._ +import kafka.cluster.Broker import kafka.common._ -import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback} +import kafka.controller.KafkaController.{AlterReassignmentsCallback, 
ElectLeadersCallback, ForwardUpdateMetadataCallback, ListReassignmentsCallback} import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server._ import kafka.utils._ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk._ import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler} -import org.apache.kafka.common.ElectionType -import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{ElectionType, KafkaException, Node, TopicPartition} import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, NotEnoughReplicasException, PolicyViolationException, StaleBrokerEpochException} +import org.apache.kafka.common.message.UpdateMetadataResponseData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateMetadataResponse} +import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateMetadataRequest, UpdateMetadataResponse} import org.apache.kafka.common.utils.Time import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code @@ -57,6 +57,7 @@ object KafkaController extends Logging { type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit + type ForwardUpdateMetadataCallback = UpdateMetadataResponse => Unit def satisfiesLiCreateTopicPolicy(createTopicPolicy : Option[CreateTopicPolicy], zkClient : KafkaZkClient, topic : String, partitionsAssignment : collection.Map[Int, ReplicaAssignment]): Boolean = { @@ -96,6 +97,7 @@ class KafkaController(val config: KafkaConfig, initialBrokerInfo: BrokerInfo, initialBrokerEpoch: Long, tokenManager: DelegationTokenManager, + clusterId: String, threadNamePrefix: Option[String] = None) extends ControllerEventProcessor with Logging with KafkaMetricsGroup { @@ -118,12 +120,12 @@ class KafkaController(val config: KafkaConfig, private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time, controllerContext.stats.rateAndTimeMetrics) - private val brokerRequestBatch = new ControllerBrokerRequestBatch(config, controllerChannelManager, + private val brokerRequestBatch = new ControllerBrokerRequestBatch(config, clusterId, controllerChannelManager, eventManager, controllerContext, stateChangeLogger) val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient, - new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) + new ControllerBrokerRequestBatch(config, clusterId, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient, - new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) + new ControllerBrokerRequestBatch(config, clusterId, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine, 
partitionStateMachine, new ControllerDeletionClient(this, zkClient)) @@ -305,11 +307,40 @@ class KafkaController(val config: KafkaConfig, eventManager.put(skipControlledShutdownEvent) } + + /** + * [Federation only] Get the Node struct (basic connection details) for the specified broker ID. + * This is provided to a remote controller so it can send its cluster updates to us. + */ + // [might be a test-only method; will depend on how "real" configuration/discovery/recovery works out] + def getBrokerNode(brokerId: Int): Option[Node] = controllerChannelManager.getBrokerNode(brokerId) + + + /** + * [Federation only] Add the specified broker as a remote controller, i.e., a target for local + * metadata updates but not for rewritten remote ones. Loosely speaking, this is the other side + * of getBrokerNode(), i.e., this is what the other side does when it receives getBrokerNode() + * info from another controller. Note that there is no inverse: once added, a remote controller + * stays added until this broker bounces. [Might change once the full remote-ZK watcher + * implementation exists?] + */ + // [currently a test-only method, but likely to be used with "real" configuration/discovery/recovery as well] + //GRR TEMP: replace this with a "just in time" or at least "frequently refreshed" variant once we + // have the remote-ZK watcher variant (analogous to processIsrChangeNotification() far, far below) + def addRemoteController(broker: Broker): Unit = { + controllerChannelManager.addRemoteController(broker) + controllerContext.addRemoteControllers(Set(broker.id)) + } + private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = { this.brokerInfo = newBrokerInfo zkClient.updateBrokerInfo(newBrokerInfo) } + private[kafka] def forwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback): Unit = { + eventManager.put(ForwardUpdateMetadataRequest(umr, callback)) + } + private[kafka] def enableDefaultUncleanLeaderElection(): Unit = { eventManager.put(UncleanLeaderElectionEnable) } @@ -1167,6 +1198,7 @@ class KafkaController(val config: KafkaConfig, * Send the leader information for selected partitions to selected brokers so that they can correctly respond to * metadata requests * + * @param partitions The topic-partitions whose metadata should be sent * @param brokers The brokers that the update metadata request should be sent to */ private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = { @@ -1180,6 +1212,24 @@ class KafkaController(val config: KafkaConfig, } } + /** + * [Federation only] Send the topic-partition metadata from a remote physical cluster to all of our brokers so + * they can correctly respond to metadata requests for the entire federation. 
+ * + * @param brokers The brokers that the update metadata request should be sent to + * @param umr The (rewritten) remote update metadata request itself + */ + private[controller] def sendRemoteUpdateMetadataRequest(brokers: Seq[Int], umr: UpdateMetadataRequest): Unit = { + try { + brokerRequestBatch.newBatch() // [GRR: this is a do-nothing (other than throwing exceptions) sanity checker] + brokerRequestBatch.sendRemoteRequestToBrokers(brokers, umr) + } catch { + case e: IllegalStateException => + info(s"sendRemoteUpdateMetadataRequest(): caught exception while sanity-checking for new batch or forwarding remote request to local brokers", e) + handleIllegalState(e) + } + } + + /** * Does not change leader or isr, but just increments the leader epoch * @@ -1329,6 +1379,26 @@ class KafkaController(val config: KafkaConfig, controllerContext.skipShutdownSafetyCheck += (id -> brokerEpoch) } + def processForwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback): Unit = { + if (!isActive) { + throw new ControllerMovedException("Controller moved to another broker. Aborting the forwarding of this UpdateMetadataRequest") + } + + info(s"controller for clusterId=${clusterId} has received a remote, non-rewritten UpdateMetadataRequest " + + s"(clusterId=${umr.originClusterId}, routingClusterId=${umr.routingClusterId}): about to validate and rewrite it") + + // Inside KafkaApis, we've already validated that + // 1. the originClusterId is not equal to my local clusterId + // 2. the routingClusterId is null + umr.rewriteRemoteRequest(clusterId, config.brokerId, + controllerContext.epoch, controllerContext.maxBrokerEpoch) + val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq + sendRemoteUpdateMetadataRequest(liveBrokers, umr) + + // For now, we always return a successful UpdateMetadataResponse + callback(new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code))) + } + private def safeToShutdown(id: Int, brokerEpoch: Long): Boolean = { // First, check whether or not the broker requesting shutdown has already been told that it is OK to shut down // at this epoch. @@ -2254,6 +2324,8 @@ class KafkaController(val config: KafkaConfig, processStartup() case SkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) => processSkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) + case ForwardUpdateMetadataRequest(umr, callback) => + processForwardUpdateMetadataRequest(umr, callback) } } catch { case e: ControllerMovedException => @@ -2460,6 +2532,10 @@ case class SkipControlledShutdownSafetyCheck(id: Int, brokerEpoch: Long, skipCon def state: ControllerState.SkipControlledShutdownSafetyCheck.type = ControllerState.SkipControlledShutdownSafetyCheck } +case class ForwardUpdateMetadataRequest(umr: UpdateMetadataRequest, callback: ForwardUpdateMetadataCallback) extends ControllerEvent { def state = ControllerState.ForwardUpdateMetadataRequest } + case class LeaderAndIsrResponseReceived(leaderAndIsrResponse: LeaderAndIsrResponse, brokerId: Int) extends ControllerEvent { def state = ControllerState.LeaderAndIsrResponseReceived } diff --git a/core/src/main/scala/kafka/server/FederatedMetadataCache.scala b/core/src/main/scala/kafka/server/FederatedMetadataCache.scala new file mode 100644 index 0000000000000..3922b24cb8434 --- /dev/null +++ b/core/src/main/scala/kafka/server/FederatedMetadataCache.scala @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.cluster.Broker +import kafka.utils.CoreUtils._ +import kafka.utils.Logging +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.requests.UpdateMetadataRequest +import org.apache.kafka.common.{Node, TopicPartition} + +import scala.collection.{Seq, mutable} + + +/** + * A cache for the state (e.g., current leader) of each partition and of the federated meta-cluster + * (i.e., all brokers in all of the federation's physical clusters). This cache is updated through + * UpdateMetadataRequests sent (or forwarded) by the controller. Every broker maintains the same + * cache, asynchronously. + */ +class FederatedMetadataCache(brokerId: Int, localClusterId: String = "defaultClusterId") extends MetadataCache(brokerId) with Logging { + + this.metadataSnapshot = MultiClusterMetadataSnapshot(partitionStatesMap = mutable.AnyRefMap.empty, + controllerIdOpt = None, multiClusterAliveBrokers = mutable.Map.empty, multiClusterAliveNodes = mutable.Map.empty) + + this.logIdent = s"[FederatedMetadataCache brokerId=$brokerId] " + + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest. + // The key invariant is that the new snapshot cannot affect the old one, i.e., an _unchanged_ + // set of partition states can be reused, but if there are updates, they must go into a + // completely new map within the new snapshot. Similarly, federated multi-clusters can + // reuse the (unchanging) aliveBrokers and aliveNodes sub-maps corresponding to all of the + // physical clusters NOT specified in the update request, but their parent multimaps cannot + // be reused since one of the physical clusters (the one in the request) always has broker/node + // changes. + // [TODO (LIKAFKA-42886): add brokerId range-checking to detect when one physical cluster's range + // overlaps another's, or compare counts of flattened maps to sum of counts of individual ones: + // if mismatch, potentially do set intersections to find duplicate(s)] + override def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = { + inWriteLock(partitionMetadataLock) { + + info(s"GRR DEBUG: entering federated updateMetadata() (correlationId=${correlationId}, UpdateMetadataRequest clusterId=${updateMetadataRequest.originClusterId}, local clusterId=${localClusterId})") + + // federation case: multi-cluster "envelopes" for all clusters' broker/node-maps (keyed by clusterId) + val multiClusterAliveBrokers = new mutable.HashMap[String, mutable.LongMap[Broker]] // (metadataSnapshot.numClusters + 1) <-- TODO (LIKAFKA-42886)? 
+ val multiClusterAliveNodes = new mutable.HashMap[String, mutable.LongMap[collection.Map[ListenerName, Node]]] // (metadataSnapshot.numClusters + 1) + + val mcMetadataSnapshot: MultiClusterMetadataSnapshot = metadataSnapshot.asInstanceOf[MultiClusterMetadataSnapshot] + + // populate each new envelope-map with all clusters' brokers/nodes, except for the cluster in the UMR: + mcMetadataSnapshot.multiClusterAliveBrokers.keys.foreach { clusterId => + if (!clusterId.equals(updateMetadataRequest.originClusterId)) { + info(s"updateMetadata(): copying existing clusterId=${clusterId} brokers to new snapshot since UpdateMetadataRequest is updating clusterId=${updateMetadataRequest.originClusterId} brokers") + multiClusterAliveBrokers(clusterId) = mcMetadataSnapshot.multiClusterAliveBrokers(clusterId) + } else { + info(s"updateMetadata(): NOT copying existing clusterId=${clusterId} brokers to new snapshot since UpdateMetadataRequest is replacing those") + } + } + mcMetadataSnapshot.multiClusterAliveNodes.keys.foreach { clusterId => + if (!clusterId.equals(updateMetadataRequest.originClusterId)) { + info(s"updateMetadata(): copying existing clusterId=${clusterId} nodes to new snapshot since UpdateMetadataRequest is updating clusterId=${updateMetadataRequest.originClusterId} nodes") + multiClusterAliveNodes(clusterId) = mcMetadataSnapshot.multiClusterAliveNodes(clusterId) + } else { + info(s"updateMetadata(): NOT copying existing clusterId=${clusterId} nodes to new snapshot since UpdateMetadataRequest is replacing those") + } + } + + // replacement broker- and node-maps for the UpdateMetadataRequest's single physical cluster, which + // replaces our current copy: + val umrClusterId = if (updateMetadataRequest.originClusterId != null) updateMetadataRequest.originClusterId else localClusterId + val numBrokersInUpdatingCluster = if (mcMetadataSnapshot.multiClusterAliveBrokers.contains(umrClusterId)) + mcMetadataSnapshot.multiClusterAliveBrokers(umrClusterId).size else 0 + val numNodesInUpdatingCluster = if (mcMetadataSnapshot.multiClusterAliveNodes.contains(umrClusterId)) + mcMetadataSnapshot.multiClusterAliveNodes(umrClusterId).size else 0 + val umrAliveBrokers = new mutable.LongMap[Broker](numBrokersInUpdatingCluster) + val umrAliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](numNodesInUpdatingCluster) + + // unconditional replacement of snapshot's broker/node-maps (for a single physical cluster) with those + // specified in updateMetadataRequest (there's no such thing as delta-updates for a cluster's nodes) + generateSingleClusterBrokersAndNodesMaps(updateMetadataRequest, umrAliveBrokers, umrAliveNodes) + + multiClusterAliveBrokers(umrClusterId) = umrAliveBrokers + multiClusterAliveNodes(umrClusterId) = umrAliveNodes + + val controllerId = updateMetadataRequest.controllerId match { + case id if id < 0 => None + case id => Some(id) + } + val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] + val possiblyUpdatedPartitionStates = maybeUpdatePartitionStates(updateMetadataRequest, deletedPartitions, correlationId) + + metadataSnapshot = MultiClusterMetadataSnapshot(possiblyUpdatedPartitionStates, controllerId, multiClusterAliveBrokers, multiClusterAliveNodes) + + deletedPartitions + } + } + + + // TODO: add brokerId ranges (track in updateMetadata()) as sanity check: ensure no overlap between physical clusters + case class MultiClusterMetadataSnapshot( + partitionStatesMap: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + controllerIdOpt: 
Option[Int], + multiClusterAliveBrokers: mutable.Map[String, mutable.LongMap[Broker]], + multiClusterAliveNodes: mutable.Map[String, mutable.LongMap[collection.Map[ListenerName, Node]]]) + extends MetadataSnapshot { + + // GRR VERIFY: intention is that these things get called exactly once per construction (regardless of getter + // calls), since "val" is like "final" and can be set only in ctor... + val aliveBrokersMap: mutable.LongMap[Broker] = { + val flattenedBrokersMap: mutable.LongMap[Broker] = new mutable.LongMap[Broker] // FIXME? could add loop to count total size, or could track it dynamically within snapshots... + multiClusterAliveBrokers.values.foreach { brokerMap => + flattenedBrokersMap ++= brokerMap + } + flattenedBrokersMap + } + + val aliveNodesMap: mutable.LongMap[collection.Map[ListenerName, Node]] = { + val flattenedNodesMap: mutable.LongMap[collection.Map[ListenerName, Node]] = + new mutable.LongMap[collection.Map[ListenerName, Node]] // FIXME? could add loop to count total size, or could track it dynamically within snapshots... + multiClusterAliveNodes.values.foreach { nodesMap => + flattenedNodesMap ++= nodesMap + } + flattenedNodesMap + } + + def partitionStates() = partitionStatesMap + def controllerId() = controllerIdOpt + def aliveBrokers() = aliveBrokersMap + def aliveNodes() = aliveNodesMap + } + +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fa32e432ae646..e4de88af1cc18 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -293,7 +293,16 @@ class KafkaApis(val requestChannel: RequestChannel, val updateMetadataRequest = request.body[UpdateMetadataRequest] authorizeClusterOperation(request, CLUSTER_ACTION) - if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.maxBrokerEpoch)) { + if (shouldForwardUpdateMetadataRequest(updateMetadataRequest)) { + // The metadata propagation follows an eventual consistency model, which means + // an UpdateMetadataRequest is not specific to a particular broker, or to a particular broker epoch. + // For example, if a newly restarted broker accepts an UpdateMetadataRequest intended for its previous + // epoch, there won't be any correctness violations. + // For UpdateMetadataRequests from foreign clusters, there is no need to check the brokerEpoch or maxBrokerEpoch + controller.forwardUpdateMetadataRequest(updateMetadataRequest, updateMetadataResponse => { + sendResponseExemptThrottle(request, updateMetadataResponse) + }) + } else if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.maxBrokerEpoch)) { // When the broker restarts very quickly, it is possible for this broker to receive request intended // for its previous generation so the broker should skip the stale request. info("Received update metadata request with stale broker epoch info " + @@ -307,6 +316,11 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def shouldForwardUpdateMetadataRequest(updateMetadataRequest: UpdateMetadataRequest): Boolean = { + config.liFederationEnable && !clusterId.equals(updateMetadataRequest.originClusterId()) && updateMetadataRequest.routingClusterId() == null + } + + // [unlike the other "doHandle..." 
methods, this one DOES use the request arg] private def doHandleUpdateMetadataRequest(request: RequestChannel.Request, correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): UpdateMetadataResponse = { val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest) if (deletedPartitions.nonEmpty) @@ -318,6 +332,12 @@ class KafkaApis(val requestChannel: RequestChannel, } } quotas.clientQuotaCallback.foreach { callback => + // It's unclear what the clusterId arg in here is intended for; getClusterMetadata() simply stuffs it into + // the returned Cluster object, and neither the Apache Kafka repo nor LinkedIn have any real implementations + // of updateClusterMetadata() outside of a single test case (CustomQuotaCallbackTest), which doesn't use + // clusterId. Best guess is that some external users might use it for logging, in which case the best + // approach would probably be to replace it with the value of li.federation.id if federation is enabled. + // TODO for pushing federation upstream, if/when that happens. (See also LIKAFKA-42885.) if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) { quotas.fetch.updateQuotaMetricConfigs() quotas.produce.updateQuotaMetricConfigs() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ed47b1ebe3f1d..7305f1c906149 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -293,6 +293,7 @@ object Defaults { /** Linkedin Internal states */ val LiCombinedControlRequestEnabled = false val LiAsyncFetcherEnabled = false + val LiFederationEnabled = false } object KafkaConfig { @@ -388,6 +389,7 @@ object KafkaConfig { val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable" val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl" val ExpectedClientSoftwareNamesProp = "expected.client.software.names" + val LiFederationEnableProp = "li.federation.enable" // GRR /************* Authorizer Configuration ***********/ val AuthorizerClassNameProp = "authorizer.class.name" @@ -692,6 +694,7 @@ object KafkaConfig { val UnofficialClientLoggingEnableDoc = "Controls whether logging occurs when an ApiVersionsRequest is received from a client unsupported by LinkedIn, such as an Apache Kafka client." val UnofficialClientCacheTtlDoc = "The amount of time (in hours) for the identity of an unofficial client to live in the local cache to avoid duplicate log messages." val ExpectedClientSoftwareNamesDoc = "The software names of clients that are supported by LinkedIn, such as Avro, Raw, and Tracking clients." + val LiFederationEnableDoc = "Specifies whether multiple physical clusters should be combined into one federated (logical) cluster." 
// GRR FIXME: still need a config or runtime mechanism to identify which clusters are federated and to enable them to find and talk to each other /************* Authorizer Configuration ***********/ val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" + @@ -1099,6 +1102,7 @@ object KafkaConfig { .define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc) .define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc) .define(ExpectedClientSoftwareNamesProp, LIST, Defaults.ExpectedClientSoftwareNames, LOW, ExpectedClientSoftwareNamesDoc) + .define(LiFederationEnableProp, BOOLEAN, Defaults.LiFederationEnabled, HIGH, LiFederationEnableDoc) // GRR /************* Authorizer Configuration ***********/ .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc) @@ -1500,6 +1504,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val liAsyncFetcherEnable = getBoolean(KafkaConfig.LiAsyncFetcherEnableProp) def liCombinedControlRequestEnable = getBoolean(KafkaConfig.LiCombinedControlRequestEnableProp) + // GRR: what decides "val" vs. "var" vs. "def" for these things? overrides? (when would that make sense?) + def liFederationEnable = getBoolean(KafkaConfig.LiFederationEnableProp) def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp) def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index aae13d43c7232..f89c66e1dcf3f 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -293,7 +293,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel) logManager.startup() - metadataCache = new MetadataCache(config.brokerId) + metadataCache = if (config.liFederationEnable) new FederatedMetadataCache(config.brokerId, clusterId) + else new MetadataCache(config.brokerId) // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) @@ -330,7 +331,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP tokenManager.startup() /* start kafka controller */ - kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix) + kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, clusterId, threadNamePrefix) kafkaController.startup() adminManager = new AdminManager(config, metrics, metadataCache, zkClient, kafkaController) @@ -765,6 +766,35 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName) + /** + * [Federation only] Get the Node struct (basic connection details) for the specified broker ID. + * This is provided to a remote controller so it can send its cluster updates to us. 
+ */ + // [might be a test-only method; will depend on how "real" configuration/discovery/recovery works out] + def getBrokerNode(brokerId: Int): Option[Node] = { + if (kafkaController != null) { + kafkaController.getBrokerNode(brokerId) + } else { + warn(s"Cannot look up broker Node info because controller is null?!?") + None + } + } + + /** + * [Federation only] Add the specified broker as a remote controller, i.e., a target for local + * metadata updates but not for rewritten remote ones. Loosely speaking, this is the other side + * of getBrokerNode(), i.e., this is what the other side does when it receives getBrokerNode() + * info from another controller. + */ + // [might be a test-only method; will depend on how "real" configuration/discovery/recovery works out] + def addRemoteController(broker: Broker): Unit = { + if (kafkaController != null) { + kafkaController.addRemoteController(broker) + } else { + warn(s"Cannot add remote controller ${broker} to null local controller!") + } + } + /** * Reads the BrokerMetadata. If the BrokerMetadata doesn't match in all the log.dirs, InconsistentBrokerMetadataException is * thrown. diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala old mode 100755 new mode 100644 index d70a4d0e57759..dc013708c9dc3 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -43,16 +43,17 @@ import scala.collection.{Seq, Set, mutable} */ class MetadataCache(brokerId: Int) extends Logging { - private val partitionMetadataLock = new ReentrantReadWriteLock() + protected val partitionMetadataLock = new ReentrantReadWriteLock() //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock) //replace the value with a completely new one. this means reads (which are not under any lock) need to grab //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. //multiple reads of this value risk getting different snapshots. - @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty, - controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty) + @volatile protected var metadataSnapshot: MetadataSnapshot = + SingleClusterMetadataSnapshot(partitionStatesMap = mutable.AnyRefMap.empty, controllerIdOpt = None, + aliveBrokersMap = mutable.LongMap.empty, aliveNodesMap = mutable.LongMap.empty) this.logIdent = s"[MetadataCache brokerId=$brokerId] " - private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) + protected val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. Relatedly, `brokers` is @@ -248,65 +249,27 @@ class MetadataCache(brokerId: Int) extends Logging { snapshot.controllerId.map(id => node(id)).orNull) } - // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest. + // The key invariant is that the new snapshot cannot affect the old one, i.e., an _unchanged_ + // set of partition states can be reused, but if there are updates, they must go into a + // completely new map within the new snapshot. 
def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = { inWriteLock(partitionMetadataLock) { - val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size) - val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size) + val singleClusterAliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size) + val singleClusterAliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size) + + generateSingleClusterBrokersAndNodesMaps(updateMetadataRequest, singleClusterAliveBrokers, singleClusterAliveNodes) + val controllerId = updateMetadataRequest.controllerId match { case id if id < 0 => None case id => Some(id) } + val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] + val possiblyUpdatedPartitionStates = maybeUpdatePartitionStates(updateMetadataRequest, deletedPartitions, correlationId) - updateMetadataRequest.liveBrokers.asScala.foreach { broker => - // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which - // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could - // move to `AnyRefMap`, which has comparable performance. - val nodes = new java.util.HashMap[ListenerName, Node] - val endPoints = new mutable.ArrayBuffer[EndPoint] - broker.endpoints.asScala.foreach { ep => - val listenerName = new ListenerName(ep.listener) - endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) - nodes.put(listenerName, new Node(broker.id, ep.host, ep.port)) - } - aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) - aliveNodes(broker.id) = nodes.asScala - } - aliveNodes.get(brokerId).foreach { listenerMap => - val listeners = listenerMap.keySet - if (!aliveNodes.values.forall(_.keySet == listeners)) - error(s"Listeners are not identical across brokers: $aliveNodes") - } + metadataSnapshot = SingleClusterMetadataSnapshot(possiblyUpdatedPartitionStates, controllerId, singleClusterAliveBrokers, singleClusterAliveNodes) - val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] - if (!updateMetadataRequest.partitionStates.iterator.hasNext) { - metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes) - } else { - //since kafka may do partial metadata updates, we start by copying the previous state - val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size) - metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) => - val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size) - copy ++= oldPartitionStates - partitionStates += (topic -> copy) - } - updateMetadataRequest.partitionStates.asScala.foreach { info => - val controllerId = updateMetadataRequest.controllerId - val controllerEpoch = updateMetadataRequest.controllerEpoch - val tp = new TopicPartition(info.topicName, info.partitionIndex) - if (info.leader == LeaderAndIsr.LeaderDuringDelete) { - removePartitionInfo(partitionStates, tp.topic, tp.partition) - stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " + - s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") - deletedPartitions += tp - } else { - 
addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info) - stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " + - s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") - } - } - metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes) - } deletedPartitions } } @@ -317,6 +280,74 @@ class MetadataCache(brokerId: Int) extends Logging { def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined + protected def generateSingleClusterBrokersAndNodesMaps( + updateMetadataRequest: UpdateMetadataRequest, + aliveBrokers: mutable.LongMap[Broker], + aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]): + Unit = { + updateMetadataRequest.liveBrokers.asScala.foreach { broker => + // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which + // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could + // move to `AnyRefMap`, which has comparable performance. + val nodes = new java.util.HashMap[ListenerName, Node] + val endPoints = new mutable.ArrayBuffer[EndPoint] + broker.endpoints.asScala.foreach { ep => + val listenerName = new ListenerName(ep.listener) + endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) + nodes.put(listenerName, new Node(broker.id, ep.host, ep.port)) + } + aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) + aliveNodes(broker.id) = nodes.asScala + } + aliveNodes.get(brokerId).foreach { listenerMap => + val listeners = listenerMap.keySet + if (!aliveNodes.values.forall(_.keySet == listeners)) + error(s"Listeners are not identical across brokers: $aliveNodes") + } + } + + // Conditional replacement of snapshot's partitionStates (might be a full update, a partial update, or no update); + // called under lock. 
+ protected def maybeUpdatePartitionStates( + updateMetadataRequest: UpdateMetadataRequest, + deletedPartitions: mutable.ArrayBuffer[TopicPartition], + correlationId: Int): + mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]] = { + if (!updateMetadataRequest.partitionStates.iterator.hasNext) { + metadataSnapshot.partitionStates + } else { + //since kafka may do partial metadata updates, we start by copying the previous state + val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size) + metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) => + val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size) + copy ++= oldPartitionStates + partitionStates += (topic -> copy) + } + val controllerId = updateMetadataRequest.controllerId + val controllerEpoch = updateMetadataRequest.controllerEpoch + val originClusterId = updateMetadataRequest.originClusterId() + updateMetadataRequest.partitionStates.asScala.foreach { info => + val tp = new TopicPartition(info.topicName, info.partitionIndex) + if (info.leader == LeaderAndIsr.LeaderDuringDelete) { + removePartitionInfo(partitionStates, tp.topic, tp.partition) + // TODO: for federation case, enhance both this log and its sibling below with "cluster ${clusterId} " + // (or with a human-readable "color" if we have such an alternative), but currently we don't have access to + // the local clusterId in here + stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " + + s"request sent by controller $controllerId epoch $controllerEpoch of origin cluster $originClusterId " + + s"with correlation id $correlationId") + deletedPartitions += tp + } else { + addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info) + stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to UpdateMetadata " + + s"request sent by controller $controllerId epoch $controllerEpoch of origin cluster $originClusterId " + + s"with correlation id $correlationId") + } + } + partitionStates + } + } + private def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], topic: String, partitionId: Int): Boolean = { partitionStates.get(topic).exists { infos => @@ -326,9 +357,25 @@ class MetadataCache(brokerId: Int) extends Logging { } } - case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], - controllerId: Option[Int], - aliveBrokers: mutable.LongMap[Broker], - aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) + + trait MetadataSnapshot { + def partitionStates(): mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]] + def controllerId(): Option[Int] + def aliveBrokers(): mutable.LongMap[Broker] + def aliveNodes(): mutable.LongMap[collection.Map[ListenerName, Node]] + } + + + case class SingleClusterMetadataSnapshot( + partitionStatesMap: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + controllerIdOpt: Option[Int], + aliveBrokersMap: mutable.LongMap[Broker], + aliveNodesMap: mutable.LongMap[collection.Map[ListenerName, Node]]) + extends MetadataSnapshot { + def partitionStates() = partitionStatesMap + def controllerId() = controllerIdOpt + def aliveBrokers() = aliveBrokersMap + def aliveNodes() = aliveNodesMap + } } diff --git 
a/core/src/main/scala/kafka/utils/LiDecomposedControlRequestUtils.scala b/core/src/main/scala/kafka/utils/LiDecomposedControlRequestUtils.scala index 36a67b769f383..0044865d5a93f 100644 --- a/core/src/main/scala/kafka/utils/LiDecomposedControlRequestUtils.scala +++ b/core/src/main/scala/kafka/utils/LiDecomposedControlRequestUtils.scala @@ -24,7 +24,7 @@ import kafka.server.KafkaConfig import kafka.utils.CoreUtils.toJavaConsumer import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState -import org.apache.kafka.common.message.{LiCombinedControlRequestData, UpdateMetadataRequestData} +import org.apache.kafka.common.message.LiCombinedControlRequestData import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState} import org.apache.kafka.common.requests.{LeaderAndIsrRequest, LiCombinedControlRequest, StopReplicaRequest, UpdateMetadataRequest} import org.apache.kafka.common.utils.LiCombinedControlTransformer @@ -84,7 +84,7 @@ object LiDecomposedControlRequestUtils { else throw new IllegalStateException("The inter.broker.protocol.version config should not be smaller than 2.4-IV1") Some(new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, request.controllerId(), request.controllerEpoch(), request.brokerEpoch(), - request.maxBrokerEpoch(), effectivePartitionStates, liveBrokers).build()) + request.maxBrokerEpoch(), effectivePartitionStates, liveBrokers, "FIXME (LIKAFKA-41423) federation+LCCR not yet supported").build()) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 8fdc105926b1a..0c501df7a1712 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -317,7 +317,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setSecurityProtocol(securityProtocol.id) .setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava)).asJava val version = ApiKeys.UPDATE_METADATA.latestVersion - new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, Long.MaxValue, Long.MaxValue, partitionStates, brokers).build() + new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, Long.MaxValue, Long.MaxValue, partitionStates, brokers, "fakeClusterId").build() } private def createJoinGroupRequest = { diff --git a/core/src/test/scala/integration/kafka/api/MultiClusterAbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/MultiClusterAbstractConsumerTest.scala index f30adf387b548..78c6022e90384 100644 --- a/core/src/test/scala/integration/kafka/api/MultiClusterAbstractConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/MultiClusterAbstractConsumerTest.scala @@ -85,8 +85,14 @@ abstract class MultiClusterAbstractConsumerTest extends MultiClusterBaseRequestT } // create the test topics - createTopic(topicNameCluster0, numPartitions = 2, replicaCount, clusterIndex = 0) - createTopic(topicNameCluster1, 1, replicaCount, clusterIndex = 1) // single-partition topic in 2nd cluster for simplicity + // GRR FIXME: DISABLED for now: instead, create both topics explicitly within test case itself + // since we're called before federation interconnects are set up + // GRR TODO: figure out overridable setUp() sequence that allows these two to be created AFTER + // 
federation mode (if enabled) is ready (currently getting killed because super.setUp() + // in TEST is called before federation setup happens...so maybe fixing it there is + // solution? hmmm...) + //createTopic(topicNameCluster0, numPartitions = 2, replicaCount, clusterIndex = 0) + //createTopic(topicNameCluster1, 1, replicaCount, clusterIndex = 1) // single-partition topic in 2nd cluster for simplicity } protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener { diff --git a/core/src/test/scala/integration/kafka/api/MultiClusterIntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/MultiClusterIntegrationTestHarness.scala index 2ec556f8034e7..ab890f20713d9 100644 --- a/core/src/test/scala/integration/kafka/api/MultiClusterIntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/MultiClusterIntegrationTestHarness.scala @@ -123,25 +123,25 @@ abstract class MultiClusterIntegrationTestHarness extends MultiClusterKafkaServe clientSaslProperties) } - // TODO: currently cluster 0 only def createProducer[K, V](keySerializer: Serializer[K] = new ByteArraySerializer, valueSerializer: Serializer[V] = new ByteArraySerializer, - configOverrides: Properties = new Properties): KafkaProducer[K, V] = { + configOverrides: Properties = new Properties, + clusterIndex: Int = 0): KafkaProducer[K, V] = { val props = new Properties - props ++= producerConfigs(0) + props ++= producerConfigs(clusterIndex) props ++= configOverrides val producer = new KafkaProducer[K, V](props, keySerializer, valueSerializer) producers += producer producer } - // TODO: currently cluster 0 only def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer, valueDeserializer: Deserializer[V] = new ByteArrayDeserializer, configOverrides: Properties = new Properties, - configsToRemove: List[String] = List()): KafkaConsumer[K, V] = { + configsToRemove: List[String] = List(), + clusterIndex: Int = 0): KafkaConsumer[K, V] = { val props = new Properties - props ++= consumerConfigs(0) + props ++= consumerConfigs(clusterIndex) props ++= configOverrides configsToRemove.foreach(props.remove(_)) val consumer = new KafkaConsumer[K, V](props, keyDeserializer, valueDeserializer) @@ -149,10 +149,10 @@ abstract class MultiClusterIntegrationTestHarness extends MultiClusterKafkaServe consumer } - // TODO: currently cluster 0 only - def createAdminClient(configOverrides: Properties = new Properties): Admin = { + def createAdminClient(configOverrides: Properties = new Properties, + clusterIndex: Int = 0): Admin = { val props = new Properties - props ++= adminClientConfigs(0) + props ++= adminClientConfigs(clusterIndex) props ++= configOverrides val adminClient = AdminClient.create(props) adminClients += adminClient @@ -161,7 +161,6 @@ abstract class MultiClusterIntegrationTestHarness extends MultiClusterKafkaServe @After override def tearDown(): Unit = { - // TODO: figure out how want to store and shut down per-cluster clients producers.foreach(_.close(Duration.ZERO)) consumers.foreach(_.wakeup()) consumers.foreach(_.close(Duration.ZERO)) diff --git a/core/src/test/scala/integration/kafka/api/ProxyBasedFederationTest.scala b/core/src/test/scala/integration/kafka/api/ProxyBasedFederationTest.scala index 196c5d2ceb9ea..37972dc824d8b 100644 --- a/core/src/test/scala/integration/kafka/api/ProxyBasedFederationTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProxyBasedFederationTest.scala @@ -17,11 +17,17 @@ package kafka.api import 
scala.collection.JavaConverters._ +import java.util.Properties +import java.util.concurrent.TimeUnit + import org.junit.Assert._ -import org.junit.Test +import org.junit.{Before, Test} +import kafka.server.KafkaConfig import kafka.server.KafkaServer import kafka.server.QuotaType +import kafka.utils.TestUtils +import org.apache.kafka.common.{KafkaException, Node} /** * Currently a simple proof of concept of a multi-cluster integration test, but ultimately intended @@ -31,18 +37,442 @@ import kafka.server.QuotaType */ class ProxyBasedFederationTest extends MultiClusterAbstractConsumerTest { override def numClusters: Int = 2 // need one ZK instance for each Kafka cluster [TODO: can we "chroot" instead?] - override def brokerCountPerCluster: Int = 3 // three _per Kafka cluster_, i.e., six total + override def brokerCountPerCluster: Int = 5 // 5 _per Kafka cluster_ (3 normal, 2 controllers), i.e., 10 total @Test def testBasicMultiClusterSetup(): Unit = { + debug(s"GRR DEBUG: beginning testBasicMultiClusterSetup() with numClusters=${numClusters} and brokerCountPerCluster=${brokerCountPerCluster}") + + debug(s"GRR DEBUG: creating admin client for cluster 0") + val cluster0AdminClient = createAdminClient(clusterIndex = 0) + debug(s"GRR DEBUG: requesting list of topics using admin client for cluster 0 ...") + var topicsViaCluster0 = cluster0AdminClient.listTopics().names().get(10000, TimeUnit.MILLISECONDS) + debug(s"GRR DEBUG: topics list via broker in physical cluster 0 = ${topicsViaCluster0}") + + // topic ${topicNameCluster0} _should_ have been created in cluster 0 by setUp(), but if it wasn't for + // any reason (e.g., tinkering with another test case), we'll have an infinite loop here if auto-topic + // creation is disabled => check and proactively create it ourselves if necessary + if (!topicsViaCluster0.contains(topicNameCluster0)) { + debug(s"GRR DEBUG: test topic was NOT created in setUp(); creating dual-partition topic '${topicNameCluster0}' in cluster 0 now") + createTopic(topicNameCluster0, numPartitions = 2, replicaCount, clusterIndex = 0) + debug(s"GRR DEBUG: again requesting list of topics using admin client for cluster 0 ...") + topicsViaCluster0 = cluster0AdminClient.listTopics().names().get(10000, TimeUnit.MILLISECONDS) + debug(s"GRR DEBUG: topics list via broker in physical cluster 0 = ${topicsViaCluster0}") + } + val numRecords = 1000 + debug(s"GRR DEBUG: creating producer (IMPLICITLY FOR CLUSTER 0)") val producer = createProducer() + debug(s"GRR DEBUG: using producer to send $numRecords records to topic-partition $tp1c0 (IMPLICITLY FOR CLUSTER 0)") sendRecords(producer, numRecords, tp1c0) + debug(s"GRR DEBUG: creating consumer (IMPLICITLY FOR CLUSTER 0)") val consumer = createConsumer() + debug(s"GRR DEBUG: 'assigning' consumer to topic-partition $tp1c0 ... 
or vice-versa (IMPLICITLY FOR CLUSTER 0)") consumer.assign(List(tp1c0).asJava) + debug(s"GRR DEBUG: seeking to beginning of topic-partition $tp1c0 (IMPLICITLY FOR CLUSTER 0)") consumer.seek(tp1c0, 0) + debug(s"GRR DEBUG: calling consumeAndVerifyRecords()") consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0) + debug(s"GRR DEBUG: done with testBasicMultiClusterSetup()\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n") + } + + + + +/* + GRR FIXME: + ----------------------------------------------------------------------------------------------------------------- + x [OUTSIDE OF TEST] must define, read, and obey new federation config(s): at LEAST "federation.enable" = true + - ideally also some kind of "federated.cluster.id" or whatever that all physical clusters in federation + can share => know that they're in the same federation (or is that naive? ideally want auto-discovery + somehow, but since not sharing same ZK cluster, unclear how that would work: asked Nick what would be + ideal in his eyes) + - [longer-term] ideally also a persistent "color" per physical cluster so can log things more memorably + + x in federation test: + // want federation.enable to be universal => can/should add it to serverConfig map/hashtable + // (BUT IF AND ONLY IF WE FIX modifyConfigs() OVERRIDE DEFINITION!) [FIXED] + this.serverConfig.setProperty("federation.enable", "true") // or KafkaConfig.FederationEnableProp + // existing (relevant) configs to set in test: + // KafkaConfig.LiCombinedControlRequestEnableProp = "li.combined.control.request.enable" + // false for now; should be global => serverConfig + this.serverConfig.setProperty(KafkaConfig.LiCombinedControlRequestEnableProp, "false") + // KafkaConfig.AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback" + // false for now; should be global => serverConfig + this.serverConfig.setProperty(KafkaConfig.AllowPreferredControllerFallbackProp, "false") + x in KafkaConfig: + // also add KafkaConfig.LiFederationEnableProp == "li.federation.enable" to KafkaConfig so "fromProp()" + // works on it + x in KafkaController, channel mgr, and/or KafkaBroker: + // also add to controller and maybe broker code so routing works right: enable iff federation.enable == true + + // this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") + // this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName) + // this.serverConfig.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") + // this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString) + // this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}", + + ----------------------------------------------------------------------------------------------------------------- + x need way to set configs of test brokers, including brokerIds of all brokers (must be unique in federation) + and preferred controller-ness of two brokers in each cluster + >>>>>>>>>>> should be in MultiClusterKafkaServerTestHarness: servers, instanceConfigs, generateConfigs() + >>>>>>>>>>> some overrides in MultiClusterIntegrationTestHarness, too + + x YES: can just override modifyConfigs() in test: + // MultiClusterIntegrationTestHarness (ultimate superclass): + override def modifyConfigs(props: Seq[Properties], clusterIndex: Int): Unit = { + configureListeners(props) + props.foreach(_ ++= serverConfig) + } + // 
MultiClusterBaseRequestTest (superclass of test, subclass of MultiClusterIntegrationTestHarness): + override def modifyConfigs(props: Seq[Properties], clusterIndex: Int): Unit = { + super.modifyConfigs(props, clusterIndex) + props.foreach { p => + p.put(KafkaConfig.ControlledShutdownEnableProp, "false") + brokerPropertyOverrides(p) // multiple (non-MultiCluster) tests do override this per-broker method + } + } + + x need to add clusterIndex arg (doh!), but that's fine: SaslPlainPlaintextConsumerTest overrides + method, but it does the non-MultiCluster one; within MultiCluster-land, only MultiClusterBaseRequestTest + does so, and we can fix that: DO IT!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + + x existing (relevant) configs to set in test: + x KafkaConfig.BrokerIdProp = "broker.id" + // 100*(clusterIndex + 1) + {0 .. 4} + x KafkaConfig.PreferredControllerProp = "preferred.controller" + // true for brokerIds 100, 101, 200, 201 only + + X hack up TestUtils.bootstrapServers() to tell each of instanceServers (brokers) what its bootstrap value + is, and hack up latter to let us look up that info (if KafkaServer and/or KafkaBroker doesn't already know) + x NO, not necessary: each controller (channel mgr) already has brokerStateInfo map of brokerId -> + ControllerBrokerStateInfo, and latter has NetworkClient (which, logging shows, knows host:port), + "brokerNode" (== Broker node() result, whatever that is), and RequestSendThread: SOMEWHERE in + there must be way to get host:port back out, and if not, can always hack ControllerBrokerStateInfo + to include Broker and/or host + port directly + x with all that ^^^^, get add a new "getSelfBroker()" method or "getConnectionInfo()" or something + (also a test-hack) to KafkaServer, which test itself can use either at top of test case or bottom + of setUp() to pass to OTHER cluster's controllers, e.g.: + // In real life this would be done via configs, but since we're running MANY brokers on the same + // host, we don't know the port numbers ahead of time, i.e., they're dynamically generated at + // runtime. Ergo, we look them up after everybody has started and cross-pollinate via setters: + val serversInCluster0 = serversByCluster(0) + val serversInCluster1 = serversByCluster(1) + // inform both preferred controllers in cluster 0 of both remote controllers (i.e., in cluster 1) + serversInCluster0(0).addRemoteController(serversInCluster1(0).getConnectionInfo()) + serversInCluster0(1).addRemoteController(serversInCluster1(1).getConnectionInfo()) + // inform both preferred controllers in cluster 1 of both remote controllers (i.e., in cluster 0) + serversInCluster1(0).addRemoteController(serversInCluster0(0).getConnectionInfo()) + serversInCluster1(1).addRemoteController(serversInCluster0(1).getConnectionInfo()) + + x clone addNewBroker() or modify it to include remote controllers (with filter to prevent remote UMRs from + heading back out to remote controllers) + + ----------------------------------------------------------------------------------------------------------------- + - [ideally] need way to associate clusterIndex with (generated only?) clusterId of each cluster + + ----------------------------------------------------------------------------------------------------------------- + x need way to specify which are the preferred controllers of the _other_ physical cluster in both clusters + X for test (only), could set up controllersByCluster() method + backing array in + MultiClusterIntegrationTestHarness that enables easy lookup for us... 
would still need to inject + into actual broker/controller code somehow, but would avoid config dependency and possible race + (i.e., don't know port numbers of brokers until they start, and can't leave configs for after startup + => can't pre-configure remote controllers in configs) + x no, can just add some accessors and let test case (or setUp()) do so directly + + ----------------------------------------------------------------------------------------------------------------- + x need way to create at least one topic in each physical cluster (maybe 3 and 5 partitions, respectively?) + >>>>>>>>>>> should be in MultiClusterKafkaServerTestHarness, MultiClusterIntegrationTestHarness + - TopicWith3Partitions + - TopicWith5Partitions + +GRR WORKING: + ----------------------------------------------------------------------------------------------------------------- + - need way to hook up producer and/or consumer client to either physical cluster initially + + ----------------------------------------------------------------------------------------------------------------- + - need way to verify all setup + - among other things, want to enumerate all topics in each physical cluster and verify NO OVERLAP + + ----------------------------------------------------------------------------------------------------------------- + - need way to verify remote UMRs are sent/received in both directions + + ----------------------------------------------------------------------------------------------------------------- + - FIXME: MultiClusterIntegrationTestHarness creates consumer-offsets topic in each cluster, but need + to eliminate that if federation.enable == true + + +---------------------------------------+ +---------------------------------------+ + | PHYSICAL CLUSTER INDEX 0 | | PHYSICAL CLUSTER INDEX 1 | + | - broker 100 = preferred controller | | - broker 200 = preferred controller | + | - broker 101 = preferred controller | | - broker 201 = preferred controller | + | - broker 102 = data broker | | - broker 202 = data broker | + | - broker 103 = data broker/bootstrap | | - broker 203 = data broker/bootstrap | + | - broker 104 = data broker | | - broker 204 = data broker | + +---------------------------------------+ +---------------------------------------+ + +REF: + - super to subclasses: + - MultiClusterZooKeeperTestHarness + protected def numClusters: Int = 1 + [GRR changes: + (1) added Buffer[]-wrapped zkClients, adminZkClients, zookeepers (all new vals) + (2) added backward-compatible accessor methods to replace zkClient, adminZkClient, zookeeper, zkConnect vars + (3) added extended, same-name-but-one-arg accessor methods to index zkClient, adminZkClient, zookeeper, + zkConnect by cluster (i.e., to access "other dimension" added by Buffer[]-wrapping them, basically) + (4) extended setUp() to create numClusters instances of zkClient, adminZkClient, zookeeper rather than + just one + (5) extended tearDown() to loop over numClusters instances of zkClients and zookeepers rather than just one + ] + - MultiClusterKafkaServerTestHarness + Buffer[KafkaServer] servers [created in setUp() => can potentially override] + Seq[KafkaConfig] instanceConfigs + configs() that calls generateConfigs() [abstract here!] 
if not already generated + setUp() that creates servers + createTopic() methods x 2 + [GRR changes: + (1) converted instanceConfigs var to Buffer[]-wrapped val + (2) converted servers var to Buffer[]-wrapped val "instanceServers" + (3) converted brokerList var to Buffer[]-wrapped val "brokerLists" + (4) added backward-compatible accessor methods to replace servers and brokerList vars + (5) added final but otherwise backward-compatible accessor method to replace configs method + (6) added extended, same-name-but-one-arg accessor method to index brokerList by cluster + (7) added extended, NOT-same-name-but-one-arg accessor methods to index "servers" and "configs" by cluster + (specifically, serversByCluster() and configsByCluster(), with latter having special logic to avoid + breaking existing overrides of generateConfigs() abstract method for clusterId == 0 case) + (8) added extended, NOT-same-name-but-one-arg, NOT-quite-abstract method to generate configs for various + clusters (namely, generateConfigsByCluster(Int), which devolves to generateConfigs() for Int == 0 but + throws for non-zero values, i.e., multi-cluster implementations must override it but single-cluster + ones need not) + (9) extended setUp() to create numClusters instances of instanceServers array and brokerList string + [does NOT yet handle "alive" array correctly] + (10) extended tearDown() to loop over numClusters instances of "servers" (i.e., serversByCluster(i)) + (11) extended first createTopic() method with extra clusterIndex arg at end (defaulting to 0) + (12) extended second createTopic() method with extra clusterIndex arg at end (NO default, sigh => code changes!) + (13) [NO changes to killRandomBroker(), killBroker(), or restartDeadBrokers(): all still single-cluster] + ] + - MultiClusterIntegrationTestHarness + protected def brokerCount: Int + sole override/impl of generateConfigs() in test-harness stack (but mult tests override) + [GRR changes: + (1) added extended, NOT-same-name-but-one-arg override implementation for generateConfigsByCluster(Int) + (and switched implementation of existing no-args generateConfigs() to call it with arg == 0) + (2) extended setUp() to create numClusters instances of offsets topic + [does NOT yet extend producerConfig, consumerConfig, and adminClientConfig to extra clusters] + (3) [NO changes to createProducer(), createConsumer(), or createAdminClient(): all still single-cluster] + ] + - MultiClusterBaseRequestTest + override def brokerCount: Int = 3 + override def modifyConfigs(props: Seq[Properties]): Unit = { ... 
} + [provides SocketServer stuff, connect/send/receive, etc.: IMPORTANT] + - MultiClusterAbstractConsumerTest + override def brokerCount: Int = 3 + CALLS createTopic(topic, 2, brokerCount) in setUp() (sole addition to super.setUp()) + [provides client send/receive stuff, commit callback, assignments/CGM, etc.: IMPORTANT] + - ProxyBasedFederationTest + override def numClusters: Int = 2 + override def brokerCountPerCluster: Int = 5 (was 3 originally, but now want dedicated controllers, too) + */ + + + + // Set up brokers and controllers as follows: + // +---------------------------------------+ +---------------------------------------+ + // | PHYSICAL CLUSTER(-INDEX) 0 | | PHYSICAL CLUSTER(-INDEX) 1 | + // | - broker 100 = preferred controller | | - broker 200 = preferred controller | + // | - broker 101 = preferred controller | | - broker 201 = preferred controller | + // | - broker 102 = data broker | | - broker 202 = data broker | + // | - broker 103 = data broker | | - broker 203 = data broker | + // | - broker 104 = data broker | | - broker 204 = data broker | + // +---------------------------------------+ +---------------------------------------+ + // The bootstrap-servers list for each cluster will contain all five brokers. + override def modifyConfigs(props: Seq[Properties], clusterIndex: Int): Unit = { + debug(s"GRR DEBUG: beginning ProxyBasedFederationTest modifyConfigs() override for clusterIndex=${clusterIndex}") + super.modifyConfigs(props, clusterIndex) + (0 until brokerCountPerCluster).map { brokerIndex => + // 100-104, 200-204 + val brokerId = 100*(clusterIndex + 1) + brokerIndex + debug(s"GRR DEBUG: clusterIndex=${clusterIndex}, brokerIndex=${brokerIndex}: setting broker.id=${brokerId}") + props(brokerIndex).setProperty(KafkaConfig.BrokerIdProp, brokerId.toString) + if (brokerIndex < 2) { + // true for brokerIds 100, 101, 200, 201 only + debug(s"GRR DEBUG: clusterIndex=${clusterIndex}, brokerIndex=${brokerIndex}, broker.id=${brokerId}: setting preferred.controller=true") + props(brokerIndex).setProperty(KafkaConfig.PreferredControllerProp, "true") + } else { + debug(s"GRR DEBUG: clusterIndex=${clusterIndex}, brokerIndex=${brokerIndex}, broker.id=${brokerId}: leaving preferred.controller=false") + } + } + debug(s"GRR DEBUG: done with ProxyBasedFederationTest modifyConfigs() override for clusterIndex=${clusterIndex}\n\n\n\n\n") + } + + + @Before + override def setUp(): Unit = { + debug(s"GRR DEBUG: beginning setUp() override for ProxyBasedFederationTest to enable federation, disable combined control requests, etc.\n\n\n\n\n") + this.serverConfig.setProperty(KafkaConfig.LiFederationEnableProp, "true") + this.serverConfig.setProperty(KafkaConfig.LiCombinedControlRequestEnableProp, "false") + this.serverConfig.setProperty(KafkaConfig.AllowPreferredControllerFallbackProp, "false") + this.serverConfig.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false") + this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false") + super.setUp() + // ^^^^^^^^^ GRR FIXME: to deal with consumer-offsets topic, should replace that with doSetup() call + // (which exists at least in MultiClusterIntegrationTestHarness), which then invokes its own super.setUp() + // before setting up client configs and optionally creating offsets topic(s) ... oh, wait, we don't have + // a way to make it true for one cluster and false for the other, so maybe should force to false and + // then create it manually? 
NEEDS MORE INVESTIGATION + + debug(s"GRR DEBUG: checking out both controllers' data structures (still in setUp()!)\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n") + + // In real life this would be done via configs, but since we're running MANY brokers on the same + // host, we don't know the port numbers at config-time, i.e., they're dynamically generated at + // runtime. Ergo, we look them up after everybody has started and cross-pollinate via setters: + +//GRR FIXME: this is a mess...we need a MUCH more elegant way to figure out the two leader-controllers and tell +// them about one another (at a minimum, can we just do some if/else instead of nested match blocks? ugh..) + + val serversInCluster0 = serversByCluster(0) + var cluster0ControllerIndex = 0 + var cluster0ControllerId = 100 // 100 should be leader... + var cluster0ControllerNodeOpt = serversInCluster0(cluster0ControllerIndex).getBrokerNode(cluster0ControllerId) + var cluster0ControllerNode: Node = null + cluster0ControllerNodeOpt match { + case Some(node) => + cluster0ControllerNode = node + case None => + cluster0ControllerIndex = 1 + cluster0ControllerId = 101 // ...but if not, it better be 101 + cluster0ControllerNodeOpt = serversInCluster0(cluster0ControllerIndex).getBrokerNode(cluster0ControllerId) + cluster0ControllerNodeOpt match { + case Some(node2) => + cluster0ControllerNode = node2 + case None => + throw new KafkaException(s"neither preferred controller in cluster 0 has info about itself") + } + } + + val serversInCluster1 = serversByCluster(1) + var cluster1ControllerIndex = 0 + var cluster1ControllerId = 200 // 200 should be leader... + var cluster1ControllerNodeOpt = serversInCluster1(cluster1ControllerIndex).getBrokerNode(cluster1ControllerId) + var cluster1ControllerNode: Node = null + cluster1ControllerNodeOpt match { + case Some(node) => + cluster1ControllerNode = node + case None => + cluster1ControllerIndex = 1 + cluster1ControllerId = 201 // ...but if not, it better be 201 + cluster1ControllerNodeOpt = serversInCluster1(cluster1ControllerIndex).getBrokerNode(cluster1ControllerId) + cluster1ControllerNodeOpt match { + case Some(node2) => + cluster1ControllerNode = node2 + case None => + throw new KafkaException(s"neither preferred controller in cluster 1 has info about itself") + } + } + + info(s"GRR DEBUG: cluster 0: server(${cluster0ControllerIndex}) is the lead controller (controllerId=${cluster0ControllerId}); creating new Broker object from its own Node info (id=${cluster0ControllerNode.id}, host=${cluster0ControllerNode.host}, port=${cluster0ControllerNode.port}) and passing to cluster 1's controller") + val cluster0ControllerBroker = TestUtils.createBroker(cluster0ControllerNode.id, cluster0ControllerNode.host, cluster0ControllerNode.port) + serversInCluster1(cluster1ControllerIndex).addRemoteController(cluster0ControllerBroker) + // no need to notify cluster 1 of our other controller since it's definitely not tracking its own brokers + // and presumably won't pay attention (as a controller) to the remote ones, either + + info(s"GRR DEBUG: cluster 1: server(${cluster1ControllerIndex}) is the lead controller (controllerId=${cluster1ControllerId}); creating new Broker object from its own Node info (id=${cluster1ControllerNode.id}, host=${cluster1ControllerNode.host}, port=${cluster1ControllerNode.port}) and passing to cluster 0's controller") + val cluster1ControllerBroker = TestUtils.createBroker(cluster1ControllerNode.id, cluster1ControllerNode.host, cluster1ControllerNode.port) 
+ serversInCluster0(cluster0ControllerIndex).addRemoteController(cluster1ControllerBroker) + // no need to notify cluster 0 of our other controller for the same reason + + + debug(s"GRR DEBUG: done with setUp() override for ProxyBasedFederationTest\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n") + } + + + + /** + * Create topics in both physical clusters, create a producer for one of them and produce 1000 records to it, + * create a consumer for both physical clusters, have both of them consume the cluster-0 topic's records, and + * verify the same. + */ + @Test + def testFederatedClients(): Unit = { + debug(s"GRR DEBUG: beginning testFederatedClients() with numClusters=${numClusters} and brokerCountPerCluster=${brokerCountPerCluster}") + + debug(s"\n\n\n\t\tGRR DEBUG: creating dual-partition topic '${topicNameCluster0}' in cluster 0: THIS SHOULD TRIGGER CROSS-CLUSTER UpdateMetadataRequest\n\n") + createTopic(topicNameCluster0, numPartitions = 2, replicaCount, clusterIndex = 0) + + debug(s"GRR DEBUG: creating admin client for cluster 0") + val cluster0AdminClient = createAdminClient(clusterIndex = 0) + debug(s"GRR DEBUG: requesting list of topics in federation using admin client for cluster 0 ...") + var topicsViaCluster0 = cluster0AdminClient.listTopics().names().get(10000, TimeUnit.MILLISECONDS) + debug(s"\n\n\n\t\tGRR DEBUG: federated topics list via broker in physical cluster 0 = ${topicsViaCluster0}\n\n") + + debug(s"GRR DEBUG: creating admin client for cluster 1") + val cluster1AdminClient = createAdminClient(clusterIndex = 1) + debug(s"GRR DEBUG: requesting list of topics in federation using admin client for cluster 1 ...") + var topicsViaCluster1 = cluster1AdminClient.listTopics().names().get(10000, TimeUnit.MILLISECONDS) + debug(s"\n\n\n\t\tGRR DEBUG: federated topics list via broker in physical cluster 1 = ${topicsViaCluster1}\n\n") + + + debug(s"\n\n\n\t\tGRR DEBUG: creating single-partition topic '${topicNameCluster1}' in cluster 1: THIS SHOULD TRIGGER CROSS-CLUSTER UpdateMetadataRequest\n\n") + createTopic(topicNameCluster1, numPartitions = 1, replicaCount, clusterIndex = 1) // single-partition topic in 2nd cluster for simplicity + + debug(s"GRR DEBUG: again requesting list of topics in federation using admin client for cluster 0 ...") + topicsViaCluster0 = cluster0AdminClient.listTopics().names().get(10000, TimeUnit.MILLISECONDS) + debug(s"\n\n\n\t\tGRR DEBUG: updated federated topics list via broker in physical cluster 0 = ${topicsViaCluster0}\n\n") + + debug(s"GRR DEBUG: again requesting list of topics in federation using admin client for cluster 1 ...") + topicsViaCluster1 = cluster1AdminClient.listTopics().names().get(10000, TimeUnit.MILLISECONDS) + debug(s"\n\n\n\t\tGRR DEBUG: updated federated topics list via broker in physical cluster 1 = ${topicsViaCluster1}\n\n") + + + val numRecs = 1000 + debug(s"GRR DEBUG: creating producer for cluster 0") + val producer = createProducer(clusterIndex = 0) + debug(s"GRR DEBUG: using producer to send $numRecs records to topic-partition $tp1c0 in cluster 0") + sendRecords(producer, numRecs, tp1c0) + + + debug(s"\n\n\n\t\tGRR DEBUG: creating consumer for cluster 0 to consume from $tp1c0\n\n") + val cluster0Consumer = createConsumer(clusterIndex = 0) + debug(s"GRR DEBUG: 'assigning' consumer to topic-partition $tp1c0 in cluster 0") + cluster0Consumer.assign(List(tp1c0).asJava) + debug(s"GRR DEBUG: seeking consumer to beginning of topic-partition $tp1c0 in cluster 0") + cluster0Consumer.seek(tp1c0, 0) + debug(s"GRR 
DEBUG: calling consumeAndVerifyRecords() for consumer in cluster 0") + consumeAndVerifyRecords(consumer = cluster0Consumer, tp = tp1c0, numRecords = numRecs, startingOffset = 0) + + + debug(s"\n\n\n\t\tGRR DEBUG: creating consumer for cluster 1 to consume from $tp1c0\n\n") + val cluster1Consumer = createConsumer(clusterIndex = 1) + debug(s"GRR DEBUG: 'assigning' cluster-1 consumer to topic-partition $tp1c0 in cluster 0") + cluster1Consumer.assign(List(tp1c0).asJava) + debug(s"GRR DEBUG: seeking cluster-1 consumer to beginning of topic-partition $tp1c0 in cluster 0") + cluster1Consumer.seek(tp1c0, 0) + debug(s"GRR DEBUG: calling consumeAndVerifyRecords() for consumer in cluster 1") + consumeAndVerifyRecords(consumer = cluster1Consumer, tp = tp1c0, numRecords = numRecs, startingOffset = 0) + + + debug(s"GRR DEBUG: done with functional parts; now running assertions for consumer in cluster 0") + + // this stuff is just ripped off from PlaintextConsumerTest testQuotaMetricsNotCreatedIfNoQuotasConfigured() + def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String): Unit = { + val metricName = broker.metrics.metricName(name, + quotaType.toString, + "", + "user", "", + "client-id", clientId) + assertNull("Metric should not have been created " + metricName, broker.metrics.metric(metricName)) + } + serversByCluster(0).foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId)) + serversByCluster(0).foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId)) + serversByCluster(0).foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId)) + serversByCluster(0).foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId)) + + serversByCluster(0).foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId)) + serversByCluster(0).foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) + serversByCluster(0).foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) + serversByCluster(0).foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) + + debug(s"GRR DEBUG: done with testFederatedClients()\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n") } } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala index 7b859290bce92..6241acd47d886 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala @@ -721,8 +721,8 @@ class ControllerChannelManagerTest { private case class SentRequest(request: ControlRequest, responseCallback: AbstractResponse => Unit) - private class MockControllerBrokerRequestBatch(context: ControllerContext, config: KafkaConfig = config) - extends AbstractControllerBrokerRequestBatch(config, context, logger) { + private class MockControllerBrokerRequestBatch(context: ControllerContext, config: KafkaConfig = config, clusterId: String = "fakeClusterId") + extends AbstractControllerBrokerRequestBatch(config, clusterId, context, logger) { val sentEvents = ListBuffer.empty[ControllerEvent] val sentRequests = mutable.Map.empty[Int, ListBuffer[SentRequest]] diff --git a/core/src/test/scala/unit/kafka/controller/ControllerRequestMergerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerRequestMergerTest.scala index d9565ce0944dd..2a33ed86ae25d 
100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerRequestMergerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerRequestMergerTest.scala @@ -154,11 +154,11 @@ class ControllerRequestMergerTest { def testMergingDifferentUpdateMetadataPartitions(): Unit = { val partitionStates1 = getUpdateMetadataPartitionStates(topic, 0) val updateMetadataRequest1 = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates1.asJava, updateMetadataLiveBrokers) + partitionStates1.asJava, updateMetadataLiveBrokers, "fakeClusterId") val partitionStates2 = getUpdateMetadataPartitionStates(topic, 1) val updateMetadataRequest2 = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates2.asJava, updateMetadataLiveBrokers) + partitionStates2.asJava, updateMetadataLiveBrokers, "fakeClusterId") val transformedPartitionStates = (partitionStates1 ++ partitionStates2).map{partitionState => LiCombinedControlTransformer.transformUpdateMetadataPartition(partitionState) @@ -178,11 +178,11 @@ class ControllerRequestMergerTest { def testSupersedingUpdateMetadataPartitionStates(): Unit = { val partitionStates1 = getUpdateMetadataPartitionStates(topic, 0) val updateMetadataRequest1 = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates1.asJava, updateMetadataLiveBrokers) + partitionStates1.asJava, updateMetadataLiveBrokers, "fakeClusterId") val partitionStates2 = getUpdateMetadataPartitionStates(topic, 0) val updateMetadataRequest2 = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates2.asJava, updateMetadataLiveBrokers) + partitionStates2.asJava, updateMetadataLiveBrokers, "fakeClusterId") val transformedPartitionStates = partitionStates2.map{partitionState => LiCombinedControlTransformer.transformUpdateMetadataPartition(partitionState) diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala index dbeb70eb71a9c..7d92b61a6a833 100755 --- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala @@ -189,7 +189,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { val requestBuilder = new UpdateMetadataRequest.Builder( ApiKeys.UPDATE_METADATA.latestVersion, controllerId, controllerEpoch, epochInRequest, epochInRequest, - partitionStates.asJava, liveBrokers.asJava) + partitionStates.asJava, liveBrokers.asJava, "fakeClusterId") if (isEpochInRequestStale) { sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index d822ace68f3b9..c0164e7e2ea1b 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -889,7 +889,7 @@ class KafkaApisTest { .setListener(plaintextListener.value)).asJava) ) val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, - 0, 0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, brokers.asJava).build() + 0, 0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, 
brokers.asJava, "fakeClusterId").build() metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) (plaintextListener, anotherListener) } @@ -1005,7 +1005,7 @@ class KafkaApisTest { .setListener(plaintextListener.value)).asJava) val partitionStates = (0 until numPartitions).map(createPartitionState) val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, - 0, 0, 0, partitionStates.asJava, Seq(broker).asJava).build() + 0, 0, 0, partitionStates.asJava, Seq(broker).asJava, "fakeClusterId").build() metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) } } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 0491dabb757ea..f07101c01554e 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -107,7 +107,7 @@ class MetadataCacheTest { val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates.asJava, brokers.asJava).build() + partitionStates.asJava, brokers.asJava, "fakeClusterId").build() cache.updateMetadata(15, updateMetadataRequest) for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) { @@ -252,7 +252,7 @@ class MetadataCacheTest { val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates.asJava, brokers.asJava).build() + partitionStates.asJava, brokers.asJava, "fakeClusterId").build() cache.updateMetadata(15, updateMetadataRequest) val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners) @@ -310,7 +310,7 @@ class MetadataCacheTest { val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates.asJava, brokers.asJava).build() + partitionStates.asJava, brokers.asJava, "fakeClusterId").build() cache.updateMetadata(15, updateMetadataRequest) // Validate errorUnavailableEndpoints = false @@ -384,7 +384,7 @@ class MetadataCacheTest { val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch, brokerEpoch, - partitionStates.asJava, brokers.asJava).build() + partitionStates.asJava, brokers.asJava, "fakeClusterId").build() cache.updateMetadata(15, updateMetadataRequest) // Validate errorUnavailableEndpoints = false @@ -449,7 +449,7 @@ class MetadataCacheTest { .setReplicas(replicas)) val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, brokerEpoch, partitionStates.asJava, - brokers.asJava).build() + brokers.asJava, "fakeClusterId").build() cache.updateMetadata(15, updateMetadataRequest) val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL)) @@ -491,7 +491,7 @@ class MetadataCacheTest { .setReplicas(replicas)) val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, brokerEpoch, 
partitionStates.asJava, - brokers.asJava).build() + brokers.asJava, "fakeClusterId").build() cache.updateMetadata(15, updateMetadataRequest) } diff --git a/core/src/test/scala/unit/kafka/server/MultiClusterBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/MultiClusterBaseRequestTest.scala index ec65ae9f88c5c..efeed20c66d6f 100644 --- a/core/src/test/scala/unit/kafka/server/MultiClusterBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MultiClusterBaseRequestTest.scala @@ -38,6 +38,33 @@ abstract class MultiClusterBaseRequestTest extends MultiClusterIntegrationTestHa // If required, override properties by mutating the passed Properties object protected def brokerPropertyOverrides(properties: Properties): Unit = {} + // FIXME: BUG in original commit (cc4fde35c9cc2818af1bcb6861ce32dee0f41677) for original class + // (BaseRequestTest) from which this one was copied: does NOT call super.modifyConfigs() => + // throwing away IntegrationTestHarness's version => neither setting up listeners nor adding + // serverConfig => rendering all tests that override serverConfig as (partly) broken: + // core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala + // core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala + // core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala + // core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala + // core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala + // core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala + // core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala + // core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala + // core/src/test/scala/integration/kafka/api/MetricsTest.scala + // core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala + // core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala + // core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala + // core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala + // core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala + // core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala + // core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala + // core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala + // core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala + // core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala + // core/src/test/scala/integration/kafka/api/UserQuotaTest.scala + // core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala + // core/src/test/scala/integration/kafka/api/ProxyBasedFederationTest.scala + // core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala override def modifyConfigs(props: Seq[Properties], clusterIndex: Int): Unit = { super.modifyConfigs(props, clusterIndex) props.foreach { p => diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index e915b765c7289..5f50bfb7a48c7 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -259,7 +259,7 @@ class RequestQuotaTest extends BaseRequestTest { .setPort(0) .setSecurityProtocol(securityProtocol.id) 
.setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava)).asJava - new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, Long.MaxValue, Long.MaxValue, partitionState, brokers) + new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, Long.MaxValue, Long.MaxValue, partitionState, brokers, "fakeClusterId") case ApiKeys.CONTROLLED_SHUTDOWN => new ControlledShutdownRequest.Builder( diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index 28b592eaf7af8..93f3a2a8aa7a0 100755 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.utils.Utils // This should be named EmbeddedZooKeeper for consistency with other classes, but since this is widely used by other // projects (even though it's internal), we keep the name as it is until we have a publicly supported test library for // others to use. -class EmbeddedZookeeper() extends Logging { +class EmbeddedZookeeper(requestedPort: Int = TestUtils.RandomPort) extends Logging { val snapshotDir = TestUtils.tempDir() val logDir = TestUtils.tempDir() @@ -43,7 +43,7 @@ class EmbeddedZookeeper() extends Logging { System.setProperty("zookeeper.forceSync", "no") //disable fsync to ZK txn log in tests to avoid timeout val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime) val factory = new NIOServerCnxnFactory() - private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort) + private val addr = new InetSocketAddress("127.0.0.1", requestedPort) factory.configure(addr, 0) factory.startup(zookeeper) val port = zookeeper.getClientPort diff --git a/core/src/test/scala/unit/kafka/zk/MultiClusterZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/MultiClusterZooKeeperTestHarness.scala index 7be97f5f17b7f..9e145f8636ca5 100755 --- a/core/src/test/scala/unit/kafka/zk/MultiClusterZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/MultiClusterZooKeeperTestHarness.scala @@ -69,7 +69,7 @@ abstract class MultiClusterZooKeeperTestHarness extends Logging { def setUp(): Unit = { (0 until numClusters).map { i => debug(s"creating zookeeper/zkClient/adminZkClient " + (i+1) + " of " + numClusters) - zookeepers += new EmbeddedZookeeper() + zookeepers += new EmbeddedZookeeper(10000 + 100*(i + 1)) // 10100, 10200, ... zkClients += KafkaZkClient(zkConnect(i), zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) adminZkClients += new AdminZkClient(zkClients(i)) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/MetadataCacheBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/MetadataCacheBenchmark.java index 84cff85fe905e..e719030ba186c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/MetadataCacheBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/MetadataCacheBenchmark.java @@ -63,7 +63,7 @@ public static class BenchState { public void setUp() { UpdateMetadataRequest request = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 2, 1, 0, 0, - getPartitionStates(), getUpdateMetadataBroker()).build(); + getPartitionStates(), getUpdateMetadataBroker(), "dummyClusterId").build(); metadataCache.updateMetadata(15, request); }
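For reference, a self-contained Scala sketch of the extended Builder signature exercised throughout the tests above. The helper name UpdateMetadataRequestSketch and the literal controller/epoch values are illustrative only; "fakeClusterId" is simply the placeholder the tests pass, while federated callers would supply the originating physical cluster's real id (non-federated callers may pass a placeholder or null).

// Sketch only: mirrors the new trailing clusterId argument added to UpdateMetadataRequest.Builder
// in this patch. Names and literal values here are illustrative, not taken from the patch.
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.UpdateMetadataRequest
import scala.collection.JavaConverters._

object UpdateMetadataRequestSketch {
  def build(originClusterId: String,
            partitionStates: Seq[UpdateMetadataPartitionState],
            liveBrokers: Seq[UpdateMetadataBroker]): UpdateMetadataRequest = {
    val version = ApiKeys.UPDATE_METADATA.latestVersion  // v7+ is what carries OriginClusterId
    new UpdateMetadataRequest.Builder(version,
      2,                       // controllerId (illustrative)
      1,                       // controllerEpoch (illustrative)
      0L,                      // brokerEpoch (illustrative)
      0L,                      // maxBrokerEpoch (illustrative)
      partitionStates.asJava,
      liveBrokers.asJava,
      originClusterId          // e.g. "fakeClusterId" in the tests above
    ).build()
  }
}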