Skip to content

Commit 9d3cf88

Browse files
committed
[LIHADOOP-71407] Store bytebuffer in the db instead of String (apache#111)
* [LIHADOOP-71407] Store bytebuffer in the db instead of String
1 parent 2cbe049 commit 9d3cf88

File tree

2 files changed

+18
-11
lines changed

2 files changed

+18
-11
lines changed

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,9 @@ public void initializeApplication(ApplicationInitializationContext context) {
440440
if (db != null && AppsWithRecoveryDisabled.isRecoveryEnabledForApp(appId)) {
441441
AppId fullId = new AppId(appId);
442442
byte[] key = dbAppKey(fullId);
443-
byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
443+
ByteBuffer dbVal = metaInfo != null ?
444+
JavaUtils.stringToBytes(shuffleSecret) : appServiceData;
445+
byte[] value = mapper.writeValueAsString(dbVal).getBytes(StandardCharsets.UTF_8);
444446
db.put(key, value);
445447
}
446448
secretManager.registerApp(appId, shuffleSecret);

resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
7171
private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID =
7272
"org.apache.spark.shuffle.sort.SortShuffleManager:{\"mergeDir\": \"merge_manager\"}"
7373
private val DUMMY_BLOCK_DATA = "dummyBlockData".getBytes(StandardCharsets.UTF_8)
74+
private val DUMMY_PASSWORD = "dummyPassword"
75+
private val EMPTY_PASSWORD = ""
7476

7577
private var recoveryLocalDir: File = _
7678
protected var tempDir: File = _
@@ -191,7 +193,8 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
191193
val app3Data = makeAppInfo("user", app3Id)
192194
s1.initializeApplication(app3Data)
193195
val app4Id = ApplicationId.newInstance(0, 4)
194-
val app4Data = makeAppInfo("user", app4Id)
196+
val app4Data = makeAppInfo("user", app4Id, metadataStorageDisabled = false,
197+
authEnabled = true, DUMMY_PASSWORD)
195198
s1.initializeApplication(app4Data)
196199

197200
val execStateFile = s1.registeredExecutorFile
@@ -1038,15 +1041,15 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
10381041

10391042
private def makeAppInfo(user: String, appId: ApplicationId,
10401043
metadataStorageDisabled: Boolean = false,
1041-
authEnabled: Boolean = true): ApplicationInitializationContext = {
1044+
authEnabled: Boolean = true,
1045+
password: String = EMPTY_PASSWORD): ApplicationInitializationContext = {
10421046
if (!metadataStorageDisabled) {
1043-
val secret = ByteBuffer.wrap(new Array[Byte](0))
1044-
new ApplicationInitializationContext(user, appId, secret)
1047+
new ApplicationInitializationContext(user, appId, JavaUtils.stringToBytes(password))
10451048
} else {
10461049
val payload = new mutable.HashMap[String, Object]()
10471050
payload.put(YarnShuffleService.SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED, java.lang.Boolean.TRUE)
10481051
if (authEnabled) {
1049-
payload.put(YarnShuffleService.SECRET_KEY, "")
1052+
payload.put(YarnShuffleService.SECRET_KEY, password)
10501053
}
10511054
val mapper = new ObjectMapper()
10521055
mapper.registerModule(DefaultScalaModule)
@@ -1133,13 +1136,15 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
11331136
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
11341137
s1 = createYarnShuffleService()
11351138
val app1Id = ApplicationId.newInstance(1681252509, 1)
1136-
val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true)
1139+
val app1Data = makeAppInfo("user", app1Id, metadataStorageDisabled = true,
1140+
authEnabled = true, EMPTY_PASSWORD)
11371141
s1.initializeApplication(app1Data)
11381142
val app2Id = ApplicationId.newInstance(1681252509, 2)
1139-
val app2Data = makeAppInfo("user", app2Id)
1143+
val app2Data = makeAppInfo("user", app2Id, metadataStorageDisabled = false,
1144+
authEnabled = true, DUMMY_PASSWORD)
11401145
s1.initializeApplication(app2Data)
1141-
assert(s1.secretManager.getSecretKey(app1Id.toString()) == "")
1142-
assert(s1.secretManager.getSecretKey(app2Id.toString()) == "")
1146+
assert(s1.secretManager.getSecretKey(app1Id.toString()) == EMPTY_PASSWORD)
1147+
assert(s1.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD)
11431148

11441149
val execShuffleInfo1 =
11451150
new ExecutorShuffleInfo(
@@ -1191,7 +1196,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
11911196
s2 = createYarnShuffleService()
11921197
// Since secret of app1 is not saved in the db, it isn't recovered
11931198
assert(s2.secretManager.getSecretKey(app1Id.toString()) == null)
1194-
assert(s2.secretManager.getSecretKey(app2Id.toString()) == "")
1199+
assert(s2.secretManager.getSecretKey(app2Id.toString()) == DUMMY_PASSWORD)
11951200

11961201
val resolver2 = ShuffleTestAccessor.getBlockResolver(s2.blockHandler)
11971202
val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]

0 commit comments

Comments
 (0)