
Commit 2f77147

Rework from comments
1 parent 50dd9f2 commit 2f77147

8 files changed: +94, -43 lines


core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 24 additions & 29 deletions
@@ -50,7 +50,7 @@ import scala.collection.mutable.ArrayBuffer
  * Spark does not currently support encryption after authentication.
  *
  * At this point spark has multiple communication protocols that need to be secured and
- * different underlying mechisms are used depending on the protocol:
+ * different underlying mechanisms are used depending on the protocol:
  *
  * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
  *           Akka remoting allows you to specify a secure cookie that will be exchanged
@@ -108,7 +108,7 @@ import scala.collection.mutable.ArrayBuffer
  * SparkUI can be configured to check the logged in user against the list of users who
  * have view acls to see if that user is authorized.
  * The filters can also be used for many different purposes. For instance filters
- * could be used for logging, encypryption, or compression.
+ * could be used for logging, encryption, or compression.
  *
  * The exact mechanisms used to generate/distributed the shared secret is deployment specific.
  *
@@ -122,15 +122,11 @@ import scala.collection.mutable.ArrayBuffer
  * filters to do authentication. That authentication then happens via the ResourceManager Proxy
  * and Spark will use that to do authorization against the view acls.
  *
- * For other Spark deployments, the shared secret should be specified via the SPARK_SECRET
+ * For other Spark deployments, the shared secret must be specified via the SPARK_SECRET
  * environment variable. This isn't ideal but it means only the user who starts the process
- * has access to view that variable. Note that Spark does try to generate a secret for
- * you if the SPARK_SECRET environment variable is not set, but it gets put into the java
- * system property which can be viewed by other users, so setting the SPARK_SECRET environment
- * variable is recommended.
- * All the nodes (Master and Workers) need to have the same shared secret
- * and all the applications running need to have that same shared secret. This again
- * is not ideal as one user could potentially affect another users application.
+ * has access to view that variable.
+ * All the nodes (Master and Workers) and the applications need to have the same shared secret.
+ * This again is not ideal as one user could potentially affect another users application.
  * This should be enhanced in the future to provide better protection.
  * If the UI needs to be secured the user needs to install a javax servlet filter to do the
  * authentication. Spark will then use that user to compare against the view acls to do
@@ -152,7 +148,8 @@ private[spark] class SecurityManager extends Logging {
   private val viewAcls = aclUsers.map(_.trim()).filter(!_.isEmpty).toSet

   private val secretKey = generateSecretKey()
-  logDebug("is auth enabled = " + authOn + " is uiAcls enabled = " + uiAclsOn)
+  logInfo("SecurityManager, is authentication enabled: " + authOn +
+    " are ui acls enabled: " + uiAclsOn)

   // Set our own authenticator to properly negotiate user/password for HTTP connections.
   // This is needed by the HTTP client fetching from the HttpServer. Put here so its
@@ -170,7 +167,7 @@ private[spark] class SecurityManager extends Logging {
         return passAuth
       }
     }
-    );
+    )
   }

   /**
@@ -179,16 +176,12 @@ private[spark] class SecurityManager extends Logging {
    * The way the key is stored depends on the Spark deployment mode. Yarn
    * uses the Hadoop UGI.
    *
-   * For non-Yarn deployments, If the environment variable is not set already
-   * we generate a secret and since we can't set an environment variable dynamically
-   * we set the java system property SPARK_SECRET. This will allow it to automatically
-   * work in certain situations. Others this still will not work and this definitely is
-   * not ideal since other users can see it. We should switch to put it in
-   * a config once Spark supports configs.
+   * For non-Yarn deployments, if the environment variable is not set
+   * we throw an exception.
    */
   private def generateSecretKey(): String = {
     if (!isAuthenticationEnabled) return null
-    // first check to see if the secret is already set, else generate a new one
+    // first check to see if the secret is already set, else generate a new one if on yarn
     if (SparkHadoopUtil.get.isYarnMode) {
       val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
       if (secretKey != null) {
@@ -200,17 +193,17 @@ private[spark] class SecurityManager extends Logging {
     }
     val secret = System.getProperty("SPARK_SECRET", System.getenv("SPARK_SECRET"))
     if (secret != null && !secret.isEmpty()) return secret
-    // generate one
-    val sCookie = akka.util.Crypt.generateSecureCookie
-
-    // if we generated the secret then we must be the first so lets set it so t
-    // gets used by everyone else
+    val sCookie = if (SparkHadoopUtil.get.isYarnMode) {
+      // generate one
+      akka.util.Crypt.generateSecureCookie
+    } else {
+      throw new Exception("Error: a secret key must be specified via SPARK_SECRET env variable")
+    }
     if (SparkHadoopUtil.get.isYarnMode) {
+      // if we generated the secret then we must be the first so lets set it so it
+      // gets used by everyone else
       SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, sCookie)
-      logDebug("adding secret to credentials yarn mode")
-    } else {
-      System.setProperty("SPARK_SECRET", sCookie)
-      logDebug("adding secret to java property")
+      logInfo("adding secret to credentials in yarn mode")
     }
     sCookie
   }
@@ -223,7 +216,9 @@ private[spark] class SecurityManager extends Logging {

   /**
    * Checks the given user against the view acl list to see if they have
-   * authorization to view the UI.
+   * authorization to view the UI. If the UI acls are disabled
+   * via spark.ui.acls.enable, all users have view access.
+   *
    * @param user to see if is authorized
    * @return true is the user has permission, otherwise false
    */
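
The reworked generateSecretKey() above changes the resolution order for the shared secret. A minimal sketch of that order, assuming a hypothetical YarnAdapter trait in place of the real SparkHadoopUtil/UGI calls and a UUID in place of akka.util.Crypt.generateSecureCookie:

    // Sketch only: the lookup order for the shared secret after this commit.
    // YarnAdapter is a hypothetical stand-in, not the Spark API.
    trait YarnAdapter {
      def isYarnMode: Boolean
      def getStoredSecret: Option[String]    // stand-in for getSecretKeyFromUserCredentials
      def storeSecret(secret: String): Unit  // stand-in for addSecretKeyToUserCredentials
    }

    def resolveSecret(yarn: YarnAdapter): String = {
      // 1. Yarn mode: reuse a secret already stored in the user credentials.
      val stored = if (yarn.isYarnMode) yarn.getStoredSecret else None
      // 2. Any mode: honor an explicitly supplied SPARK_SECRET.
      val explicit = Option(System.getProperty("SPARK_SECRET", System.getenv("SPARK_SECRET")))
        .filter(_.nonEmpty)
      stored.orElse(explicit).getOrElse {
        if (yarn.isYarnMode) {
          // stand-in for akka.util.Crypt.generateSecureCookie
          val generated = java.util.UUID.randomUUID().toString
          yarn.storeSecret(generated) // first process in persists it for everyone else
          generated
        } else {
          // non-Yarn deployments now fail fast instead of minting a visible secret
          throw new Exception("a secret key must be specified via SPARK_SECRET env variable")
        }
      }
    }

The notable behavioral change is the else branch: non-Yarn deployments no longer fall back to a java system property that other users could read.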

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 2 additions & 2 deletions
@@ -48,7 +48,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
   val metricsHandlers = worker.metricsSystem.getServletHandlers

   val handlers = metricsHandlers ++ Seq[ServletContextHandler](
-    createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR, "/static/*"),
+    createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"),
     createServletHandler("/log", (request: HttpServletRequest) => log(request)),
     createServletHandler("/logPage", (request: HttpServletRequest) => logPage(request)),
     createServletHandler("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
@@ -199,6 +199,6 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 }

 private[spark] object WorkerWebUI {
-  val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
+  val STATIC_RESOURCE_BASE = "org/apache/spark/ui"
   val DEFAULT_PORT="8081"
 }

core/src/main/scala/org/apache/spark/network/Connection.scala

Lines changed: 2 additions & 2 deletions
@@ -32,8 +32,8 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
     val socketRemoteConnectionManagerId: ConnectionManagerId, val connectionId: ConnectionId)
   extends Logging {

-  var sparkSaslServer : SparkSaslServer = null
-  var sparkSaslClient : SparkSaslClient = null
+  var sparkSaslServer: SparkSaslServer = null
+  var sparkSaslClient: SparkSaslClient = null

   def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) = {
     this(channel_, selector_,

core/src/main/scala/org/apache/spark/network/ConnectionId.scala

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ private[spark] case class ConnectionId(connectionManagerId: ConnectionManagerId,

 private[spark] object ConnectionId {

-  def createConnectionIdFromString(connectionIdString: String) : ConnectionId = {
+  def createConnectionIdFromString(connectionIdString: String): ConnectionId = {
     val res = connectionIdString.split("_").map(_.trim())
     if (res.size != 3) {
       throw new Exception("Error converting ConnectionId string: " + connectionIdString +
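
The hunk above only fixes spacing, but it shows the parsing contract: a ConnectionId string is three "_"-separated fields. A sketch of a round-trip under the assumption (not confirmed by this diff) that the fields are a host, a port, and a per-connection counter:

    // Hypothetical field meanings; only the three-part "_" split comes from the diff.
    case class ParsedConnectionId(host: String, port: Int, connNum: Int) {
      override def toString: String = host + "_" + port + "_" + connNum
    }

    def parseConnectionId(s: String): ParsedConnectionId = {
      val res = s.split("_").map(_.trim())
      if (res.size != 3) {
        throw new Exception("Error converting ConnectionId string: " + s)
      }
      ParsedConnectionId(res(0), res(1).toInt, res(2).toInt)
    }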

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 6 additions & 5 deletions
@@ -56,7 +56,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,

   // default to 30 second timeout waiting for authentication
   private val authTimeout = System.getProperty("spark.core.connection.auth.wait.timeout",
-    "30000").toInt
+    "30").toInt

   private val handleMessageExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.handler.threads.min", 20),
@@ -79,6 +79,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     new LinkedBlockingDeque[Runnable]())

   private val serverChannel = ServerSocketChannel.open()
+  // used to track the SendingConnections waiting to do SASL negotiation
   private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection]
     with SynchronizedMap[ConnectionId, SendingConnection]
   private val connectionsByKey =
@@ -729,7 +730,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     // We did not find it useful in our test-env ...
     // If we do re-add it, we should consistently use it everywhere I guess ?
     message.senderAddress = id.toSocketAddress()
-    logDebug("Sending Security [" + message + "] to [" + connManagerId + "]")
+    logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
     val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())

     //send security message until going connection has been authenticated
@@ -745,7 +746,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
     val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId,
       newConnectionId)
-    logDebug("creating new sending connection: " + newConnectionId)
+    logTrace("creating new sending connection: " + newConnectionId)
     registerRequests.enqueue(newConnection)

     newConnection
@@ -772,10 +773,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
       logDebug("getAuthenticated wait connectionid: " + connection.connectionId)
       // have timeout in case remote side never responds
       connection.getAuthenticated().wait(500)
-      if (((clock.getTime() - startTime) >= authTimeout) && (!connection.isSaslComplete())) {
+      if (((clock.getTime() - startTime) >= (authTimeout * 1000)) && (!connection.isSaslComplete())) {
         // took to long to authenticate the connection, something probably went wrong
         throw new Exception("Took to long for authentication to " + connectionManagerId +
-          ", waited " + authTimeout + "ms, failing.")
+          ", waited " + authTimeout + " seconds, failing.")
       }
     }
   }
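
Two behavioral points in the hunks above are easy to miss: the timeout config is now interpreted in seconds, and the monitor wait still polls every 500 ms, so the elapsed milliseconds have to be compared against authTimeout * 1000. A self-contained sketch of that loop, with the lock object and completion flag as simplified stand-ins for the connection's state:

    // Sketch of the bounded SASL-authentication wait, assuming `lock` guards the
    // connection's authenticated state and `isSaslComplete` reflects it.
    def awaitAuthenticated(lock: AnyRef, isSaslComplete: () => Boolean): Unit = {
      val authTimeout = // seconds, per the config change above
        System.getProperty("spark.core.connection.auth.wait.timeout", "30").toInt
      val startTime = System.currentTimeMillis()
      lock.synchronized {
        while (!isSaslComplete()) {
          lock.wait(500) // wake periodically in case the remote side never responds
          if ((System.currentTimeMillis() - startTime) >= authTimeout * 1000L &&
              !isSaslComplete()) {
            throw new Exception("Took too long for authentication, waited " +
              authTimeout + " seconds, failing.")
          }
        }
      }
    }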

core/src/main/scala/org/apache/spark/network/SecurityMessage.scala

Lines changed: 55 additions & 2 deletions
@@ -25,6 +25,35 @@ import scala.collection.mutable.StringBuilder
 import org.apache.spark._
 import org.apache.spark.network._

+/**
+ * SecurityMessage is a class that contains the connectionId and sasl token
+ * used in SASL negotiation. SecurityMessage has routines for converting
+ * it to and from a BufferMessage so that it can be sent by the ConnectionManager
+ * and easily consumed by users when received.
+ * The api was modeled after BlockMessage.
+ *
+ * The connectionId is the connectionId of the client side. Since
+ * message passing is asynchronous and it's possible for the server side (receiving)
+ * to get multiple different types of messages on the same connection the connectionId
+ * is used to know which connection the security message is intended for.
+ *
+ * For instance, let's say we are node_0. We need to send data to node_1. The node_0 side
+ * is acting as a client and connecting to node_1. SASL negotiation has to occur
+ * between node_0 and node_1 before node_1 trusts node_0 so node_0 sends a security message.
+ * node_1 receives the message from node_0 but before it can process it and send a response,
+ * some thread on node_1 decides it needs to send data to node_0 so it connects to node_0
+ * and sends a security message of its own to authenticate as a client. Now node_0 gets
+ * the message and it needs to decide if this message is in response to it being a client
+ * (from the first send) or if it's just node_1 trying to connect to it to send data. This
+ * is where the connectionId field is used. node_0 can look up the connectionId to see if
+ * it is in response to it being a client or if it's in response to someone sending other data.
+ *
+ * The format of a SecurityMessage as it's sent is:
+ *   - Length of the ConnectionId
+ *   - ConnectionId
+ *   - Length of the token
+ *   - Token
+ */
 private[spark] class SecurityMessage() extends Logging {

   private var connectionId: String = null
@@ -39,6 +68,9 @@ private[spark] class SecurityMessage() extends Logging {
     connectionId = newconnectionId
   }

+  /**
+   * Read the given buffer and set the members of this class.
+   */
   def set(buffer: ByteBuffer) {
     val idLength = buffer.getInt()
     val idBuilder = new StringBuilder(idLength)
@@ -68,10 +100,19 @@ private[spark] class SecurityMessage() extends Logging {
     return token
   }

+  /**
+   * Create a BufferMessage that can be sent by the ConnectionManager containing
+   * the security information from this class.
+   * @return BufferMessage
+   */
   def toBufferMessage: BufferMessage = {
     val startTime = System.currentTimeMillis
     val buffers = new ArrayBuffer[ByteBuffer]()

+    // 4 bytes for the length of the connectionId
+    // connectionId is of type char so multiply the length by 2 to get number of bytes
+    // 4 bytes for the length of token
+    // token is a byte buffer so just take the length
     var buffer = ByteBuffer.allocate(4 + connectionId.length() * 2 + 4 + token.length)
     buffer.putInt(connectionId.length())
     connectionId.foreach((x: Char) => buffer.putChar(x))
@@ -96,15 +137,27 @@ private[spark] class SecurityMessage() extends Logging {

 private[spark] object SecurityMessage {

+  /**
+   * Convert the given BufferMessage to a SecurityMessage by parsing the contents
+   * of the BufferMessage and populating the SecurityMessage fields.
+   * @param bufferMessage is a BufferMessage that was received
+   * @return new SecurityMessage
+   */
   def fromBufferMessage(bufferMessage: BufferMessage): SecurityMessage = {
     val newSecurityMessage = new SecurityMessage()
     newSecurityMessage.set(bufferMessage)
     newSecurityMessage
   }

-  def fromResponse(response : Array[Byte], newConnectionId : String) : SecurityMessage = {
+  /**
+   * Create a SecurityMessage to send from a given saslResponse.
+   * @param response is the response to a challenge from the SaslClient or SaslServer
+   * @param connectionId the client connectionId we are negotiating authentication for
+   * @return a new SecurityMessage
+   */
+  def fromResponse(response : Array[Byte], connectionId : String) : SecurityMessage = {
     val newSecurityMessage = new SecurityMessage()
-    newSecurityMessage.set(response, newConnectionId)
+    newSecurityMessage.set(response, connectionId)
     newSecurityMessage
   }
 }
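
The new doc comment above pins down the wire layout. A standalone sketch of just that layout (not the actual class, which additionally wraps the buffer in a BufferMessage):

    import java.nio.ByteBuffer

    // [4-byte id length][id chars, 2 bytes each][4-byte token length][token bytes]
    def encode(connectionId: String, token: Array[Byte]): ByteBuffer = {
      val buffer = ByteBuffer.allocate(4 + connectionId.length * 2 + 4 + token.length)
      buffer.putInt(connectionId.length)
      connectionId.foreach((c: Char) => buffer.putChar(c))
      buffer.putInt(token.length)
      buffer.put(token)
      buffer.flip()
      buffer
    }

    def decode(buffer: ByteBuffer): (String, Array[Byte]) = {
      val idLength = buffer.getInt()
      val id = new StringBuilder(idLength)
      (0 until idLength).foreach(_ => id.append(buffer.getChar()))
      val token = new Array[Byte](buffer.getInt())
      buffer.get(token)
      (id.toString, token)
    }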

docs/configuration.md

Lines changed: 2 additions & 2 deletions
@@ -515,9 +515,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td>spark.core.connection.auth.wait.timeout</td>
-  <td>30000</td>
+  <td>30</td>
   <td>
-    Number of milliseconds for the connection to wait for authentication to occur before timing
+    Number of seconds for the connection to wait for authentication to occur before timing
     out and giving up.
   </td>
 </tr>
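
Since the ConnectionManager hunk earlier reads this setting with System.getProperty, a deployment that needs a longer SASL handshake window can raise it before the ConnectionManager is created, e.g.:

    // value is now interpreted in seconds, so this allows two minutes, not 120 ms
    System.setProperty("spark.core.connection.auth.wait.timeout", "120")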

docs/security.md

Lines changed: 2 additions & 0 deletions
@@ -12,3 +12,5 @@ For Spark on Yarn deployments, configuring `spark.authenticate` to true will aut
 For other types of Spark deployments, the environment variable `SPARK_SECRET` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.

 See [Spark Configuration](configuration.html) for more details on the security configs.
+
+See <a href="api/core/index.html#org.apache.spark.SecurityManager"><code>org.apache.spark.SecurityManager</code></a> for implementation details about security.
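
Putting the docs changes together, a non-Yarn deployment would export the same SPARK_SECRET on every Master, Worker, and driver process, then enable the relevant settings. A hedged sketch; com.example.MyAuthFilter is a hypothetical servlet filter class, not something this commit provides:

    import org.apache.spark.SparkConf

    // export SPARK_SECRET=<same value everywhere> before launching each process
    val conf = new SparkConf()
      .set("spark.authenticate", "true")                    // turn on connection authentication
      .set("spark.ui.filters", "com.example.MyAuthFilter")  // hypothetical auth filter for the UI
      .set("spark.core.connection.auth.wait.timeout", "30") // seconds, per the config table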