Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.commons.io.{FileUtils, IOUtils}

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.util.Utils
import org.apache.spark.util.{MutableURLClassLoader, Utils}

import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext
Expand Down Expand Up @@ -148,39 +148,51 @@ private[hive] class IsolatedClientLoader(
protected def classToPath(name: String): String =
name.replaceAll("\\.", "/") + ".class"

/** The classloader that is used to load an isolated version of Hive. */
private[hive] var classLoader: ClassLoader = if (isolationOn) {
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader.
logDebug(s"shared class: $name")
baseClassLoader.loadClass(name)
/**
* The classloader that is used to load an isolated version of Hive.
* This classloader is a special URLClassLoader that exposes the addURL method.
* So, when we add jar, we can add this new jar directly through the addURL method
* instead of stacking a new URLClassLoader on top of it.
*/
private[hive] val classLoader: MutableURLClassLoader = {
val isolatedClassLoader =
if (isolationOn) {
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader.
logDebug(s"shared class: $name")
baseClassLoader.loadClass(name)
}
}
}
} else {
baseClassLoader
}
}
} else {
baseClassLoader
// Right now, we create a URLClassLoader that gives preference to isolatedClassLoader
// over its own URLs when it loads classes and resources.
// We may want to use ChildFirstURLClassLoader based on
// the configuration of spark.executor.userClassPathFirst, which gives preference
// to its own URLs over the parent class loader (see Executor's createClassLoader method).
new NonClosableMutableURLClassLoader(isolatedClassLoader)
}

private[hive] def addJar(path: String): Unit = synchronized {
val jarURL = new java.io.File(path).toURI.toURL
// TODO: we should avoid of stacking classloaders (use a single URLClassLoader and add jars
// to that)
classLoader = new java.net.URLClassLoader(Array(jarURL), classLoader)
classLoader.addURL(jarURL)
}

/** The isolated client interface to Hive. */
Expand Down Expand Up @@ -221,3 +233,14 @@ private[hive] class IsolatedClientLoader(
*/
private[hive] var cachedHive: Any = null
}

/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
* This class loader cannot be closed (its `close` method is a no-op).
*/
private[sql] class NonClosableMutableURLClassLoader(
parent: ClassLoader)
extends MutableURLClassLoader(Array.empty, parent) {

override def close(): Unit = {}
}