diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4ca442b629fd9..4ef6656222455 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -99,7 +99,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    */
   protected def askTracker[T: ClassTag](message: Any): T = {
     try {
-      trackerEndpoint.askWithRetry[T](message)
+      trackerEndpoint.askSync[T](message)
     } catch {
       case e: Exception =>
         logError("Error communicating with MapOutputTracker", e)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 869c5d7094cd4..c4657e3d5ea30 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -608,7 +608,7 @@ class SparkContext(config: SparkConf) extends Logging {
         Some(Utils.getThreadDump())
       } else {
         val endpointRef = env.blockManager.master.getExecutorEndpointRef(executorId).get
-        Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
+        Some(endpointRef.askSync[Array[ThreadStackTrace]](TriggerThreadDump))
       }
     } catch {
       case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index a4de3d7eaf458..bf6093236d92b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -123,7 +123,7 @@ private class ClientEndpoint(
       Thread.sleep(5000)
       logInfo("... polling master for driver state")
       val statusResponse =
-        activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
+        activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
       if (statusResponse.found) {
         logInfo(s"State of $driverId is ${statusResponse.state.get}")
         // Worker node, if present
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c5f7c077fe202..816bf37e39fee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -1045,7 +1045,7 @@ private[deploy] object Master extends Logging {
     val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
     val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
       new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
-    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
+    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
     (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 18cff3125d6b4..946a92882141c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -34,7 +34,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
-    val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
     val app = state.activeApps.find(_.id == appId)
       .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index ebbbbd3b715b0..7dbe32975435d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -33,7 +33,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
   private val master = parent.masterEndpointRef

   def getMasterState: MasterStateResponse = {
-    master.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.askSync[MasterStateResponse](RequestMasterState)
   }

   override def renderJson(request: HttpServletRequest): JValue = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index c19296c7b3e00..56620064c57fa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef,
   extends KillRequestServlet {

   protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val response = masterEndpoint.askWithRetry[DeployMessages.KillDriverResponse](
+    val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse](
       DeployMessages.RequestKillDriver(submissionId))
     val k = new KillSubmissionResponse
     k.serverSparkVersion = sparkVersion
@@ -89,7 +89,7 @@ private[rest] class StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe
   extends StatusRequestServlet {

   protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val response = masterEndpoint.askWithRetry[DeployMessages.DriverStatusResponse](
+    val response = masterEndpoint.askSync[DeployMessages.DriverStatusResponse](
       DeployMessages.RequestDriverStatus(submissionId))
     val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
     val d = new SubmissionStatusResponse
@@ -174,7 +174,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
         val driverDescription = buildDriverDescription(submitRequest)
-        val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
+        val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
           DeployMessages.RequestSubmitDriver(driverDescription))
         val submitResponse = new CreateSubmissionResponse
         submitResponse.serverSparkVersion = sparkVersion
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
index 8ebcbcb6a1738..1ad973122b609 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -34,12 +34,12 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
   private val workerEndpoint = parent.worker.self

   override def renderJson(request: HttpServletRequest): JValue = {
-    val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
     JsonProtocol.writeWorkerState(workerState)
   }

   def render(request: HttpServletRequest): Seq[Node] = {
-    val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
     val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
     val runningExecutors = workerState.executors
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4a38560d8deca..b376ecd301eab 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -199,7 +199,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       new SecurityManager(executorConf),
       clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
-    val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
+    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
     val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
     fetcher.shutdown()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index db5d0d85ceb87..d762f11125516 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -677,7 +677,7 @@ private[spark] class Executor(
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
     try {
-      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index a5778876d4901..4d39f144dd198 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -92,64 +92,4 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
     timeout.awaitResult(future)
   }

-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
-   * default timeout, throw a SparkException if this fails even after the default number of retries.
-   * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
-   * method retries, the message handling in the receiver side should be idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  @deprecated("use 'askSync' instead.", "2.2.0")
-  def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
-
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
-   * specified timeout, throw a SparkException if this fails even after the specified number of
-   * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
-   * retries, the message handling in the receiver side should be idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @param timeout the timeout duration
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  @deprecated("use 'askSync' instead.", "2.2.0")
-  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
-    // TODO: Consider removing multiple attempts
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < maxRetries) {
-      attempts += 1
-      try {
-        val future = ask[T](message, timeout)
-        val result = timeout.awaitResult(future)
-        if (result == null) {
-          throw new SparkException("RpcEndpoint returned null")
-        }
-        return result
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
-      }
-
-      if (attempts < maxRetries) {
-        Thread.sleep(retryWaitMs)
-      }
-    }
-
-    throw new SparkException(
-      s"Error sending message [message = $message]", lastException)
-  }
-
 }
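The hunk above deletes the last retry machinery in the RPC layer: `askWithRetry` made up to `maxRetries` attempts with a `retryWaitMs` sleep between them, and therefore required idempotent message handling on the receiver side, while `askSync` (visible in the context lines) performs exactly one blocking `ask` and awaits the result. A call site that still genuinely needs retries can keep a loop of its own around `askSync`. Below is a minimal caller-side sketch modeled on the deleted loop; the object and method names and the default retry parameters are illustrative, not Spark APIs, and since `RpcEndpointRef` is `private[spark]` this only compiles inside Spark's own tree:

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkException
import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}

object AskWithLocalRetry {
  // Hypothetical caller-side replacement for the removed askWithRetry. As with
  // askWithRetry, the receiving endpoint must handle the message idempotently:
  // the handler may already have run even though the reply was lost.
  def askWithLocalRetry[T: ClassTag](
      ref: RpcEndpointRef,
      message: Any,
      timeout: RpcTimeout,
      maxRetries: Int = 3,         // illustrative default, not a Spark config
      retryWaitMs: Long = 3000L): T = {
    var attempts = 0
    var lastException: Exception = null
    while (attempts < maxRetries) {
      attempts += 1
      try {
        return ref.askSync[T](message, timeout)  // a single blocking attempt
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception => lastException = e
      }
      if (attempts < maxRetries) {
        Thread.sleep(retryWaitMs)
      }
    }
    throw new SparkException(s"Error sending message [message = $message]", lastException)
  }
}
```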
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6177bafc11088..3d148afeac05a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -232,7 +232,7 @@ class DAGScheduler(
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
-    blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
+    blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 31575c0ca0d15..4a2bd98ade7a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -359,7 +359,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     try {
       if (driverEndpoint != null) {
         logInfo("Shutting down all executors")
-        driverEndpoint.askWithRetry[Boolean](StopExecutors)
+        driverEndpoint.askSync[Boolean](StopExecutors)
       }
     } catch {
       case e: Exception =>
@@ -371,7 +371,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     stopExecutors()
     try {
       if (driverEndpoint != null) {
-        driverEndpoint.askWithRetry[Boolean](StopDriver)
+        driverEndpoint.askSync[Boolean](StopDriver)
       }
     } catch {
       case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7a600068912b1..3ca690db9e79f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -60,7 +60,7 @@ class BlockManagerMaster(
       maxMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $blockManagerId")
-    val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
+    val updatedId = driverEndpoint.askSync[BlockManagerId](
       RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
@@ -72,7 +72,7 @@ class BlockManagerMaster(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long): Boolean = {
-    val res = driverEndpoint.askWithRetry[Boolean](
+    val res = driverEndpoint.askSync[Boolean](
       UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
     logDebug(s"Updated info of block $blockId")
     res
@@ -80,12 +80,12 @@ class BlockManagerMaster(

   /** Get locations of the blockId from the driver */
   def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
+    driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
   }

   /** Get locations of multiple blockIds from the driver */
   def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
-    driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
+    driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
       GetLocationsMultipleBlockIds(blockIds))
   }

@@ -99,11 +99,11 @@ class BlockManagerMaster(

   /** Get ids of other nodes in the cluster from the driver */
   def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
+    driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }

   def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
-    driverEndpoint.askWithRetry[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
+    driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
   }

   /**
@@ -111,12 +111,12 @@ class BlockManagerMaster(
    * blocks that the driver knows about.
    */
   def removeBlock(blockId: BlockId) {
-    driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
+    driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
   }

   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
-    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
+    val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
@@ -128,7 +128,7 @@ class BlockManagerMaster(

   /** Remove all blocks belonging to the given shuffle. */
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
-    val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
+    val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
@@ -140,7 +140,7 @@ class BlockManagerMaster(

   /** Remove all blocks belonging to the given broadcast. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
-    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](
+    val future = driverEndpoint.askSync[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
       case e: Exception =>
@@ -159,11 +159,11 @@ class BlockManagerMaster(
    * amount of remaining memory.
    */
  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+    driverEndpoint.askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }

   def getStorageStatus: Array[StorageStatus] = {
-    driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
+    driverEndpoint.askSync[Array[StorageStatus]](GetStorageStatus)
   }

   /**
@@ -184,7 +184,7 @@ class BlockManagerMaster(
      * master endpoint for a response to a prior message.
      */
     val response = driverEndpoint.
-      askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+      askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
     implicit val sameThread = ThreadUtils.sameThread
     val cbf =
@@ -214,7 +214,7 @@ class BlockManagerMaster(
       filter: BlockId => Boolean,
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
-    val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
+    val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
     timeout.awaitResult(future)
   }

@@ -223,7 +223,7 @@ class BlockManagerMaster(
    * since they are not reported the master.
    */
   def hasCachedBlocks(executorId: String): Boolean = {
-    driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
+    driverEndpoint.askSync[Boolean](HasCachedBlocks(executorId))
   }

   /** Stop the driver endpoint, called only on the Spark driver node */
@@ -237,7 +237,7 @@ class BlockManagerMaster(

   /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
   private def tell(message: Any) {
-    if (!driverEndpoint.askWithRetry[Boolean](message)) {
+    if (!driverEndpoint.askSync[Boolean](message)) {
       throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 05dad7a4b86ad..16f90bf01b426 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -499,7 +499,7 @@ class StandaloneDynamicAllocationSuite

   /** Get the Master state */
   private def getMasterState: MasterStateResponse = {
-    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.self.askSync[MasterStateResponse](RequestMasterState)
   }

   /** Get the applications that are active from Master */
@@ -562,7 +562,7 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty)
       val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
-      backend.driverEndpoint.askWithRetry[Boolean](message)
+      backend.driverEndpoint.askSync[Boolean](message)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index bc58fb2a362a4..936639b845789 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -171,7 +171,7 @@ class AppClientSuite

   /** Get the Master state */
   private def getMasterState: MasterStateResponse = {
-    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.self.askSync[MasterStateResponse](RequestMasterState)
   }

   /** Get the applications that are active from Master */
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index da7253b2a56df..2127da48ece49 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -432,7 +432,7 @@ class MasterSuite extends SparkFunSuite
     val master = makeMaster()
     master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
     eventually(timeout(10.seconds)) {
-      val masterState = master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
       assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
     }
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index b4037d7a9c6e8..31d9dd3de8acc 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -118,8 +118,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     }
     val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
-    val newRpcEndpointRef = rpcEndpointRef.askWithRetry[RpcEndpointRef]("Hello")
-    val reply = newRpcEndpointRef.askWithRetry[String]("Echo")
+    val newRpcEndpointRef = rpcEndpointRef.askSync[RpcEndpointRef]("Hello")
+    val reply = newRpcEndpointRef.askSync[String]("Echo")
     assert("Echo" === reply)
   }

@@ -132,7 +132,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
         context.reply(msg)
       }
     })
-    val reply = rpcEndpointRef.askWithRetry[String]("hello")
+    val reply = rpcEndpointRef.askSync[String]("hello")
     assert("hello" === reply)
   }

@@ -150,7 +150,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-remotely")
     try {
-      val reply = rpcEndpointRef.askWithRetry[String]("hello")
+      val reply = rpcEndpointRef.askSync[String]("hello")
       assert("hello" === reply)
     } finally {
       anotherEnv.shutdown()
@@ -177,14 +177,13 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
     try {
-      // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause
-      val e = intercept[SparkException] {
-        rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp))
+      val e = intercept[RpcTimeoutException] {
+        rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1 millis, shortProp))
       }
-      // The SparkException cause should be a RpcTimeoutException with message indicating the
-      // controlling timeout property
-      assert(e.getCause.isInstanceOf[RpcTimeoutException])
-      assert(e.getCause.getMessage.contains(shortProp))
+      // The RpcTimeoutException message should indicate the
+      // controlling timeout property
+      assert(e.isInstanceOf[RpcTimeoutException])
+      assert(e.getMessage.contains(shortProp))
     } finally {
       anotherEnv.shutdown()
       anotherEnv.awaitTermination()
@@ -677,7 +676,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
         }
       })
       val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "ask-authentication")
-      val reply = rpcEndpointRef.askWithRetry[String]("hello")
+      val reply = rpcEndpointRef.askSync[String]("hello")
       assert("hello" === reply)
     } finally {
       localEnv.shutdown()
@@ -894,7 +893,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699")
     // Make sure the connect is set up
-    assert(ref.askWithRetry[String]("hello") === "hello")
+    assert(ref.askSync[String]("hello") === "hello")

     anotherEnv.shutdown()
     anotherEnv.awaitTermination()
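The `ask-timeout` test above captures the one observable change for callers: once its retries were exhausted, `askWithRetry` wrapped the underlying failure in a `SparkException` with the real error as the cause, while `askSync` lets the failure, here an `RpcTimeoutException`, propagate directly. Call sites that matched on `e.getCause` need that unwrapping removed. A minimal sketch of an updated handler; the object name, helper name, and timeout property are made up, and these classes are `private[spark]`:

```scala
import scala.concurrent.duration._

import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout, RpcTimeoutException}

object AskSyncErrorHandling {
  // Hypothetical call site updated for askSync's exception surface.
  def pingOrDefault(ref: RpcEndpointRef): String = {
    try {
      ref.askSync[String]("ping", new RpcTimeout(1.second, "spark.demo.askTimeout"))
    } catch {
      // was: case e: SparkException if e.getCause.isInstanceOf[RpcTimeoutException] => ...
      case _: RpcTimeoutException => "timed out"
    }
  }
}
```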
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 705c355234425..64a67b4c4cbab 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -394,7 +394,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")

-    val reregister = !master.driverEndpoint.askWithRetry[Boolean](
+    val reregister = !master.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
     assert(reregister == true)
   }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a674da4066456..cdb3b68489654 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -442,7 +442,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.stop()
     // Any method of the backend involving sending messages to the driver endpoint should not
     // be called after the backend is stopped.
-    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+    verify(driverEndpoint, never()).askSync(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
   }

   test("mesos supports spark.executor.uri") {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e498932e51ed5..a1c1f0ec5f8a4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -101,7 +101,7 @@ private[yarn] class YarnAllocator(
    * @see SPARK-12864
    */
   private var executorIdCounter: Int =
-    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 267d17623d5e5..d0f81887e62d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -88,21 +88,21 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {

   /** Verify whether the given executor has the active instance of a state store */
   private[state] def verifyIfInstanceActive(storeId: StateStoreId, executorId: String): Boolean = {
-    rpcEndpointRef.askWithRetry[Boolean](VerifyIfInstanceActive(storeId, executorId))
+    rpcEndpointRef.askSync[Boolean](VerifyIfInstanceActive(storeId, executorId))
   }

   /** Get the location of the state store */
   private[state] def getLocation(storeId: StateStoreId): Option[String] = {
-    rpcEndpointRef.askWithRetry[Option[String]](GetLocation(storeId))
+    rpcEndpointRef.askSync[Option[String]](GetLocation(storeId))
   }

   /** Deactivate instances related to a set of operator */
   private[state] def deactivateInstances(storeRootLocation: String): Unit = {
-    rpcEndpointRef.askWithRetry[Boolean](DeactivateInstances(storeRootLocation))
+    rpcEndpointRef.askSync[Boolean](DeactivateInstances(storeRootLocation))
   }

   private[state] def stop(): Unit = {
-    rpcEndpointRef.askWithRetry[Boolean](StopCoordinator)
+    rpcEndpointRef.askSync[Boolean](StopCoordinator)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 722024b8a6d57..f5c8a88f42af6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -188,13 +188,13 @@ private[streaming] class ReceiverSupervisorImpl(
   override protected def onReceiverStart(): Boolean = {
     val msg = RegisterReceiver(
       streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
-    trackerEndpoint.askWithRetry[Boolean](msg)
+    trackerEndpoint.askSync[Boolean](msg)
   }

   override protected def onReceiverStop(message: String, error: Option[Throwable]) {
     logInfo("Deregistering receiver " + streamId)
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
-    trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
+    trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo("Stopped receiver " + streamId)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 8f55d982a904c..bd7ab0b9bf5eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -170,7 +170,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       trackerState = Stopping
       if (!skipReceiverLaunch) {
         // Send the stop signal to all the receivers
-        endpoint.askWithRetry[Boolean](StopAllReceivers)
+        endpoint.askSync[Boolean](StopAllReceivers)

         // Wait for the Spark job that runs the receivers to be over
         // That is, for the receivers to quit gracefully.
@@ -183,7 +183,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       }

       // Check if all the receivers have been deregistered or not
-      val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
+      val receivers = endpoint.askSync[Seq[Int]](AllReceiverIds)
       if (receivers.nonEmpty) {
         logWarning("Not all of the receivers have deregistered, " + receivers)
       } else {
@@ -249,7 +249,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
    */
   def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
     if (isTrackerStarted) {
-      endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+      endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
         _.runningExecutor.map {
           _.executorId
         }
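Taken together, the migration is mechanical: each `askWithRetry[T](msg)` becomes `askSync[T](msg)`, with the same blocking, don't-call-from-an-endpoint's-message-loop caveat the deleted scaladoc carried, minus the retry loop. A minimal round trip in the style of the RpcEnvSuite changes above; the environment and endpoint names are illustrative, and since the rpc package is `private[spark]`, a sketch like this only compiles inside Spark's own source tree:

```scala
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

object AskSyncDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val env = RpcEnv.create("demo", "localhost", 0, conf, new SecurityManager(conf))
    val ref = env.setupEndpoint("echo-demo", new RpcEndpoint {
      override val rpcEnv: RpcEnv = env
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case msg: String => context.reply(msg)  // idempotent, so safe under any retry policy
      }
    })
    // Exactly one attempt: blocks until the reply arrives or the default ask timeout fires.
    assert(ref.askSync[String]("hello") == "hello")
    env.shutdown()
    env.awaitTermination()
  }
}
```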