6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -48,11 +48,13 @@ object RpcUtils {

/** Returns the default Spark timeout to use for RPC ask operations. */
def askTimeout(conf: SparkConf): FiniteDuration = {
conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
conf.getTimeAsSeconds("spark.rpc.askTimeout",
conf.get("spark.network.timeout", "120s")) seconds
}

/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupTimeout(conf: SparkConf): FiniteDuration = {
conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
conf.getTimeAsSeconds("spark.rpc.lookupTimeout",
conf.get("spark.network.timeout", "120s")) seconds
}
}
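For context, a minimal sketch of how the new fallback resolves, assuming it runs from code inside the org.apache.spark package (RpcUtils is package-private), for example a test suite; the values are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

// With neither spark.rpc.askTimeout nor spark.network.timeout set,
// the shared 120s default applies.
val conf = new SparkConf(false)
assert(RpcUtils.askTimeout(conf).toSeconds == 120)

// Setting only spark.network.timeout now moves both RPC timeouts ...
conf.set("spark.network.timeout", "300s")
assert(RpcUtils.askTimeout(conf).toSeconds == 300)
assert(RpcUtils.lookupTimeout(conf).toSeconds == 300)

// ... while an explicit spark.rpc.askTimeout still wins for ask operations.
conf.set("spark.rpc.askTimeout", "10s")
assert(RpcUtils.askTimeout(conf).toSeconds == 10)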
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -227,7 +227,7 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
test("akka deprecated configs") {
val conf = new SparkConf()

assert(!conf.contains("spark.rpc.num.retries"))
assert(!conf.contains("spark.rpc.numRetries"))
assert(!conf.contains("spark.rpc.retry.wait"))
assert(!conf.contains("spark.rpc.askTimeout"))
assert(!conf.contains("spark.rpc.lookupTimeout"))
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -156,7 +156,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {

val conf = new SparkConf()
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.num.retries", "1")
conf.set("spark.rpc.numRetries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
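For reference, a hedged sketch of how these two knobs are read back; it uses only public SparkConf accessors with assumed defaults, not necessarily the RPC layer's exact internals:

val retryConf = new SparkConf(false)
retryConf.set("spark.rpc.retry.wait", "0")
retryConf.set("spark.rpc.numRetries", "1")

// One attempt in total and no wait between attempts, so the ask fails fast.
val maxRetries = retryConf.getInt("spark.rpc.numRetries", 3)          // assumed default: 3
val retryWaitMs = retryConf.getTimeAsMs("spark.rpc.retry.wait", "3s") // assumed default: 3s
assert(maxRetries == 1 && retryWaitMs == 0L)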
34 changes: 32 additions & 2 deletions docs/configuration.md
@@ -963,8 +963,9 @@ Apart from these, the following properties are also available, and may be useful
<td>
Default timeout for all network interactions. This config will be used in place of
<code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
<code>spark.storage.blockManagerSlaveTimeoutMs</code> or
<code>spark.shuffle.io.connectionTimeout</code>, if they are not configured.
<code>spark.storage.blockManagerSlaveTimeoutMs</code>,
<code>spark.shuffle.io.connectionTimeout</code>, <code>spark.rpc.askTimeout</code> or
<code>spark.rpc.lookupTimeout</code> if they are not configured.
</td>
</tr>
<tr>
@@ -982,6 +983,35 @@ Apart from these, the following properties are also available, and may be useful
This is only relevant for the Spark shell.
</td>
</tr>
<tr>
<td><code>spark.rpc.numRetries</code></td>
<td>3</td>
<td>
Number of times to retry before an RPC task gives up.
An RPC task will run at most this number of times.
</td>
</tr>
<tr>
<td><code>spark.rpc.retry.wait</code></td>
<td>3s</td>
<td>
Duration for an RPC ask operation to wait before retrying.
</td>
</tr>
<tr>
<td><code>spark.rpc.askTimeout</code></td>
<td>120s</td>
<td>
Duration for an RPC ask operation to wait before timing out.
</td>
</tr>
<tr>
<td><code>spark.rpc.lookupTimeout</code></td>
<td>120s</td>
<td>
Duration for an RPC remote endpoint lookup operation to wait before timing out.
</td>
</tr>
</table>
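As a usage sketch (the application name and all values below are made up), these properties are set like any other Spark configuration, either on a SparkConf or with --conf on spark-submit:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("rpc-timeout-example")     // hypothetical app name
  .set("spark.network.timeout", "240s")  // shared fallback for both RPC timeouts
  .set("spark.rpc.askTimeout", "60s")    // an explicit value takes precedence over the fallback
  .set("spark.rpc.numRetries", "5")
  .set("spark.rpc.retry.wait", "1s")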

#### Scheduling