Commit c41e7bc

More cleanup
1 parent 2f0b1ad commit c41e7bc

File tree

9 files changed: +128 -119 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 9 additions & 10 deletions

@@ -50,7 +50,6 @@ object SessionCatalog {
 class SessionCatalog(
     externalCatalog: ExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
-    functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
     hadoopConf: Configuration,
@@ -66,11 +65,11 @@ class SessionCatalog(
     this(
       externalCatalog,
       new GlobalTempViewManager("global_temp"),
-      DummyFunctionResourceLoader,
       functionRegistry,
       conf,
       new Configuration(),
       CatalystSqlParser)
+    functionResourceLoader = DummyFunctionResourceLoader
   }
 
   // For testing only.
@@ -92,6 +91,8 @@ class SessionCatalog(
   @GuardedBy("this")
   protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
 
+  @volatile var functionResourceLoader: FunctionResourceLoader = _
+
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
    * i.e. if this name only contains characters, numbers, and _.
@@ -990,6 +991,9 @@ class SessionCatalog(
    * by a tuple (resource type, resource uri).
    */
   def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+    if (functionResourceLoader == null) {
+      throw new IllegalStateException("functionResourceLoader has not yet been initialized")
+    }
     resources.foreach(functionResourceLoader.loadResource)
   }
 
@@ -1186,22 +1190,17 @@ class SessionCatalog(
   }
 
   /**
-   * Get an identical copy of the `SessionCatalog`.
-   * The temporary views and function registry are retained.
-   * The table relation cache will not be populated.
-   * @note `externalCatalog` and `globalTempViewManager` are from shared state, do not need deep
-   * copy. `FunctionResourceLoader` is effectively stateless, also does not need deep copy.
-   * All arguments passed in should be associated with a particular `SparkSession`.
+   * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and
+   * `globalTempViewManager` are inherited, while `currentDb` and `tempTables` are copied.
    */
-  def clone(
+  def newSessionCatalogWith(
       conf: CatalystConf,
      hadoopConf: Configuration,
       functionRegistry: FunctionRegistry,
       parser: ParserInterface): SessionCatalog = {
     val catalog = new SessionCatalog(
       externalCatalog,
       globalTempViewManager,
-      functionResourceLoader,
       functionRegistry,
       conf,
       hadoopConf,
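The pattern worth noting in this diff: `functionResourceLoader` moves from a constructor parameter to a `@volatile var` injected after construction, with a null guard at the one use site. A minimal, self-contained sketch of that late-initialization pattern (all names below are hypothetical, only the shape mirrors the diff):

// Sketch: late initialization of a collaborator through a @volatile var.
trait ResourceLoader {
  def load(uri: String): Unit
}

class Catalog {
  // Written once by the owning session, read from query threads; @volatile
  // makes the write visible across threads without a lock.
  @volatile var resourceLoader: ResourceLoader = _

  def loadResources(uris: Seq[String]): Unit = {
    if (resourceLoader == null) {
      throw new IllegalStateException("resourceLoader has not yet been initialized")
    }
    uris.foreach(resourceLoader.load)
  }
}

object LateInitDemo extends App {
  val catalog = new Catalog
  catalog.resourceLoader = (uri: String) => println(s"loading $uri") // SAM syntax, Scala 2.12+
  catalog.loadResources(Seq("file:/tmp/udf.jar"))
}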

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -1206,7 +1206,7 @@ class SessionCatalogSuite extends PlanTest {
     original.createTempView("copytest1", tempTable1, overrideIfExists = false)
 
     // check if tables copied over
-    val clone = original.clone(
+    val clone = original.newSessionCatalogWith(
       SimpleCatalystConf(caseSensitiveAnalysis = true),
       new Configuration(),
       new SimpleFunctionRegistry,
@@ -1236,7 +1236,7 @@ class SessionCatalogSuite extends PlanTest {
     original.setCurrentDatabase(db1)
 
     // check if current db copied over
-    val clone = original.clone(
+    val clone = original.newSessionCatalogWith(
       SimpleCatalystConf(caseSensitiveAnalysis = true),
       new Configuration(),
       new SimpleFunctionRegistry,
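The renamed tests still pin down the copy semantics: shared components come over by reference, per-session state by value. A compact sketch of the distinction being asserted (hypothetical types, not the real suite):

// Sketch: the two kinds of state a catalog copy distinguishes.
class Cat(val shared: AnyRef, var currentDb: String) {
  // `shared` passes through by reference; `currentDb` is copied by value.
  def newCatalogWith(): Cat = new Cat(shared, currentDb)
}

object CopySemanticsDemo extends App {
  val original = new Cat(new Object, "db1")
  val copy = original.newCatalogWith()
  assert(copy.shared eq original.shared) // inherited: same reference
  copy.currentDb = "db2"
  assert(original.currentDb == "db1")    // copied: later changes stay independent
}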

sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala

Lines changed: 2 additions & 4 deletions

@@ -48,10 +48,8 @@ class ExperimentalMethods private[sql]() {
 
   override def clone(): ExperimentalMethods = {
     val result = new ExperimentalMethods
-    synchronized {
-      result.extraStrategies = extraStrategies
-      result.extraOptimizations = extraOptimizations
-    }
+    result.extraStrategies = extraStrategies
+    result.extraOptimizations = extraOptimizations
     result
   }
 }
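Dropping the `synchronized` block is sound assuming, as in Spark's `ExperimentalMethods`, that the two fields are independent `@volatile var`s: each assignment is one volatile read followed by a plain write, so the lock bought no atomicity worth having. A sketch under that assumption:

// Sketch: lock-free clone over independent @volatile fields.
class Methods {
  @volatile var extraStrategies: Seq[String] = Nil
  @volatile var extraOptimizations: Seq[String] = Nil

  override def clone(): Methods = {
    val result = new Methods
    result.extraStrategies = extraStrategies       // one volatile read each is
    result.extraOptimizations = extraOptimizations // enough for visibility
    result
  }
}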

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 9 additions & 15 deletions

@@ -80,10 +80,6 @@ class SparkSession private(
     @transient private val parentSessionState: Option[SessionState])
   extends Serializable with Closeable with Logging { self =>
 
-  private[sql] def this(sc: SparkContext, existingSharedState: Option[SharedState]) {
-    this(sc, existingSharedState, None)
-  }
-
   private[sql] def this(sc: SparkContext) {
     this(sc, None, None)
   }
@@ -129,9 +125,11 @@ class SparkSession private(
   lazy val sessionState: SessionState = {
     parentSessionState
       .map(_.clone(this))
-      .getOrElse(SparkSession.instantiateSessionState(
-        SparkSession.sessionStateClassName(sparkContext.conf),
-        self))
+      .getOrElse {
+        SparkSession.instantiateSessionState(
+          SparkSession.sessionStateClassName(sparkContext.conf),
+          self)
+      }
   }
 
   /**
@@ -221,13 +219,12 @@ class SparkSession private(
    * @since 2.0.0
    */
   def newSession(): SparkSession = {
-    new SparkSession(sparkContext, Some(sharedState))
+    new SparkSession(sparkContext, Some(sharedState), parentSessionState = None)
   }
 
   /**
-   * :: Experimental ::
    * Create an identical copy of this `SparkSession`, sharing the underlying `SparkContext`
-   * and cached data. All the state of this session (i.e. SQL configurations, temporary tables,
+   * and shared state. All the state of this session (i.e. SQL configurations, temporary tables,
    * registered functions) is copied over, and the cloned session is set up with the same shared
    * state as this session. The cloned session is independent of this session, that is, any
    * non-global change in either session is not reflected in the other.
@@ -236,12 +233,8 @@ class SparkSession private(
    * This method will force the initialization of the shared state to ensure that parent
    * and child sessions are set up with the same shared state. If the underlying catalog
    * implementation is Hive, this will initialize the metastore, which may take some time.
-   *
-   * @since 2.2.0
    */
-  @Experimental
-  @InterfaceStability.Evolving
-  def cloneSession(): SparkSession = {
+  private[sql] def cloneSession(): SparkSession = {
     val result = new SparkSession(sparkContext, Some(sharedState), Some(sessionState))
     result.sessionState // force copy of SessionState
     result
@@ -919,6 +912,7 @@ object SparkSession {
         }
       })
     }
+
     return session
   }
 }
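The reshaped `sessionState` initializer is the heart of session cloning: a child session clones its parent's state when one was captured at construction, and otherwise builds fresh state on first use. A stripped-down sketch of that parent/child pattern (hypothetical types):

// Sketch: lazily built session state that prefers cloning a captured parent.
class State(val conf: Map[String, String]) {
  def cloneState(): State = new State(conf)
}

class Session(parent: Option[State]) {
  lazy val state: State =
    parent.map(_.cloneState()).getOrElse(new State(Map.empty))

  def cloneSession(): Session = {
    val child = new Session(Some(state))
    child.state // force the copy now, as the real cloneSession() does
    child
  }
}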

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 72 additions & 45 deletions

@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.internal
 
+import java.io.File
+
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
@@ -35,6 +38,10 @@ import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
  * A class that holds all session-specific state in a given [[SparkSession]].
+ * @param sparkContext The [[SparkContext]].
+ * @param sharedState The shared state.
+ * @param conf SQL-specific key-value configurations.
+ * @param experimentalMethods The experimental methods.
  * @param functionRegistry Internal catalog for managing functions registered by the user.
  * @param catalog Internal catalog for managing table and database states.
  * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
@@ -55,42 +62,60 @@ private[sql] class SessionState(
     val streamingQueryManager: StreamingQueryManager,
     val queryExecutionCreator: LogicalPlan => QueryExecution) {
 
+  def newHadoopConf(): Configuration = SessionState.newHadoopConf(
+    sparkContext.hadoopConfiguration,
+    conf)
+
+  def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
+    val hadoopConf = newHadoopConf()
+    options.foreach { case (k, v) =>
+      if ((v ne null) && k != "path" && k != "paths") {
+        hadoopConf.set(k, v)
+      }
+    }
+    hadoopConf
+  }
+
+  /**
+   * A class for loading resources specified by a function.
+   */
+  val functionResourceLoader: FunctionResourceLoader = {
+    new FunctionResourceLoader {
+      override def loadResource(resource: FunctionResource): Unit = {
+        resource.resourceType match {
+          case JarResource => addJar(resource.uri)
+          case FileResource => sparkContext.addFile(resource.uri)
+          case ArchiveResource =>
+            throw new AnalysisException(
+              "Archive is not allowed to be loaded. If YARN mode is used, " +
+                "please use --archives options while calling spark-submit.")
+        }
+      }
+    }
+  }
+
   /**
    * Interface exposed to the user for registering user-defined functions.
    * Note that the user-defined functions must be deterministic.
    */
   val udf: UDFRegistration = new UDFRegistration(functionRegistry)
 
   /**
-   *  Logical query plan optimizer.
+   * Logical query plan optimizer.
    */
   val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
 
-  /**
-   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
-   * that listen for execution metrics.
-   */
-  val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
   /**
    * Planner that converts optimized logical plans to physical plans.
    */
   def planner: SparkPlanner =
     new SparkPlanner(sparkContext, conf, experimentalMethods.extraStrategies)
 
-  def newHadoopConf(): Configuration = SessionState.newHadoopConf(
-    sparkContext.hadoopConfiguration,
-    conf)
-
-  def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
-    val hadoopConf = newHadoopConf()
-    options.foreach { case (k, v) =>
-      if ((v ne null) && k != "path" && k != "paths") {
-        hadoopConf.set(k, v)
-      }
-    }
-    hadoopConf
-  }
+  /**
+   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   */
+  val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
 
   /**
    * Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
@@ -100,7 +125,7 @@ private[sql] class SessionState(
     val confCopy = conf.clone()
     val functionRegistryCopy = functionRegistry.clone()
     val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
-    val catalogCopy = catalog.clone(
+    val catalogCopy = catalog.newSessionCatalogWith(
       confCopy,
       SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
       functionRegistryCopy,
@@ -132,7 +157,26 @@
     catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }
 
-  def addJar(path: String): Unit = sharedState.addJar(path)
+  /**
+   * Add a jar path to [[SparkContext]] and the classloader.
+   *
+   * Note: this method does not seem to access any session state, but the subclass
+   * `HiveSessionState` needs to add the jar to its Hive client for the current session,
+   * so it still needs to live in [[SessionState]].
+   */
+  def addJar(path: String): Unit = {
+    sparkContext.addJar(path)
+    val uri = new Path(path).toUri
+    val jarURL = if (uri.getScheme == null) {
+      // `path` is a local file path without a URL scheme
+      new File(path).toURI.toURL
+    } else {
+      // `path` is a URL with a scheme
+      uri.toURL
+    }
+    sharedState.jarClassLoader.addURL(jarURL)
+    Thread.currentThread().setContextClassLoader(sharedState.jarClassLoader)
+  }
 }
 
 
@@ -150,16 +194,11 @@ object SessionState {
 
     val functionRegistry = FunctionRegistry.builtin.clone()
 
-    // A class for loading resources specified by a function.
-    val functionResourceLoader: FunctionResourceLoader =
-      createFunctionResourceLoader(sparkContext, sparkSession.sharedState)
-
     val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
 
     val catalog = new SessionCatalog(
       sparkSession.sharedState.externalCatalog,
       sparkSession.sharedState.globalTempViewManager,
-      functionResourceLoader,
       functionRegistry,
       sqlConf,
       newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
@@ -171,7 +210,7 @@
 
     val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(sparkSession, plan)
 
-    new SessionState(
+    val sessionState = new SessionState(
       sparkContext,
       sparkSession.sharedState,
       sqlConf,
@@ -182,23 +221,11 @@
       analyzer,
       streamingQueryManager,
       queryExecutionCreator)
-  }
-
-  def createFunctionResourceLoader(
-      sparkContext: SparkContext,
-      sharedState: SharedState): FunctionResourceLoader = {
-    new FunctionResourceLoader {
-      override def loadResource(resource: FunctionResource): Unit = {
-        resource.resourceType match {
-          case JarResource => sharedState.addJar(resource.uri)
-          case FileResource => sparkContext.addFile(resource.uri)
-          case ArchiveResource =>
-            throw new AnalysisException(
-              "Archive is not allowed to be loaded. If YARN mode is used, " +
-                "please use --archives options while calling spark-submit.")
-        }
-      }
-    }
+    // functionResourceLoader needs to access SessionState.addJar, so it cannot be created
+    // before SessionState itself. Setting `catalog.functionResourceLoader` here is safe since
+    // the caller cannot use the SessionCatalog before this method returns the SessionState.
+    catalog.functionResourceLoader = sessionState.functionResourceLoader
+    sessionState
   }
 
   def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
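The comment added at the end of `apply` describes a classic construction cycle: the catalog needs a resource loader, the loader needs `SessionState.addJar`, and `SessionState` needs the catalog. The commit breaks it with two-phase wiring. A minimal sketch (hypothetical names) of that wiring order:

// Sketch: two-phase wiring that breaks the construction cycle.
trait Loader { def load(uri: String): Unit }

class FnCatalog {
  @volatile var loader: Loader = _ // wired up after construction
}

class StateSketch(val catalog: FnCatalog) {
  def addJar(path: String): Unit = println(s"adding jar $path")
  val loader: Loader = (uri: String) => addJar(uri) // needs addJar, so it lives here
}

object WiringDemo extends App {
  val catalog = new FnCatalog          // phase 1: loader not yet available
  val state = new StateSketch(catalog) // phase 2: the state can now provide one
  catalog.loader = state.loader        // patched in before any caller sees the catalog
  catalog.loader.load("file:/tmp/udf.jar")
}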

sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

Lines changed: 0 additions & 16 deletions

@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.internal
 
-import java.io.File
-
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
@@ -146,20 +144,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
     }
     SparkSession.sqlListener.get()
   }
-
-  def addJar(path: String): Unit = {
-    sparkContext.addJar(path)
-    val uri = new Path(path).toUri
-    val jarURL = if (uri.getScheme == null) {
-      // `path` is a local file path without a URL scheme
-      new File(path).toURI.toURL
-    } else {
-      // `path` is a URL with a scheme
-      uri.toURL
-    }
-    jarClassLoader.addURL(jarURL)
-    Thread.currentThread().setContextClassLoader(jarClassLoader)
-  }
 }
 
 object SharedState {
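With `addJar` now living in `SessionState`, its path handling is the only subtle part: a bare local path is given a `file:` scheme via `java.io.File`, while anything already carrying a scheme is converted directly. A standalone sketch of that branch, with `java.net.URI` standing in for Hadoop's `Path.toUri`:

// Sketch: normalizing a jar path to a URL, one branch per input shape.
import java.io.File
import java.net.{URI, URL}

object JarUrlSketch {
  def toJarUrl(path: String): URL = {
    val uri = new URI(path) // java.net.URI stands in for Hadoop's Path.toUri here
    if (uri.getScheme == null) {
      new File(path).toURI.toURL // bare local path: give it a file: scheme
    } else {
      uri.toURL // already a URL with a scheme: convert directly
    }
  }
}

// JarUrlSketch.toJarUrl("/tmp/udf.jar")        returns file:/tmp/udf.jar
// JarUrlSketch.toJarUrl("file:///tmp/udf.jar") returns file:///tmp/udf.jar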
