Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -75,6 +76,20 @@
* is because an application running on the same Yarn cluster may choose to not use the external
* shuffle service, in which case its setting of `spark.authenticate` should be independent of
* the service's.
*
* The shuffle service will produce metrics via the YARN NodeManager's {@code metrics2} system
* under a namespace specified by the {@value #SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY} config.
*
* By default, all configurations for the shuffle service will be taken directly from the
* Hadoop {@link Configuration} passed by the YARN NodeManager. It is also possible to configure
* the shuffle service by placing a resource named
* {@value #SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME} into the classpath, which should be an
* XML file in the standard Hadoop Configuration resource format. Note that when the shuffle
* service is loaded in the default manner, without configuring
* {@code yarn.nodemanager.aux-services.<service>.classpath}, this file must be on the classpath
* of the NodeManager itself. When using the {@code classpath} configuration, it can be present
* either on the NodeManager's classpath, or specified in the classpath configuration.
* This {@code classpath} configuration is only supported on YARN versions >= 2.9.0.
*/
public class YarnShuffleService extends AuxiliaryService {
private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
Expand All @@ -83,6 +98,14 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;

/**
 * Configuration key whose value names the namespace of the metrics record which will contain
 * all metrics produced by the shuffle service. Declared package-private (not private) so that
 * code outside this class, such as tests, can reference the key.
 */
static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
    "spark.yarn.shuffle.service.metrics.namespace";
// Fallback namespace used when the key above is absent from the configuration.
private static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService";

// Whether the shuffle server should authenticate fetch requests
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
Expand All @@ -103,6 +126,13 @@ public class YarnShuffleService extends AuxiliaryService {
private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
.StoreVersion(1, 0);

/**
* The name of the resource to search for on the classpath to find a shuffle service-specific
* configuration overlay. If found, this will be parsed as a standard Hadoop
* {@link Configuration config} file and will override the configs passed from the NodeManager.
*/
static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME = "spark-shuffle-site.xml";

// just for integration tests that want to look at this file -- in general not sensible as
// a static
@VisibleForTesting
Expand Down Expand Up @@ -139,6 +169,13 @@ public class YarnShuffleService extends AuxiliaryService {
private DB db;

public YarnShuffleService() {
// The name of the auxiliary service configured within the NodeManager
// (`yarn.nodemanager.aux-services`) is treated as the source-of-truth, so this one can be
// arbitrary. The NodeManager will log a warning if the configured name doesn't match this name,
// to inform operators of a potential misconfiguration, but this name is otherwise not used.
// It is hard-coded instead of using the value of the `spark.shuffle.service.name` configuration
// because at this point in instantiation there is no Configuration object; it is not passed
// until `serviceInit` is called, at which point it's too late to adjust the name.
super("spark_shuffle");
logger.info("Initializing YARN shuffle service for Spark");
instance = this;
Expand All @@ -157,10 +194,18 @@ private boolean isAuthenticationEnabled() {
* Start the shuffle server with the given configuration.
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
_conf = conf;
protected void serviceInit(Configuration externalConf) throws Exception {
_conf = new Configuration(externalConf);
URL confOverlayUrl = Thread.currentThread().getContextClassLoader()
.getResource(SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME);
if (confOverlayUrl != null) {
logger.info("Initializing Spark YARN shuffle service with configuration overlay from {}",
confOverlayUrl);
_conf.addResource(confOverlayUrl);
}
super.serviceInit(_conf);

boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

try {
// In case this NM was killed while there were running spark applications, we need to restore
Expand All @@ -172,7 +217,7 @@ protected void serviceInit(Configuration conf) throws Exception {
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
}

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf));
MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance(
transportConf);
blockHandler = new ExternalBlockHandler(
Expand All @@ -181,7 +226,7 @@ protected void serviceInit(Configuration conf) throws Exception {
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
boolean authEnabled = _conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
if (authEnabled) {
secretManager = new ShuffleSecretManager();
if (_recoveryPath != null) {
Expand All @@ -190,7 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception {
bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
}

int port = conf.getInt(
int port = _conf.getInt(
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
transportContext = new TransportContext(transportConf, blockHandler, true);
shuffleServer = transportContext.createServer(port, bootstraps);
Expand All @@ -203,13 +248,16 @@ protected void serviceInit(Configuration conf) throws Exception {
blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
shuffleServer.getRegisteredConnections());
blockHandler.getAllMetrics().getMetrics().putAll(shuffleServer.getAllMetrics().getMetrics());
String metricsNamespace = _conf.get(SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY,
DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
YarnShuffleServiceMetrics serviceMetrics =
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics());

MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
metricsSystem.register(
"sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
metricsNamespace, "Metrics on the Spark Shuffle Service", serviceMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using namespace '{}'",
metricsNamespace);

logger.info("Started YARN shuffle service for Spark on port {}. " +
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
*/
class YarnShuffleServiceMetrics implements MetricsSource {

private final String metricsNamespace;
private final MetricSet metricSet;

YarnShuffleServiceMetrics(MetricSet metricSet) {
YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet) {
this.metricsNamespace = metricsNamespace;
this.metricSet = metricSet;
}

Expand All @@ -46,7 +48,7 @@ class YarnShuffleServiceMetrics implements MetricsSource {
*/
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord(metricsNamespace);

for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,16 @@ package object config {
private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337)

// Name under which the Spark shuffle service is registered in the YARN NodeManager's
// `yarn.nodemanager.aux-services` list; defaults to "spark_shuffle".
private[spark] val SHUFFLE_SERVICE_NAME =
  ConfigBuilder("spark.shuffle.service.name")
    .doc("The configured name of the Spark shuffle service the client should communicate with. " +
      "This must match the name used to configure the Shuffle within the YARN NodeManager " +
      "configuration (`yarn.nodemanager.aux-services`). Only takes effect when " +
      // Interpolate the config key explicitly: interpolating the entry object itself would
      // embed its toString representation in the user-facing doc instead of the key name.
      s"${SHUFFLE_SERVICE_ENABLED.key} is set to true.")
    .version("3.2.0")
    .stringConf
    .createWithDefault("spark_shuffle")

private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab")
.doc("Location of user's keytab.")
.version("3.0.0")
Expand Down
71 changes: 71 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,28 @@ The following extra configuration options are available when the shuffle service
NodeManagers where the Spark Shuffle Service is not running.
</td>
</tr>
<tr>
<td><code>spark.yarn.shuffle.service.metrics.namespace</code></td>
<td><code>sparkShuffleService</code></td>
<td>
The namespace to use when emitting shuffle service metrics into the Hadoop metrics2 system of
the NodeManager.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some description about the limitation with old Hadoop versions (like 2.7.x)? Here or at Section Running multiple versions of the Spark Shuffle Service?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this section, everything will work as expected on Hadoop 2.7.x. The "Running multiple versions" section won't work on 2.7, but I already called out the supported YARN versions there. Can you let me know if there's anything else you think I should call out?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worrying about the situation some users try to use Apache Spark distribution (with Hadoop 2.7) at YARN 2.9+ cluster. Does it work?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the name referenced by the node manager works with the Hadoop 2.9+ custom class loader, but I assume with Hadoop 2.7 it requires the spark_shuffle name ? hence the spark.shuffle.service.name won't work unless you have recompiled the code and manually changed it.
Perhaps we just need to be more explicit in the config spark.shuffle.service.name that either references the section running multiple versions of the Spark Shuffle Service or explicitly states supported in YARN 2.9+. I assume this config with metrics doesn't matter as far as Hadoop version.
Also did we explicitly test with Hadoop 2.7 and the case @dongjoon-hyun brings up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the name referenced by the node manager works with the Hadoop 2.9+ custom class loader, but I assume with Hadoop 2.7 it requires the spark_shuffle name ? hence the spark.shuffle.service.name won't work unless you have recompiled the code and manually changed it.

No, this is not correct. YARN ignores the hard-coded name on all versions of YARN. Take a look at AuxServices on the 2.7.0 branch:
https://github.com/apache/hadoop/blob/f95b390df2ca7d599f0ad82cf6e8d980469e7abb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java#L129-L136

spark.shuffle.service.name works fine on Hadoop 2.7, it is only the isolated classloader that won't work on older versions.

I'm worrying about the situation some users try to use Apache Spark distribution (with Hadoop 2.7) at YARN 2.9+ cluster. Does it work?

I don't quite understand the concern here. Does my explanation above address your question? We haven't changed any of the interfaces used to interact with YARN, there should be no binary compatibility issues or anything of that sort. I can test whichever combination of Spark Version + Hadoop Version Distribution running on top of Hadoop Version YARN you like, but I am failing to see where the concern is / what you'd like me to look for.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great, I'm glad it works with 2.7 as well, thanks for clarifying. Yeah the concern was if it didn't work in 2.7 so I think you answered that.

</td>
</tr>
</table>

Please note that the instructions above assume that the default shuffle service name,
`spark_shuffle`, has been used. It is possible to use any name here, but the values used in the
YARN NodeManager configurations must match the value of `spark.shuffle.service.name` in the
Spark application.

The shuffle service will, by default, take all of its configurations from the Hadoop Configuration
used by the NodeManager (e.g. `yarn-site.xml`). However, it is also possible to configure the
shuffle service independently using a file named `spark-shuffle-site.xml` which should be placed
onto the classpath of the shuffle service (which is, by default, shared with the classpath of the
NodeManager). The shuffle service will treat this as a standard Hadoop Configuration resource and
overlay it on top of the NodeManager's configuration.

# Launching your application with Apache Oozie

Apache Oozie can launch Spark applications as part of a workflow.
Expand Down Expand Up @@ -823,3 +843,54 @@ do the following:
to the list of filters in the <code>spark.ui.filters</code> configuration.

Be aware that the history server information may not be up-to-date with the application's state.

# Running multiple versions of the Spark Shuffle Service

Please note that this section only applies when running on YARN versions >= 2.9.0.

In some cases it may be desirable to run multiple instances of the Spark Shuffle Service which are
using different versions of Spark. This can be helpful, for example, when running a YARN cluster
with a mixed workload of applications running multiple Spark versions, since a given version of
the shuffle service is not always compatible with other versions of Spark. YARN versions since 2.9.0
support the ability to run shuffle services within an isolated classloader
(see [YARN-4577](https://issues.apache.org/jira/browse/YARN-4577)), meaning multiple Spark versions
can coexist within a single NodeManager. The
`yarn.nodemanager.aux-services.<service-name>.classpath` and, starting from YARN 2.10.2/3.1.1/3.2.0,
`yarn.nodemanager.aux-services.<service-name>.remote-classpath` options can be used to configure
this. In addition to setting up separate classpaths, it's necessary to ensure the two versions
listen on different ports. This can be achieved using the `spark-shuffle-site.xml` file described
above. For example, you may have a configuration like:

```properties
yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y
yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-yarn-shuffle.jar,/path/to/spark-x-config
yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-yarn-shuffle.jar,/path/to/spark-y-config
```

The two `spark-*-config` directories each contain one file, `spark-shuffle-site.xml`. These are XML
files in the [Hadoop Configuration format](https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/conf/Configuration.html)
which each contain a few configurations to adjust the port number and metrics namespace used:
```xml
<configuration>
<property>
<name>spark.shuffle.service.port</name>
<value>7001</value>
</property>
<property>
<name>spark.yarn.shuffle.service.metrics.namespace</name>
<value>sparkShuffleServiceX</value>
</property>
</configuration>
```
Both values must differ between the two services.

Then, in the configuration of the Spark applications, one should be configured with:
```properties
spark.shuffle.service.name = spark_shuffle_x
spark.shuffle.service.port = 7001
```
and one should be configured with:
```properties
spark.shuffle.service.name = spark_shuffle_y
spark.shuffle.service.port = <other value>
```
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ private[yarn] class ExecutorRunnable(
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME)
logInfo(s"Initializing service data for shuffle service using name '$serviceName'")
ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes))
}

// Send the start request to the ContainerManager
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.yarn

import java.net.URLClassLoader

import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
import org.apache.spark.tags.ExtendedYarnTest

/**
 * SPARK-34828: Integration test which runs the external shuffle service under a non-default
 * auxiliary service name and supplies part of its configuration through a configuration
 * overlay resource.
 */
@ExtendedYarnTest
class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite {

  // Name used both for the NodeManager aux service and for the client-side service name config.
  private[this] val auxServiceName = "custom_shuffle_service_name"

  /** Builds the NodeManager config: registers the service under the custom name, auth enabled. */
  override def newYarnConfig(): YarnConfiguration = {
    val conf = super.newYarnConfig()
    conf.set(YarnConfiguration.NM_AUX_SERVICES, auxServiceName)
    val serviceClassKey = YarnConfiguration.NM_AUX_SERVICE_FMT.format(auxServiceName)
    conf.set(serviceClassKey, classOf[YarnShuffleService].getCanonicalName)

    // Authentication is switched on for the NodeManager but not for the client, which would
    // break shuffle unless the shuffle service's conf overlay switches it back off.
    // The setting is placed in a separate config object that is added as a resource, rather than
    // set directly: a config overlay overrides earlier overlays, but does not override values
    // which were set directly on the config object.
    val authOverlay = new YarnConfiguration()
    authOverlay.setBoolean(NETWORK_AUTH_ENABLED.key, true)
    conf.addResource(authOverlay)
    conf
  }

  /** Adds the custom shuffle service name to the Spark conf used by the test application. */
  override protected def extraSparkConf(): Map[String, String] = {
    val serviceNameSetting = SHUFFLE_SERVICE_NAME.key -> auxServiceName
    super.extraSparkConf() + serviceNameSetting
  }

  /**
   * Packages an overlay file that disables authentication into a jar, then runs the parent
   * setup with a context classloader that exposes that jar, restoring the old loader afterwards.
   */
  override def beforeAll(): Unit = {
    val overlayXml =
      s"""<?xml version="1.0" encoding="UTF-8"?>
         |<configuration>
         | <property>
         | <name>${NETWORK_AUTH_ENABLED.key}</name>
         | <value>false</value>
         | </property>
         |</configuration>
         |""".stripMargin
    val overlayResourceName = YarnTestAccessor.getShuffleServiceConfOverlayResourceName
    val overlayJar = TestUtils.createJarWithFiles(Map(overlayResourceName -> overlayXml))

    // Swap in a classloader which can see the overlay resource for the duration of the setup
    val thread = Thread.currentThread()
    val previousLoader = thread.getContextClassLoader
    thread.setContextClassLoader(new URLClassLoader(Array(overlayJar)))
    try {
      super.beforeAll()
    } finally {
      thread.setContextClassLoader(previousLoader)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd

override def beforeEach(): Unit = {
super.beforeEach()
// Ensure that each test uses a fresh metrics system
DefaultMetricsSystem.shutdown()
DefaultMetricsSystem.setInstance(new MetricsSystemImpl())
yarnConfig = new YarnConfiguration()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
Expand Down Expand Up @@ -413,6 +416,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
))
}

test("SPARK-34828: metrics should be registered with configured name") {
  // Initialize the service with a custom metrics namespace configured
  val customNamespace = "fooMetrics"
  s1 = new YarnShuffleService
  yarnConfig.set(YarnShuffleService.SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY, customNamespace)
  s1.init(yarnConfig)

  // The source must be found under the custom namespace only, not under the default one
  val metricsSystem = DefaultMetricsSystem.instance
  assert(metricsSystem.getSource("sparkShuffleService") === null)
  assert(metricsSystem.getSource(customNamespace).isInstanceOf[YarnShuffleServiceMetrics])
}

test("create default merged shuffle file manager instance") {
val mockConf = mock(classOf[TransportConf])
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ object YarnTestAccessor {
service.registeredExecutorFile
}

/** Exposes the shuffle service's package-private conf overlay resource name to tests. */
def getShuffleServiceConfOverlayResourceName: String =
  YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME
}