Skip to content

Commit 6508953

Browse files
author
Andrew Or
committed
[SPARK-3797] Minor addendum to Yarn shuffle service
I did not realize there was a `network.util.JavaUtils` when I wrote this code. This PR moves the `ByteBuffer` string conversion to the appropriate place. I tested the changes on a stable yarn cluster. Author: Andrew Or <[email protected]> Closes #3144 from andrewor14/yarn-shuffle-util and squashes the following commits: b6c08bf [Andrew Or] Remove unused import 94e205c [Andrew Or] Use netty Unpooled 85202a5 [Andrew Or] Use guava Charsets 057135b [Andrew Or] Reword comment adf186d [Andrew Or] Move byte buffer String conversion logic to JavaUtils (cherry picked from commit 96136f2) Signed-off-by: Andrew Or <[email protected]>
1 parent 9ea0fac commit 6508953

File tree

4 files changed

+28
-26
lines changed

4 files changed

+28
-26
lines changed

network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.network.util;
1919

20+
import java.nio.ByteBuffer;
21+
2022
import java.io.ByteArrayInputStream;
2123
import java.io.ByteArrayOutputStream;
2224
import java.io.Closeable;
@@ -25,6 +27,8 @@
2527
import java.io.ObjectOutputStream;
2628

2729
import com.google.common.io.Closeables;
30+
import com.google.common.base.Charsets;
31+
import io.netty.buffer.Unpooled;
2832
import org.slf4j.Logger;
2933
import org.slf4j.LoggerFactory;
3034

@@ -73,4 +77,20 @@ public static int nonNegativeHash(Object obj) {
7377
int hash = obj.hashCode();
7478
return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
7579
}
80+
81+
/**
82+
* Convert the given string to a byte buffer. The resulting buffer can be
83+
* converted back to the same string through {@link #bytesToString(ByteBuffer)}.
84+
*/
85+
public static ByteBuffer stringToBytes(String s) {
86+
return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
87+
}
88+
89+
/**
90+
* Convert the given byte buffer to a string. The resulting string can be
91+
* converted back to the same byte buffer through {@link #stringToBytes(String)}.
92+
*/
93+
public static String bytesToString(ByteBuffer b) {
94+
return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
95+
}
7696
}

network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919

2020
import java.lang.Override;
2121
import java.nio.ByteBuffer;
22-
import java.nio.charset.Charset;
2322
import java.util.concurrent.ConcurrentHashMap;
2423

2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
2726

2827
import org.apache.spark.network.sasl.SecretKeyHolder;
28+
import org.apache.spark.network.util.JavaUtils;
2929

3030
/**
3131
* A class that manages shuffle secret used by the external shuffle service.
@@ -34,30 +34,10 @@ public class ShuffleSecretManager implements SecretKeyHolder {
3434
private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
3535
private final ConcurrentHashMap<String, String> shuffleSecretMap;
3636

37-
private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
38-
3937
// Spark user used for authenticating SASL connections
4038
// Note that this must match the value in org.apache.spark.SecurityManager
4139
private static final String SPARK_SASL_USER = "sparkSaslUser";
4240

43-
/**
44-
* Convert the given string to a byte buffer. The resulting buffer can be converted back to
45-
* the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
46-
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
47-
*/
48-
public static ByteBuffer stringToBytes(String s) {
49-
return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
50-
}
51-
52-
/**
53-
* Convert the given byte buffer to a string. The resulting string can be converted back to
54-
* the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
55-
* shuffle service represents shuffle secrets as bytes buffers instead of strings.
56-
*/
57-
public static String bytesToString(ByteBuffer b) {
58-
return new String(b.array(), UTF8_CHARSET);
59-
}
60-
6141
public ShuffleSecretManager() {
6242
shuffleSecretMap = new ConcurrentHashMap<String, String>();
6343
}
@@ -80,7 +60,7 @@ public void registerApp(String appId, String shuffleSecret) {
8060
* Register an application with its secret specified as a byte buffer.
8161
*/
8262
public void registerApp(String appId, ByteBuffer shuffleSecret) {
83-
registerApp(appId, bytesToString(shuffleSecret));
63+
registerApp(appId, JavaUtils.bytesToString(shuffleSecret));
8464
}
8565

8666
/**

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
3636
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
3737

3838
import org.apache.spark.{SecurityManager, SparkConf, Logging}
39-
import org.apache.spark.network.sasl.ShuffleSecretManager
39+
import org.apache.spark.network.util.JavaUtils
4040

4141
@deprecated("use yarn/stable", "1.2.0")
4242
class ExecutorRunnable(
@@ -98,7 +98,8 @@ class ExecutorRunnable(
9898
val secretString = securityMgr.getSecretKey()
9999
val secretBytes =
100100
if (secretString != null) {
101-
ShuffleSecretManager.stringToBytes(secretString)
101+
// This conversion must match how the YarnShuffleService decodes our secret
102+
JavaUtils.stringToBytes(secretString)
102103
} else {
103104
// Authentication is not enabled, so just provide dummy metadata
104105
ByteBuffer.allocate(0)

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
3636
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
3737

3838
import org.apache.spark.{SecurityManager, SparkConf, Logging}
39-
import org.apache.spark.network.sasl.ShuffleSecretManager
39+
import org.apache.spark.network.util.JavaUtils
4040

4141

4242
class ExecutorRunnable(
@@ -97,7 +97,8 @@ class ExecutorRunnable(
9797
val secretString = securityMgr.getSecretKey()
9898
val secretBytes =
9999
if (secretString != null) {
100-
ShuffleSecretManager.stringToBytes(secretString)
100+
// This conversion must match how the YarnShuffleService decodes our secret
101+
JavaUtils.stringToBytes(secretString)
101102
} else {
102103
// Authentication is not enabled, so just provide dummy metadata
103104
ByteBuffer.allocate(0)

0 commit comments

Comments
 (0)