File tree Expand file tree Collapse file tree 2 files changed +15
-1
lines changed
core/src/main/scala/org/apache/spark/storage
network/yarn/src/main/java/org/apache/spark/network/yarn Expand file tree Collapse file tree 2 files changed +15
-1
lines changed Original file line number Diff line number Diff line change @@ -30,6 +30,7 @@ import akka.actor.{ActorSystem, Props}
3030import sun .nio .ch .DirectBuffer
3131
3232import org .apache .spark ._
33+ import org .apache .spark .deploy .SparkHadoopUtil
3334import org .apache .spark .executor ._
3435import org .apache .spark .io .CompressionCodec
3536import org .apache .spark .network ._
@@ -92,7 +93,19 @@ private[spark] class BlockManager(
9293
9394 private [spark]
9495 val externalShuffleServiceEnabled = conf.getBoolean(" spark.shuffle.service.enabled" , false )
95- private val externalShuffleServicePort = conf.getInt(" spark.shuffle.service.port" , 7337 )
96+
97+ // In Yarn, the shuffle service port maybe set through the Hadoop config
98+ private val shuffleServicePortKey = " spark.shuffle.service.port"
99+ private val externalShuffleServicePort = {
100+ val sparkPort = conf.getInt(shuffleServicePortKey, 7337 )
101+ if (SparkHadoopUtil .get.isYarnMode) {
102+ val hadoopConf = SparkHadoopUtil .get.newConfiguration(conf)
103+ Option (hadoopConf.get(shuffleServicePortKey)).map(_.toInt).getOrElse(sparkPort)
104+ } else {
105+ sparkPort
106+ }
107+ }
108+
96109 // Check that we're not using external shuffle service with consolidated shuffle files.
97110 if (externalShuffleServiceEnabled
98111 && conf.getBoolean(" spark.shuffle.consolidateFiles" , false )
Original file line number Diff line number Diff line change @@ -49,6 +49,7 @@ protected void serviceInit(Configuration conf) {
4949 RpcHandler rpcHandler = new ExternalShuffleBlockHandler ();
5050 TransportContext transportContext = new TransportContext (transportConf , rpcHandler );
5151 transportContext .createServer (port );
52+ logger .info ("Started Yarn shuffle service for Spark on port " + port );
5253 } catch (Exception e ) {
5354 logger .error ("Exception in starting Yarn shuffle service for Spark" , e );
5455 }
You can’t perform that action at this time.
0 commit comments