Skip to content

Commit 8191bcb

Browse files
committed
[SPARK-2938] Support SASL authentication in NettyBlockTransferService
Also lays the groundwork for supporting it inside the external shuffle service.
1 parent 2aca97c commit 8191bcb

File tree

27 files changed

+717
-374
lines changed

27 files changed

+717
-374
lines changed

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.net.{Authenticator, PasswordAuthentication}
2222
import org.apache.hadoop.io.Text
2323

2424
import org.apache.spark.deploy.SparkHadoopUtil
25+
import org.apache.spark.network.sasl.SecretKeyHolder
2526

2627
/**
2728
* Spark class responsible for security.
@@ -139,7 +140,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
139140
* can take place.
140141
*/
141142

142-
private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
143+
private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
143144

144145
// key used to store the spark secret in the Hadoop UGI
145146
private val sparkSecretLookupKey = "sparkCookie"
@@ -337,4 +338,16 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
337338
* @return the secret key as a String if authentication is enabled, otherwise returns null
338339
*/
339340
def getSecretKey(): String = secretKey
341+
342+
override def getSaslUser(appId: String): String = {
343+
val myAppId = sparkConf.getAppId
344+
require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
345+
getSaslUser()
346+
}
347+
348+
override def getSecretKey(appId: String): String = {
349+
val myAppId = sparkConf.getAppId
350+
require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
351+
getSecretKey()
352+
}
340353
}

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
217217
*/
218218
getAll.filter { case (k, _) => isAkkaConf(k) }
219219

220+
/**
221+
* Returns the Spark application id, valid in the Driver after TaskScheduler registration in the
222+
* driver and from the start in the Executor.
223+
*/
224+
def getAppId: String = get("spark.app.id")
225+
220226
/** Does the configuration contain a given parameter? */
221227
def contains(key: String): Boolean = settings.contains(key)
222228

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
313313
val applicationId: String = taskScheduler.applicationId()
314314
conf.set("spark.app.id", applicationId)
315315

316+
env.blockManager.initialize(applicationId)
317+
316318
val metricsSystem = env.metricsSystem
317319

318320
// The metrics system for Driver need to be set spark.app.id to app ID.

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ object SparkEnv extends Logging {
276276
val blockTransferService =
277277
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
278278
case "netty" =>
279-
new NettyBlockTransferService(conf)
279+
new NettyBlockTransferService(conf, securityManager)
280280
case "nio" =>
281281
new NioBlockTransferService(conf, securityManager)
282282
}
@@ -285,6 +285,7 @@ object SparkEnv extends Logging {
285285
"BlockManagerMaster",
286286
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
287287

288+
// NB: blockManager is not valid until initialize() is called later.
288289
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
289290
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
290291

core/src/main/scala/org/apache/spark/SparkSaslClient.scala

Lines changed: 0 additions & 147 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/SparkSaslServer.scala

Lines changed: 0 additions & 176 deletions
This file was deleted.

0 commit comments

Comments
 (0)