From ca6b3ce7d581bb9369ab93d7d2a98353645f74e3 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 16 Mar 2021 15:17:52 -0700 Subject: [PATCH 1/4] [SPARK-34828][YARN] Make shuffle service name configurable on client side and allow for classpath-based config override on server side --- .../network/yarn/YarnShuffleService.java | 54 +++++++++-- .../yarn/YarnShuffleServiceMetrics.java | 6 +- .../spark/internal/config/package.scala | 10 ++ docs/running-on-yarn.md | 94 ++++++++++++++++--- .../spark/deploy/yarn/ExecutorRunnable.scala | 4 +- .../yarn/YarnShuffleIntegrationSuite.scala | 54 +++++++++++ .../yarn/YarnShuffleServiceSuite.scala | 10 ++ .../spark/network/yarn/YarnTestAccessor.scala | 3 + 8 files changed, 210 insertions(+), 25 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 548a5ccd1385..4d50af9a318c 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.ByteBuffer; import java.util.List; @@ -75,6 +76,15 @@ * is because an application running on the same Yarn cluster may choose to not use the external * shuffle service, in which case its setting of `spark.authenticate` should be independent of * the service's. + * + * The shuffle service will produce metrics via the YARN NodeManager's {@code metrics2} system + * under a namespace specified by the {@value SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY} config. + * + * By default, all configurations for the shuffle service will be taken directly from the + * Hadoop {@link Configuration} passed by the YARN NodeManager. It is also possible to configure + * the shuffle service by placing a resource named + * {@value SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME} into the classpath, which should be an + * XML file in the standard Hadoop Configuration resource format. */ public class YarnShuffleService extends AuxiliaryService { private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); @@ -83,6 +93,14 @@ public class YarnShuffleService extends AuxiliaryService { private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; + /** + * The namespace to use for the metrics record which will contain all metrics produced by the + * shuffle service. + */ + static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY = + "spark.yarn.shuffle.service.metrics.namespace"; + private static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService"; + // Whether the shuffle server should authenticate fetch requests private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate"; private static final boolean DEFAULT_SPARK_AUTHENTICATE = false; @@ -103,6 +121,13 @@ public class YarnShuffleService extends AuxiliaryService { private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider .StoreVersion(1, 0); + /** + * The name of the resource to search for on the classpath to find a shuffle service-specific + * configuration overlay. If found, this will be parsed as a standard Hadoop + * {@link Configuration config} file and will override the configs passed from the NodeManager. + */ + static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME = "spark-shuffle-site.xml"; + // just for integration tests that want to look at this file -- in general not sensible as // a static @VisibleForTesting @@ -157,10 +182,18 @@ private boolean isAuthenticationEnabled() { * Start the shuffle server with the given configuration. */ @Override - protected void serviceInit(Configuration conf) throws Exception { - _conf = conf; + protected void serviceInit(Configuration externalConf) throws Exception { + _conf = new Configuration(externalConf); + URL confOverlayUrl = Thread.currentThread().getContextClassLoader() + .getResource(SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME); + if (confOverlayUrl != null) { + logger.info("Initializing Spark YARN shuffle service with configuration overlay from {}", + confOverlayUrl); + _conf.addResource(confOverlayUrl); + } + super.serviceInit(_conf); - boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); + boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); try { // In case this NM was killed while there were running spark applications, we need to restore @@ -172,7 +205,7 @@ protected void serviceInit(Configuration conf) throws Exception { registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); } - TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); + TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf)); MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance( transportConf); blockHandler = new ExternalBlockHandler( @@ -181,7 +214,7 @@ protected void serviceInit(Configuration conf) throws Exception { // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); - boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); + boolean authEnabled = _conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); if (authEnabled) { secretManager = new ShuffleSecretManager(); if (_recoveryPath != null) { @@ -190,7 +223,7 @@ protected void serviceInit(Configuration conf) throws Exception { bootstraps.add(new AuthServerBootstrap(transportConf, secretManager)); } - int port = conf.getInt( + int port = _conf.getInt( SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); transportContext = new TransportContext(transportConf, blockHandler, true); shuffleServer = transportContext.createServer(port, bootstraps); @@ -203,13 +236,16 @@ protected void serviceInit(Configuration conf) throws Exception { blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", shuffleServer.getRegisteredConnections()); blockHandler.getAllMetrics().getMetrics().putAll(shuffleServer.getAllMetrics().getMetrics()); + String metricsNamespace = _conf.get(SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY, + DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME); YarnShuffleServiceMetrics serviceMetrics = - new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics()); MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); metricsSystem.register( - "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); - logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + metricsNamespace, "Metrics on the Spark Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using namespace '{}'", + metricsNamespace); logger.info("Started YARN shuffle service for Spark on port {}. " + "Authentication is {}. Registered executor file is {}", port, authEnabledString, diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 81be6e8036ff..f30abbd0f7fc 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -32,9 +32,11 @@ */ class YarnShuffleServiceMetrics implements MetricsSource { + private final String metricsNamespace; private final MetricSet metricSet; - YarnShuffleServiceMetrics(MetricSet metricSet) { + YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet) { + this.metricsNamespace = metricsNamespace; this.metricSet = metricSet; } @@ -46,7 +48,7 @@ class YarnShuffleServiceMetrics implements MetricsSource { */ @Override public void getMetrics(MetricsCollector collector, boolean all) { - MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService"); + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord(metricsNamespace); for (Map.Entry entry : metricSet.getMetrics().entrySet()) { collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d988e522c3df..1a18856e4156 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -680,6 +680,16 @@ package object config { private[spark] val SHUFFLE_SERVICE_PORT = ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337) + private[spark] val SHUFFLE_SERVICE_NAME = + ConfigBuilder("spark.shuffle.service.name") + .doc("The configured name of the Spark shuffle service the client should communicate with. " + + "This must match the name used to configure the Shuffle within the YARN NodeManager " + + "configuration (`yarn.nodemanager.aux-services`). Only takes effect when " + + s"$SHUFFLE_SERVICE_ENABLED is set to true.") + .version("3.2.0") + .stringConf + .createWithDefault("spark_shuffle") + private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab") .doc("Location of user's keytab.") .version("3.0.0") diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 72df64b3efc0..615fde77da05 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -8,9 +8,9 @@ license: | The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -163,7 +163,7 @@ To use a custom metrics.properties for the application master and executors, upd Please note that this feature can be used only with YARN 3.0+ For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

- Example: + Example: To request GPU resources from YARN, use: spark.yarn.am.resource.yarn.io/gpu.amount 3.0.0 @@ -185,10 +185,10 @@ To use a custom metrics.properties for the application master and executors, upd Please note that this feature can be used only with YARN 3.0+ For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

- Example: + Example: To request GPU resources from YARN, use: spark.yarn.driver.resource.yarn.io/gpu.amount - 3.0.0 + 3.0.0 spark.yarn.executor.resource.{resource-type}.amount @@ -198,7 +198,7 @@ To use a custom metrics.properties for the application master and executors, upd Please note that this feature can be used only with YARN 3.0+ For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

- Example: + Example: To request GPU resources from YARN, use: spark.yarn.executor.resource.yarn.io/gpu.amount 3.0.0 @@ -219,7 +219,7 @@ To use a custom metrics.properties for the application master and executors, upd Only used in cluster mode. Time for the YARN Application Master to wait for the SparkContext to be initialized. - 1.3.0 + 1.3.0 spark.yarn.submit.file.replication @@ -235,7 +235,7 @@ To use a custom metrics.properties for the application master and executors, upd Staging directory used while submitting applications. - 2.0.0 + 2.0.0 spark.yarn.preserve.staging.files @@ -243,7 +243,7 @@ To use a custom metrics.properties for the application master and executors, upd Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them. - 1.1.0 + 1.1.0 spark.yarn.scheduler.heartbeat.interval-ms @@ -409,12 +409,12 @@ To use a custom metrics.properties for the application master and executors, upd spark.yarn.populateHadoopClasspath - For with-hadoop Spark distribution, this is set to false; + For with-hadoop Spark distribution, this is set to false; for no-hadoop distribution, this is set to true. Whether to populate Hadoop classpath from yarn.application.classpath and - mapreduce.application.classpath Note that if this is set to false, + mapreduce.application.classpath Note that if this is set to false, it requires a with-Hadoop Spark distribution that bundles Hadoop runtime or user has to provide a Hadoop installation separately. @@ -427,7 +427,7 @@ To use a custom metrics.properties for the application master and executors, upd The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration. - 1.3.0 + 1.3.0 spark.yarn.am.attemptFailuresValidityInterval @@ -572,7 +572,7 @@ To use a custom metrics.properties for the application master and executors, upd spark.yarn.metrics.namespace (none) - The root namespace for AM metrics reporting. + The root namespace for AM metrics reporting. If it is not set then the YARN application ID is used. 2.4.0 @@ -773,8 +773,27 @@ The following extra configuration options are available when the shuffle service NodeManagers where the Spark Shuffle Service is not running. + + spark.yarn.shuffle.service.metrics.namespace + sparkShuffleService + + The namespace to use when emitting shuffle service metrics into Hadoop metrics2 system of the + NodeManager. + + +Please note that the instructions above assume that the default shuffle service name, +`spark_shuffle`, has been used. It is possible to use any name here, but the values used in the +YARN NodeManager configurations must match the value of `spark.shuffle.service.name` in the +Spark application. + +The shuffle service will, by default, take all of its configurations from the Hadoop Configuration +used by the NodeManager (e.g. `yarn-site.xml`). However, it is also possible to configure the +shuffle service independently using a file named `spark-shuffle-site.xml` which should be placed +onto the classpath of the NodeManager. The shuffle service will treat this as a standard Hadoop +Configuration resource and overlay it on top of the NodeManager's configuration. + # Launching your application with Apache Oozie Apache Oozie can launch Spark applications as part of a workflow. @@ -823,3 +842,52 @@ do the following: to the list of filters in the spark.ui.filters configuration. Be aware that the history server information may not be up-to-date with the application's state. + +# Running multiple versions of the Spark Shuffle Service + +In some cases it may be desirable to run multiple instances of the Spark Shuffle Service which are +using different versions of Spark. This can be helpful, for example, when running a YARN cluster +with a mixed workload of applications running multiple Spark versions, since a given version of +the shuffle service is not always compatible with other versions of Spark. YARN versions since 2.9.0 +support the ability to run shuffle services within an isolated classloader +(see [YARN-4577](https://issues.apache.org/jira/browse/YARN-4577)), meaning multiple Spark versions +can coexist within a single NodeManager. The +`yarn.nodemanager.aux-services..classpath` and, starting from YARN 2.10.2/3.1.1/3.2.0, +`yarn.nodemanager.aux-services..remote-classpath` options can be used to configure +this. In addition to setting up separate classpaths, it's necessary to ensure the two versions +advertise to different ports. This can be achieved using the `spark-shuffle-site.xml` file described +above. For example, you may have configuration like: + +```properties + yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y + yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-yarn-shuffle.jar,/path/to/spark-x-config + yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-yarn-shuffle.jar,/path/to/spark-y-config +``` + +The two `spark-*-config` directories each contain one file, `spark-shuffle-site.xml`. These are XML +files in the [Hadoop Configuration format](https://hadoop.apache.org/docs/r3.2.0/api/org/apache/hadoop/conf/Configuration.html) +which each contain a few configurations to adjust the port number and metrics name prefix used: +```xml + + + spark.shuffle.service.port + 7001 + + + spark.yarn.shuffle.service.metrics.namespace + sparkShuffleServiceX + + +``` +The values should both be different for the two different services. + +Then, in the configuration of the Spark applications, one should be configured with: +```properties + spark.shuffle.service.name = spark_shuffle_x + spark.shuffle.service.port = 7001 +``` +and one should be configured with: +```properties + spark.shuffle.service.name = spark_shuffle_y + spark.shuffle.service.port = +``` \ No newline at end of file diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ede39063cf1b..717ce57b902c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -115,7 +115,9 @@ private[yarn] class ExecutorRunnable( // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } - ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes)) + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + logInfo(s"Initializing service data for shuffle service using name '$serviceName'") + ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) } // Send the start request to the ContainerManager diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index 547bfca2891f..be9ab9125974 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import java.io.File +import java.net.URLClassLoader import java.nio.charset.StandardCharsets import com.google.common.io.Files @@ -109,6 +110,59 @@ class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite { } +/** + * SPARK-34828: Integration test for the external shuffle service with an alternate name and + * configs (by using a configuration overlay) + */ +@ExtendedYarnTest +class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite { + + private[this] val shuffleServiceName = "custom_shuffle_service_name" + + override def newYarnConfig(): YarnConfiguration = { + val yarnConfig = super.newYarnConfig() + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceName) + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format(shuffleServiceName), + classOf[YarnShuffleService].getCanonicalName) + val overlayConf = new YarnConfiguration() + // Enable authentication in the base NodeManager conf but not in the client. This would break + // shuffle, unless the shuffle service conf overlay overrides to turn off authentication. + overlayConf.setBoolean(NETWORK_AUTH_ENABLED.key, true) + // Add the authentication conf to a separate config object used as an overlay rather than + // setting it directly. This is necessary because a config overlay will override previous + // config overlays, but not configs which were set directly on the config object. + yarnConfig.addResource(overlayConf) + yarnConfig + } + + override protected def extraSparkConf(): Map[String, String] = + super.extraSparkConf() ++ Map(SHUFFLE_SERVICE_NAME.key -> shuffleServiceName) + + override def beforeAll(): Unit = { + val configFileContent = + s""" + | + | + | + | ${NETWORK_AUTH_ENABLED.key} + | false + | + | + |""".stripMargin + val jarFile = TestUtils.createJarWithFiles(Map( + YarnTestAccessor.getShuffleServiceConfOverlayResourceName -> configFileContent + )) + // Configure a custom classloader which includes the conf overlay as a resource + val oldClassLoader = Thread.currentThread().getContextClassLoader + Thread.currentThread().setContextClassLoader(new URLClassLoader(Array(jarFile))) + try { + super.beforeAll() + } finally { + Thread.currentThread().setContextClassLoader(oldClassLoader) + } + } +} + private object YarnExternalShuffleDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 188a48509212..98820680ae16 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -413,6 +413,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd )) } + test("SPARK-34828: metrics should be registered with configured name") { + s1 = new YarnShuffleService + yarnConfig.set(YarnShuffleService.SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY, "fooMetrics") + s1.init(yarnConfig) + + val metricsSystem = DefaultMetricsSystem.instance.asInstanceOf[MetricsSystemImpl] + assert(metricsSystem.getSource("sparkShuffleService") === null) + assert(metricsSystem.getSource("fooMetrics").isInstanceOf[YarnShuffleServiceMetrics]) + } + test("create default merged shuffle file manager instance") { val mockConf = mock(classOf[TransportConf]) when(mockConf.mergedShuffleFileManagerImpl).thenReturn( diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala index db322cd18e15..d87cc2638472 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala @@ -34,4 +34,7 @@ object YarnTestAccessor { service.registeredExecutorFile } + def getShuffleServiceConfOverlayResourceName: String = { + YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME + } } From e5d7a2d4b02a5d7280ad41ba8c7b4b3ea5bdf038 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Mon, 22 Mar 2021 15:46:43 -0700 Subject: [PATCH 2/4] Revert whitespace changes in running-on-yarn.md --- docs/running-on-yarn.md | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 615fde77da05..ff701b2fbc95 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -8,9 +8,9 @@ license: | The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -163,7 +163,7 @@ To use a custom metrics.properties for the application master and executors, upd Please note that this feature can be used only with YARN 3.0+ For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

- Example: + Example: To request GPU resources from YARN, use: spark.yarn.am.resource.yarn.io/gpu.amount 3.0.0 @@ -185,10 +185,10 @@ To use a custom metrics.properties for the application master and executors, upd Please note that this feature can be used only with YARN 3.0+ For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

- Example: + Example: To request GPU resources from YARN, use: spark.yarn.driver.resource.yarn.io/gpu.amount - 3.0.0 + 3.0.0 spark.yarn.executor.resource.{resource-type}.amount @@ -198,7 +198,7 @@ To use a custom metrics.properties for the application master and executors, upd Please note that this feature can be used only with YARN 3.0+ For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

- Example: + Example: To request GPU resources from YARN, use: spark.yarn.executor.resource.yarn.io/gpu.amount 3.0.0 @@ -219,7 +219,7 @@ To use a custom metrics.properties for the application master and executors, upd Only used in cluster mode. Time for the YARN Application Master to wait for the SparkContext to be initialized. - 1.3.0 + 1.3.0 spark.yarn.submit.file.replication @@ -235,7 +235,7 @@ To use a custom metrics.properties for the application master and executors, upd Staging directory used while submitting applications. - 2.0.0 + 2.0.0 spark.yarn.preserve.staging.files @@ -243,7 +243,7 @@ To use a custom metrics.properties for the application master and executors, upd Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them. - 1.1.0 + 1.1.0 spark.yarn.scheduler.heartbeat.interval-ms @@ -409,12 +409,12 @@ To use a custom metrics.properties for the application master and executors, upd spark.yarn.populateHadoopClasspath - For with-hadoop Spark distribution, this is set to false; + For with-hadoop Spark distribution, this is set to false; for no-hadoop distribution, this is set to true. Whether to populate Hadoop classpath from yarn.application.classpath and - mapreduce.application.classpath Note that if this is set to false, + mapreduce.application.classpath Note that if this is set to false, it requires a with-Hadoop Spark distribution that bundles Hadoop runtime or user has to provide a Hadoop installation separately. @@ -427,7 +427,7 @@ To use a custom metrics.properties for the application master and executors, upd The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration. - 1.3.0 + 1.3.0 spark.yarn.am.attemptFailuresValidityInterval @@ -572,7 +572,7 @@ To use a custom metrics.properties for the application master and executors, upd spark.yarn.metrics.namespace (none) - The root namespace for AM metrics reporting. + The root namespace for AM metrics reporting. If it is not set then the YARN application ID is used. 2.4.0 @@ -890,4 +890,4 @@ and one should be configured with: ```properties spark.shuffle.service.name = spark_shuffle_y spark.shuffle.service.port = -``` \ No newline at end of file +``` From 90e743ad7a0d7c1944d9af39a43e466f04d574f9 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 23 Mar 2021 09:21:36 -0700 Subject: [PATCH 3/4] Minor documentation and comment updates. Break YarnShuffleAlternateNameConfigSuite into a separate class. Fix new test in YarnShuffleServiceSuite when run with other tests in the suite. --- .../network/yarn/YarnShuffleService.java | 7 ++ docs/running-on-yarn.md | 7 +- .../YarnShuffleAlternateNameConfigSuite.scala | 79 +++++++++++++++++++ .../yarn/YarnShuffleIntegrationSuite.scala | 54 ------------- .../yarn/YarnShuffleServiceSuite.scala | 9 ++- 5 files changed, 96 insertions(+), 60 deletions(-) create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 4d50af9a318c..82b891570119 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -164,6 +164,13 @@ public class YarnShuffleService extends AuxiliaryService { private DB db; public YarnShuffleService() { + // The name of the auxiliary service configured within the NodeManager + // (`yarn.nodemanager.aux-services`) is treated as the source-of-truth, so this one can be + // arbitrary. The NodeManager will log a warning if the configured name doesn't match this name, + // to inform operators of a potential misconfiguration, but this name is otherwise not used. + // It is hard-coded instead of using the value of the `spark.shuffle.service.name` configuration + // because at this point in instantiation there is no Configuration object; it is not passed + // until `serviceInit` is called, at which point it's too late to adjust the name. super("spark_shuffle"); logger.info("Initializing YARN shuffle service for Spark"); instance = this; diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index ff701b2fbc95..a7d381df26ec 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -791,8 +791,9 @@ Spark application. The shuffle service will, by default, take all of its configurations from the Hadoop Configuration used by the NodeManager (e.g. `yarn-site.xml`). However, it is also possible to configure the shuffle service independently using a file named `spark-shuffle-site.xml` which should be placed -onto the classpath of the NodeManager. The shuffle service will treat this as a standard Hadoop -Configuration resource and overlay it on top of the NodeManager's configuration. +onto the classpath of the shuffle service (which is, by default, shared with the classpath of the +NodeManager). The shuffle service will treat this as a standard Hadoop Configuration resource and +overlay it on top of the NodeManager's configuration. # Launching your application with Apache Oozie @@ -865,7 +866,7 @@ above. For example, you may have configuration like: ``` The two `spark-*-config` directories each contain one file, `spark-shuffle-site.xml`. These are XML -files in the [Hadoop Configuration format](https://hadoop.apache.org/docs/r3.2.0/api/org/apache/hadoop/conf/Configuration.html) +files in the [Hadoop Configuration format](https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/conf/Configuration.html) which each contain a few configurations to adjust the port number and metrics name prefix used: ```xml diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala new file mode 100644 index 000000000000..db001a946fdd --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URLClassLoader + +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark._ +import org.apache.spark.internal.config._ +import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} +import org.apache.spark.tags.ExtendedYarnTest + +/** + * SPARK-34828: Integration test for the external shuffle service with an alternate name and + * configs (by using a configuration overlay) + */ +@ExtendedYarnTest +class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite { + + private[this] val shuffleServiceName = "custom_shuffle_service_name" + + override def newYarnConfig(): YarnConfiguration = { + val yarnConfig = super.newYarnConfig() + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceName) + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format(shuffleServiceName), + classOf[YarnShuffleService].getCanonicalName) + val overlayConf = new YarnConfiguration() + // Enable authentication in the base NodeManager conf but not in the client. This would break + // shuffle, unless the shuffle service conf overlay overrides to turn off authentication. + overlayConf.setBoolean(NETWORK_AUTH_ENABLED.key, true) + // Add the authentication conf to a separate config object used as an overlay rather than + // setting it directly. This is necessary because a config overlay will override previous + // config overlays, but not configs which were set directly on the config object. + yarnConfig.addResource(overlayConf) + yarnConfig + } + + override protected def extraSparkConf(): Map[String, String] = + super.extraSparkConf() ++ Map(SHUFFLE_SERVICE_NAME.key -> shuffleServiceName) + + override def beforeAll(): Unit = { + val configFileContent = + s""" + | + | + | ${NETWORK_AUTH_ENABLED.key} + | false + | + | + |""".stripMargin + val jarFile = TestUtils.createJarWithFiles(Map( + YarnTestAccessor.getShuffleServiceConfOverlayResourceName -> configFileContent + )) + // Configure a custom classloader which includes the conf overlay as a resource + val oldClassLoader = Thread.currentThread().getContextClassLoader + Thread.currentThread().setContextClassLoader(new URLClassLoader(Array(jarFile))) + try { + super.beforeAll() + } finally { + Thread.currentThread().setContextClassLoader(oldClassLoader) + } + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index be9ab9125974..547bfca2891f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.yarn import java.io.File -import java.net.URLClassLoader import java.nio.charset.StandardCharsets import com.google.common.io.Files @@ -110,59 +109,6 @@ class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite { } -/** - * SPARK-34828: Integration test for the external shuffle service with an alternate name and - * configs (by using a configuration overlay) - */ -@ExtendedYarnTest -class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite { - - private[this] val shuffleServiceName = "custom_shuffle_service_name" - - override def newYarnConfig(): YarnConfiguration = { - val yarnConfig = super.newYarnConfig() - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceName) - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format(shuffleServiceName), - classOf[YarnShuffleService].getCanonicalName) - val overlayConf = new YarnConfiguration() - // Enable authentication in the base NodeManager conf but not in the client. This would break - // shuffle, unless the shuffle service conf overlay overrides to turn off authentication. - overlayConf.setBoolean(NETWORK_AUTH_ENABLED.key, true) - // Add the authentication conf to a separate config object used as an overlay rather than - // setting it directly. This is necessary because a config overlay will override previous - // config overlays, but not configs which were set directly on the config object. - yarnConfig.addResource(overlayConf) - yarnConfig - } - - override protected def extraSparkConf(): Map[String, String] = - super.extraSparkConf() ++ Map(SHUFFLE_SERVICE_NAME.key -> shuffleServiceName) - - override def beforeAll(): Unit = { - val configFileContent = - s""" - | - | - | - | ${NETWORK_AUTH_ENABLED.key} - | false - | - | - |""".stripMargin - val jarFile = TestUtils.createJarWithFiles(Map( - YarnTestAccessor.getShuffleServiceConfOverlayResourceName -> configFileContent - )) - // Configure a custom classloader which includes the conf overlay as a resource - val oldClassLoader = Thread.currentThread().getContextClassLoader - Thread.currentThread().setContextClassLoader(new URLClassLoader(Array(jarFile))) - try { - super.beforeAll() - } finally { - Thread.currentThread().setContextClassLoader(oldClassLoader) - } - } -} - private object YarnExternalShuffleDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 98820680ae16..d6d1715223e3 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -56,6 +56,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd override def beforeEach(): Unit = { super.beforeEach() + // Ensure that each test uses a fresh metrics system + DefaultMetricsSystem.shutdown() + DefaultMetricsSystem.setInstance(new MetricsSystemImpl()) yarnConfig = new YarnConfiguration() yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), @@ -418,9 +421,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd yarnConfig.set(YarnShuffleService.SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY, "fooMetrics") s1.init(yarnConfig) - val metricsSystem = DefaultMetricsSystem.instance.asInstanceOf[MetricsSystemImpl] - assert(metricsSystem.getSource("sparkShuffleService") === null) - assert(metricsSystem.getSource("fooMetrics").isInstanceOf[YarnShuffleServiceMetrics]) + assert(DefaultMetricsSystem.instance.getSource("sparkShuffleService") === null) + assert(DefaultMetricsSystem.instance.getSource("fooMetrics") + .isInstanceOf[YarnShuffleServiceMetrics]) } test("create default merged shuffle file manager instance") { From ef880526e05b6143c31d98829d22d25cea659401 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Wed, 24 Mar 2021 10:14:19 -0700 Subject: [PATCH 4/4] Update documentation to make it more clear which features only work on YARN >= 2.9.0 --- .../org/apache/spark/network/yarn/YarnShuffleService.java | 7 ++++++- docs/running-on-yarn.md | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 82b891570119..cb6d5d0ca203 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -84,7 +84,12 @@ * Hadoop {@link Configuration} passed by the YARN NodeManager. It is also possible to configure * the shuffle service by placing a resource named * {@value SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME} into the classpath, which should be an - * XML file in the standard Hadoop Configuration resource format. + * XML file in the standard Hadoop Configuration resource format. Note that when the shuffle + * service is loaded in the default manner, without configuring + * {@code yarn.nodemanager.aux-services..classpath}, this file must be on the classpath + * of the NodeManager itself. When using the {@code classpath} configuration, it can be present + * either on the NodeManager's classpath, or specified in the classpath configuration. + * This {@code classpath} configuration is only supported on YARN versions >= 2.9.0. */ public class YarnShuffleService extends AuxiliaryService { private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index a7d381df26ec..73bb76af6566 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -846,6 +846,8 @@ Be aware that the history server information may not be up-to-date with the appl # Running multiple versions of the Spark Shuffle Service +Please note that this section only applies when running on YARN versions >= 2.9.0. + In some cases it may be desirable to run multiple instances of the Spark Shuffle Service which are using different versions of Spark. This can be helpful, for example, when running a YARN cluster with a mixed workload of applications running multiple Spark versions, since a given version of