Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ public boolean preferDirectBufs() {
return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
}

/** Connect timeout in secs. Default 120 secs. */
/** Connect timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000;
}

/** Number of concurrent connections between two nodes for fetching data. **/
public int numConnectionsPerPeer() {
return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 2);
return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);
}

/** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
Expand All @@ -67,7 +67,7 @@ public int numConnectionsPerPeer() {
public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }

/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); }
public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; }

/**
* Max number of times we will try IO exceptions (such as connection timeouts) per request.
Expand All @@ -79,7 +79,7 @@ public int numConnectionsPerPeer() {
* Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; }

/**
* Minimum size of a block that we should start using memory map rather than reading in through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void doBootstrap(TransportClient client) {
ByteBuf buf = Unpooled.buffer(msg.encodedLength());
msg.encode(buf);

byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout());
byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs());
payload = saslClient.response(response);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public RetryingBlockFetcher(
this.fetchStarter = fetchStarter;
this.listener = listener;
this.maxRetries = conf.maxIORetries();
this.retryWaitTime = conf.ioRetryWaitTime();
this.retryWaitTime = conf.ioRetryWaitTimeMs();
this.outstandingBlocksIds = Sets.newLinkedHashSet();
Collections.addAll(outstandingBlocksIds, blockIds);
this.currentListener = new RetryingBlockFetchListener();
Expand Down