
Commit d90c460

hustfeiwang authored and cloud-fan committed
[SPARK-27637][SHUFFLE] For nettyBlockTransferService, if IOException occurred while fetching data, check whether relative executor is alive before retry
## What changes were proposed in this pull request?

There are several kinds of shuffle client: blockTransferService and externalShuffleClient. The externalShuffleClient talks to an external shuffle service, which serves shuffle block data regardless of the state of executors. The blockTransferService is used to fetch broadcast blocks, and to fetch shuffle data when the external shuffle service is not enabled.

When fetching data through blockTransferService, the shuffle client connects to the relevant executor's blockManager, so if that executor is dead the fetch can never succeed. When spark.shuffle.service.enabled is true and spark.dynamicAllocation.enabled is true, an executor is removed once it has been idle for more than idleTimeout. If a blockTransferService creates a connection to the relevant executor successfully, but the executor is removed just as the broadcast block fetch begins, the client retries (see RetryingBlockFetcher), which is ineffective. If spark.shuffle.io.retryWait and spark.shuffle.io.maxRetries are large, such as 30s and 10 retries, this wastes 5 minutes.

In this PR, we check whether the relevant executor is alive before retrying.

## How was this patch tested?

Unit test.

Closes #24533 from turboFei/SPARK-27637.

Authored-by: hustfeiwang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 039db87 commit d90c460
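For context, a hedged sketch of the scenario described in the commit message. The keys are standard Spark settings; the values are just the example figures from the description, under which a fetch from an already-removed executor could previously block for retryWait × maxRetries = 30s × 10 = 5 minutes before failing.

```scala
import org.apache.spark.SparkConf

// Example configuration from the commit message (illustrative values only).
// With a dead executor, RetryingBlockFetcher would previously keep retrying
// for spark.shuffle.io.retryWait * spark.shuffle.io.maxRetries = 5 minutes.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.io.retryWait", "30s")
  .set("spark.shuffle.io.maxRetries", "10")
```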

File tree: 6 files changed (+92, −11 lines)


core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -328,15 +328,15 @@ object SparkEnv extends Logging {
       conf.get(BLOCK_MANAGER_PORT)
     }
 
-    val blockTransferService =
-      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
-        blockManagerPort, numUsableCores)
-
     val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
       BlockManagerMaster.DRIVER_ENDPOINT_NAME,
       new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
       conf, isDriver)
 
+    val blockTransferService =
+      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
+        blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
+
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
       serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
```

core/src/main/scala/org/apache/spark/SparkException.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -37,3 +37,9 @@ private[spark] class SparkDriverExecutionException(cause: Throwable)
  */
 private[spark] case class SparkUserAppException(exitCode: Int)
   extends SparkException(s"User application exited with $exitCode")
+
+/**
+ * Exception thrown when the relative executor to access is dead.
+ */
+private[spark] case class ExecutorDeadException(message: String)
+  extends SparkException(message)
```
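Because ExecutorDeadException extends SparkException rather than IOException, RetryingBlockFetcher, which only treats IO failures as retryable, fails fast when it sees one. A minimal sketch of that distinction; `isRetryable` is an illustrative helper, not a Spark API, and the sketch sits inside the org.apache.spark package because the exception is private[spark]:

```scala
package org.apache.spark

import java.io.IOException

// Hedged sketch: RetryingBlockFetcher retries only when the failure (or its
// cause) is an IOException. ExecutorDeadException is a SparkException, so it
// falls through to an immediate onBlockFetchFailure instead of a retry.
object RetrySemantics {
  def isRetryable(t: Throwable): Boolean = t match {
    case _: ExecutorDeadException => false // executor is gone; retrying cannot help
    case _: IOException           => true  // possibly transient network error
    case _                        => false
  }
}
```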

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 21 additions & 3 deletions
```diff
@@ -17,16 +17,19 @@
 
 package org.apache.spark.network.netty
 
+import java.io.IOException
 import java.nio.ByteBuffer
 import java.util.{HashMap => JHashMap, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
+import scala.util.{Success, Try}
 
 import com.codahale.metrics.{Metric, MetricSet}
 
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.ExecutorDeadException
 import org.apache.spark.internal.config
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -36,8 +39,10 @@ import org.apache.spark.network.server._
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher}
 import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
 import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.BlockManagerMessages.IsExecutorAlive
 import org.apache.spark.util.Utils
 
 /**
@@ -49,7 +54,8 @@ private[spark] class NettyBlockTransferService(
     bindAddress: String,
     override val hostName: String,
     _port: Int,
-    numCores: Int)
+    numCores: Int,
+    driverEndPointRef: RpcEndpointRef = null)
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
@@ -112,8 +118,20 @@
     val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
       override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
         val client = clientFactory.createClient(host, port)
-        new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
-          transportConf, tempFileManager).start()
+        try {
+          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
+            transportConf, tempFileManager).start()
+        } catch {
+          case e: IOException =>
+            Try {
+              driverEndPointRef.askSync[Boolean](IsExecutorAlive(execId))
+            } match {
+              case Success(v) if v == false =>
+                throw new ExecutorDeadException(s"The relative remote executor(Id: $execId)," +
+                  " which maintains the block data to fetch is dead.")
+              case _ => throw e
+            }
+        }
       }
     }
 
```
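The Try/match above makes the liveness probe best-effort: only a definitive false answer from the driver converts the failure into a non-retryable ExecutorDeadException; if the probe itself fails (for example, driverEndPointRef is null, or the ask times out), the original IOException is rethrown and the usual retry semantics apply. A standalone sketch of that decision, with `classify` and `probeAlive` as illustrative names rather than Spark APIs:

```scala
import java.io.IOException

import scala.util.{Success, Try}

// Illustrative sketch of the classification logic in createAndStart above.
// `probeAlive` stands in for driverEndPointRef.askSync[Boolean](IsExecutorAlive(execId)),
// and RuntimeException stands in for the private[spark] ExecutorDeadException.
def classify(execId: String, e: IOException, probeAlive: String => Boolean): Throwable =
  Try(probeAlive(execId)) match {
    case Success(false) =>
      new RuntimeException(s"executor $execId is dead") // fail fast, no retry
    case _ =>
      e // executor alive, or the probe itself failed: keep the retryable IOException
  }
```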

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -105,6 +105,9 @@ class BlockManagerMasterEndpoint(
     case GetBlockStatus(blockId, askSlaves) =>
       context.reply(blockStatus(blockId, askSlaves))
 
+    case IsExecutorAlive(executorId) =>
+      context.reply(blockManagerIdByExecutor.contains(executorId))
+
     case GetMatchingBlockIds(filter, askSlaves) =>
       context.reply(getMatchingBlockIds(filter, askSlaves))
 
```
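Liveness here simply means the executor still has a block manager registered with the master. A hedged client-side usage sketch (essentially what NettyBlockTransferService does above; both the message and the endpoint ref are private[spark], so this assumes code inside Spark itself):

```scala
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerMessages.IsExecutorAlive

// Synchronously ask the BlockManagerMaster endpoint whether an executor
// still has a registered block manager.
def isExecutorAlive(driverEndpoint: RpcEndpointRef, execId: String): Boolean =
  driverEndpoint.askSync[Boolean](IsExecutorAlive(execId))
```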

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -123,4 +123,6 @@ private[spark] object BlockManagerMessages {
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
   case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster
+
+  case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster
 }
```

core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala

Lines changed: 56 additions & 4 deletions
```diff
@@ -17,13 +17,21 @@
 
 package org.apache.spark.network.netty
 
+import java.io.IOException
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.ClassTag
 import scala.util.Random
 
-import org.mockito.Mockito.mock
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.{ExecutorDeadException, SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.network.BlockDataManager
+import org.apache.spark.network.client.{TransportClient, TransportClientFactory}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 
 class NettyBlockTransferServiceSuite
   extends SparkFunSuite
@@ -77,6 +85,48 @@ class NettyBlockTransferServiceSuite
     verifyServicePort(expectedPort = service0.port + 1, actualPort = service1.port)
   }
 
+  test("SPARK-27637: test fetch block with executor dead") {
+    implicit val exectionContext = ExecutionContext.global
+    val port = 17634 + Random.nextInt(10000)
+    logInfo("random port for test: " + port)
+
+    val driverEndpointRef = new RpcEndpointRef(new SparkConf()) {
+      override def address: RpcAddress = null
+      override def name: String = "test"
+      override def send(message: Any): Unit = {}
+      // This rpcEndPointRef always returns false for unit test to touch ExecutorDeadException.
+      override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+        Future{false.asInstanceOf[T]}
+      }
+    }
+
+    val clientFactory = mock(classOf[TransportClientFactory])
+    val client = mock(classOf[TransportClient])
+    // This is used to touch an IOException during fetching block.
+    when(client.sendRpc(any(), any())).thenAnswer(_ => {throw new IOException()})
+    var createClientCount = 0
+    when(clientFactory.createClient(any(), any())).thenAnswer(_ => {
+      createClientCount += 1
+      client
+    })
+
+    val listener = mock(classOf[BlockFetchingListener])
+    var hitExecutorDeadException = false
+    when(listener.onBlockFetchFailure(any(), any(classOf[ExecutorDeadException])))
+      .thenAnswer(_ => {hitExecutorDeadException = true})
+
+    service0 = createService(port, driverEndpointRef)
+    val clientFactoryField = service0.getClass.getField(
+      "org$apache$spark$network$netty$NettyBlockTransferService$$clientFactory")
+    clientFactoryField.setAccessible(true)
+    clientFactoryField.set(service0, clientFactory)
+
+    service0.fetchBlocks("localhost", port, "exec1",
+      Array("block1"), listener, mock(classOf[DownloadFileManager]))
+    assert(createClientCount === 1)
+    assert(hitExecutorDeadException)
+  }
+
   private def verifyServicePort(expectedPort: Int, actualPort: Int): Unit = {
     actualPort should be >= expectedPort
     // avoid testing equality in case of simultaneous tests
@@ -85,13 +135,15 @@
     actualPort should be <= (expectedPort + 100)
   }
 
-  private def createService(port: Int): NettyBlockTransferService = {
+  private def createService(
+      port: Int,
+      rpcEndpointRef: RpcEndpointRef = null): NettyBlockTransferService = {
     val conf = new SparkConf()
       .set("spark.app.id", s"test-${getClass.getName}")
     val securityManager = new SecurityManager(conf)
     val blockDataManager = mock(classOf[BlockDataManager])
     val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
-      port, 1)
+      port, 1, rpcEndpointRef)
     service.init(blockDataManager)
     service
   }
```
