
Commit 93050f4

SPARK-3883: SSL support for HttpServer and Akka
- Introduced SSLOptions object
- SSLOptions is created by SecurityManager
- SSLOptions configures Akka and Jetty to use SSL
- Provided utility methods to determine the proper Akka protocol for Akka requests and to configure the SSL socket factory for URL connections
- Added test cases for AkkaUtils, FileServer, SSLOptions and SecurityManager
- Added a way for executors and the driver to use node-local SSL configuration
- Made CoarseGrainedExecutorBackend not overwrite settings that are executor startup configuration - they are passed from the Worker anyway
1 parent 63dfe21 commit 93050f4

35 files changed: +852 additions, -76 deletions

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 8 additions & 3 deletions
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.File
 
+import org.eclipse.jetty.server.ssl.SslSocketConnector
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
 import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
@@ -72,7 +73,10 @@ private[spark] class HttpServer(
    */
   private def doStart(startPort: Int): (Server, Int) = {
     val server = new Server()
-    val connector = new SocketConnector
+
+    val connector = securityManager.sslOptions.createJettySslContextFactory()
+      .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
+
     connector.setMaxIdleTime(60 * 1000)
     connector.setSoLingerTime(-1)
     connector.setPort(startPort)
@@ -149,13 +153,14 @@ private[spark] class HttpServer(
   }
 
   /**
-   * Get the URI of this HTTP server (http://host:port)
+   * Get the URI of this HTTP server (http://host:port or https://host:port)
    */
   def uri: String = {
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      "http://" + Utils.localIpAddress + ":" + port
+      val scheme = if (securityManager.sslOptions.enabled) "https" else "http"
+      s"$scheme://${Utils.localIpAddress}:$port"
     }
   }
 }
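
A note on the connector selection above: createJettySslContextFactory() returns an Option[SslContextFactory] — Some only when SSL is enabled — so map/getOrElse picks between an HTTPS and a plain HTTP connector. A minimal, self-contained sketch of the same pattern, assuming the Jetty 8 packages Spark used at the time (the sslEnabled flag is an illustrative stand-in for the SecurityManager lookup):

    import org.eclipse.jetty.server.bio.SocketConnector
    import org.eclipse.jetty.server.ssl.SslSocketConnector
    import org.eclipse.jetty.util.ssl.SslContextFactory

    object ConnectorSelection {
      // Stand-in for securityManager.sslOptions.createJettySslContextFactory()
      def sslContextFactory(sslEnabled: Boolean): Option[SslContextFactory] =
        if (sslEnabled) Some(new SslContextFactory()) else None

      def main(args: Array[String]): Unit = {
        // Some(factory) => SSL connector; None => plain socket connector
        val connector = sslContextFactory(sslEnabled = false)
          .map(new SslSocketConnector(_))
          .getOrElse(new SocketConnector)
        println(connector.getClass.getSimpleName) // SocketConnector
      }
    }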
core/src/main/scala/org/apache/spark/SSLOptions.scala

Lines changed: 124 additions & 0 deletions

@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.eclipse.jetty.util.ssl.SslContextFactory
+
+private[spark] case class SSLOptions(
+    enabled: Boolean = false,
+    keyStore: Option[File] = None,
+    keyStorePassword: Option[String] = None,
+    keyPassword: Option[String] = None,
+    trustStore: Option[File] = None,
+    trustStorePassword: Option[String] = None,
+    protocol: Option[String] = None,
+    enabledAlgorithms: Set[String] = Set.empty) {
+
+  /**
+   * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
+   */
+  def createJettySslContextFactory(): Option[SslContextFactory] = {
+    if (enabled) {
+      val sslContextFactory = new SslContextFactory()
+
+      keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
+      trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
+      keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
+      trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
+      keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
+      protocol.foreach(sslContextFactory.setProtocol)
+      sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
+
+      Some(sslContextFactory)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Creates an Akka configuration object which contains all the SSL settings represented by this
+   * object. It can be used then to compose the ultimate Akka configuration.
+   */
+  def createAkkaConfig: Option[Config] = {
+    import scala.collection.JavaConversions._
+    if (enabled) {
+      Some(ConfigFactory.empty()
+        .withValue("akka.remote.netty.tcp.security.key-store",
+          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.key-store-password",
+          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.trust-store",
+          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.trust-store-password",
+          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.key-password",
+          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.random-number-generator",
+          ConfigValueFactory.fromAnyRef(""))
+        .withValue("akka.remote.netty.tcp.security.protocol",
+          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
+        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
+          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
+        .withValue("akka.remote.netty.tcp.enable-ssl",
+          ConfigValueFactory.fromAnyRef(true)))
+    } else {
+      None
+    }
+  }
+
+  override def toString: String = s"SSLOptions{enabled=$enabled, " +
+    s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+    s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
+
+}
+
+private[spark] object SSLOptions extends Logging {
+
+  /**
+   * Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
+   * The parent directory of that location is used as a base directory to resolve relative paths
+   * to keystore and truststore.
+   */
+  def parse(conf: SparkConf, ns: String): SSLOptions = {
+    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false)
+    val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
+    val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
+    val keyPassword = conf.getOption(s"$ns.keyPassword")
+    val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
+    val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
+    val protocol = conf.getOption(s"$ns.protocol")
+    val enabledAlgorithms = conf.get(s"$ns.enabledAlgorithms", defaultValue = "")
+      .split(",").map(_.trim).filter(_.nonEmpty).toSet
+
+    new SSLOptions(
+      enabled,
+      keyStore,
+      keyStorePassword,
+      keyPassword,
+      trustStore,
+      trustStorePassword,
+      protocol,
+      enabledAlgorithms)
+  }
+
+}
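
To illustrate how parse and createAkkaConfig fit together, here is a hedged sketch. The spark.ssl.* values are illustrative, the base Config stands in for whatever AkkaUtils actually builds, and since both classes are private[spark] this only compiles inside Spark itself:

    import com.typesafe.config.ConfigFactory
    import org.apache.spark.{SparkConf, SSLOptions}

    val conf = new SparkConf()
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/path/to/keystore")        // illustrative path
      .set("spark.ssl.keyStorePassword", "ks-password")      // illustrative secret
      .set("spark.ssl.trustStore", "/path/to/truststore")    // illustrative path
      .set("spark.ssl.trustStorePassword", "ts-password")
      .set("spark.ssl.protocol", "TLSv1")
      .set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")

    // Everything under the "spark.ssl" namespace resolves into one SSLOptions value
    val sslOptions = SSLOptions.parse(conf, "spark.ssl")

    // Compose the final actor-system configuration; the fallback here
    // stands in for the config AkkaUtils assembles.
    val baseAkkaConf = ConfigFactory.parseString("akka.loglevel = INFO")
    val akkaConf = sslOptions.createAkkaConfig
      .map(_.withFallback(baseAkkaConf))
      .getOrElse(baseAkkaConf)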

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 49 additions & 1 deletion
@@ -18,7 +18,11 @@
 package org.apache.spark
 
 import java.net.{Authenticator, PasswordAuthentication}
+import java.security.KeyStore
+import java.security.cert.X509Certificate
+import javax.net.ssl._
 
+import com.google.common.io.Files
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -144,7 +148,8 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  * can take place.
  */
 
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
+private[spark] class SecurityManager(sparkConf: SparkConf)
+  extends Logging with SecretKeyHolder {
 
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
@@ -196,6 +201,49 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
     )
   }
 
+  val sslOptions = SSLOptions.parse(sparkConf, "spark.ssl")
+  logDebug(s"SSLConfiguration: $sslOptions")
+
+  val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) {
+    val trustStoreManagers =
+      for (trustStore <- sslOptions.trustStore) yield {
+        val input = Files.asByteSource(sslOptions.trustStore.get).openStream()
+
+        try {
+          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
+          ks.load(input, sslOptions.trustStorePassword.get.toCharArray)
+
+          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+          tmf.init(ks)
+          tmf.getTrustManagers
+        } finally {
+          input.close()
+        }
+      }
+
+    lazy val credulousTrustStoreManagers = Array({
+      logWarning("Using 'accept-all' trust manager for SSL connections.")
+      new X509TrustManager {
+        override def getAcceptedIssuers: Array[X509Certificate] = null
+
+        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+
+        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+      }: TrustManager
+    })
+
+    val sslContext = SSLContext.getInstance(sslOptions.protocol.getOrElse("Default"))
+    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)
+
+    val hostVerifier = new HostnameVerifier {
+      override def verify(s: String, sslSession: SSLSession): Boolean = true
+    }
+
+    (Some(sslContext.getSocketFactory), Some(hostVerifier))
+  } else {
+    (None, None)
+  }
+
   /**
    * Split a comma separated String, filter out any empty items, and return a Set of strings
    */
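
The for/yield over sslOptions.trustStore above produces an Option[Array[TrustManager]]: Some when a truststore is configured, None otherwise, with the lazy accept-all manager materialized only in the None case. A reduced sketch of that fallback shape (no real keystores involved):

    import java.security.cert.X509Certificate
    import javax.net.ssl.{TrustManager, X509TrustManager}

    // Pretend no truststore was configured (sslOptions.trustStore == None)
    val fromTrustStore: Option[Array[TrustManager]] = None

    // Built only if actually needed, mirroring the lazy val above;
    // it trusts every certificate, hence the warning in the real code.
    lazy val acceptAll: Array[TrustManager] = Array(new X509TrustManager {
      override def getAcceptedIssuers: Array[X509Certificate] = null
      override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = {}
      override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = {}
    })

    val managers: Array[TrustManager] = fromTrustStore.getOrElse(acceptAll)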

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 0 deletions
@@ -369,6 +369,7 @@ private[spark] object SparkConf {
     isAkkaConf(name) ||
     name.startsWith("spark.akka") ||
     name.startsWith("spark.auth") ||
+    name.startsWith("spark.ssl") ||
     isSparkPortConf(name)
   }
 
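
This one-line change backs the commit's last bullet: the predicate it extends (isExecutorStartupConf, judging by the surrounding object) marks settings an executor needs before it can register with the driver, so the Worker now hands spark.ssl.* to executors directly. Illustratively (the object is private[spark], so this only compiles inside Spark):

    // With the added line, SSL settings count as executor-startup configuration:
    SparkConf.isExecutorStartupConf("spark.ssl.keyStore")   // true after this change
    SparkConf.isExecutorStartupConf("spark.app.name")       // still false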

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 1 addition & 0 deletions
@@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
       uc = new URL(url).openConnection()
       uc.setConnectTimeout(httpReadTimeout)
     }
+    Utils.setupSecureURLConnection(uc, securityManager)
 
     val in = {
       uc.setReadTimeout(httpReadTimeout)
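
Utils.setupSecureURLConnection itself is not part of this excerpt; given the sslSocketFactory and hostnameVerifier fields added to SecurityManager above, a plausible reconstruction (not the verbatim implementation) is:

    import java.net.URLConnection
    import javax.net.ssl.HttpsURLConnection

    // Hedged sketch: install the SecurityManager's socket factory and
    // hostname verifier on HTTPS connections; leave plain HTTP untouched.
    def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
      urlConnection match {
        case https: HttpsURLConnection =>
          sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
          sm.hostnameVerifier.foreach(https.setHostnameVerifier)
          https
        case other => other
      }
    }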

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 9 additions & 0 deletions
@@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(
 
   val user = System.getProperty("user.name", "<unknown>")
 
+  def copy(
+      name: String = name,
+      maxCores: Option[Int] = maxCores,
+      memoryPerSlave: Int = memoryPerSlave,
+      command: Command = command,
+      appUiUrl: String = appUiUrl,
+      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
+    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
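
ApplicationDescription is a plain class rather than a case class, so this hand-written copy supplies the case-class-style convenience the commit needs to swap a single field, e.g. rewriting the executor Command with node-local SSL settings. An illustrative call (appDesc and sslAwareCommand are placeholders):

    // Keeps name, maxCores, memoryPerSlave, appUiUrl and eventLogDir; replaces command.
    val updated = appDesc.copy(command = sslAwareCommand)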

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 2 additions & 2 deletions
@@ -39,7 +39,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   val timeout = AkkaUtils.askTimeout(conf)
 
   override def preStart() = {
-    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
+    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master, conf))
 
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
@@ -161,7 +161,7 @@ object Client {
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
-    Master.toAkkaUrl(driverArgs.master)
+    Master.toAkkaUrl(driverArgs.master, conf)
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
     actorSystem.awaitTermination()

core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala

Lines changed: 8 additions & 0 deletions
@@ -25,5 +25,13 @@ private[spark] class DriverDescription(
     val command: Command)
   extends Serializable {
 
+  def copy(
+      jarUrl: String = jarUrl,
+      mem: Int = mem,
+      cores: Int = cores,
+      supervise: Boolean = supervise,
+      command: Command = command): DriverDescription =
+    new DriverDescription(jarUrl, mem, cores, supervise, command)
+
   override def toString: String = s"DriverDescription (${command.mainClass})"
 }
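
Same pattern as ApplicationDescription above: a manual copy for a non-case class, used to relaunch a driver with an adjusted Command while keeping the resource settings. Illustratively (driverDesc and newCommand are placeholders):

    // jarUrl, mem, cores and supervise carry over unchanged.
    val relaunched = driverDesc.copy(command = newCommand)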

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 3 additions & 3 deletions
@@ -47,7 +47,7 @@ private[spark] class AppClient(
     conf: SparkConf)
   extends Logging {
 
-  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, conf))
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
@@ -107,8 +107,8 @@ private[spark] class AppClient(
   def changeMaster(url: String) {
     // activeMasterUrl is a valid Spark url since we receive it from master.
     activeMasterUrl = url
-    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-    masterAddress = Master.toAkkaAddress(activeMasterUrl)
+    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf))
+    masterAddress = Master.toAkkaAddress(activeMasterUrl, conf)
   }
 
   private def isPossibleMaster(remoteUrl: Address) = {

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 4 additions & 4 deletions
@@ -860,19 +860,19 @@ private[spark] object Master extends Logging {
    *
    * @throws SparkException if the url is invalid
    */
-  def toAkkaUrl(sparkUrl: String): String = {
+  def toAkkaUrl(sparkUrl: String, conf: SparkConf): String = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+    AkkaUtils.address(systemName, host, port, actorName, conf)
   }
 
   /**
    * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
    *
    * @throws SparkException if the url is invalid
    */
-  def toAkkaAddress(sparkUrl: String): Address = {
+  def toAkkaAddress(sparkUrl: String, conf: SparkConf): Address = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    Address("akka.tcp", systemName, host, port)
+    Address(AkkaUtils.protocol(conf), systemName, host, port)
   }
 
   def startSystemAndActor(
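
The AkkaUtils.protocol and AkkaUtils.address helpers referenced here are added elsewhere in this commit and are not shown in this excerpt. Based on how they are used above, a hedged sketch of their shape (assumed, not the verbatim implementation):

    // The transport scheme flips to akka.ssl.tcp when spark.ssl.enabled is set.
    def protocol(conf: SparkConf): String =
      if (conf.getBoolean("spark.ssl.enabled", false)) "akka.ssl.tcp" else "akka.tcp"

    def address(systemName: String, host: String, port: Int,
        actorName: String, conf: SparkConf): String =
      s"${protocol(conf)}://$systemName@$host:$port/user/$actorName"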
