
Commit c72f6ac

rxins comments
1 parent 1e271fa commit c72f6ac

6 files changed: +70 -60 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 2 additions & 0 deletions

@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
  */
 class NoSuchTableException extends Exception
 
+class NoSuchDatabaseException extends Exception
+
 /**
  * An interface for looking up relations by name. Used by an [[Analyzer]].
  */
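
The typed exception gives callers something to catch; a minimal sketch of a caller, assuming the ClientInterface.getDatabase contract shown in the next file (the databaseExists helper is hypothetical, not part of this commit):

    import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException

    // Hypothetical helper: getDatabase now throws a typed exception instead of
    // sys.error, so a missing database can be handled without parsing messages.
    def databaseExists(client: ClientInterface, name: String): Boolean =
      try { client.getDatabase(name); true }
      catch { case _: NoSuchDatabaseException => false }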

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala

Lines changed: 5 additions & 5 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
 
 case class HiveDatabase(
     name: String,
@@ -91,7 +91,7 @@ trait ClientInterface {
 
   /** Returns the metadata for specified database, throwing an exception if it doesn't exist */
   def getDatabase(name: String): HiveDatabase = {
-    getDatabaseOption(name).getOrElse(sys.error(s"No such database $name"))
+    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
   }
 
   /** Returns the metadata for a given database, or None if it doesn't exist. */
@@ -112,7 +112,7 @@ trait ClientInterface {
   def alterTable(table: HiveTable): Unit
 
   /** Creates a new database with the given name. */
-  def createDatabase(databaseName: String): Unit
+  def createDatabase(database: HiveDatabase): Unit
 
   /** Returns all partitions for the given table. */
   def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
@@ -121,7 +121,7 @@ trait ClientInterface {
   def loadPartition(
       loadPath: String,
       tableName: String,
-      partSpec: java.util.LinkedHashMap[String, String],
+      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
@@ -138,7 +138,7 @@ trait ClientInterface {
   def loadDynamicPartitions(
       loadPath: String,
       tableName: String,
-      partSpec: java.util.LinkedHashMap[String, String],
+      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
      replace: Boolean,
      numDP: Int,
      holdDDLTime: Boolean,
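
The inline comment on partSpec is the key constraint: Hive reads the spec in the map's iteration order, so the keys must be inserted in partition-column order. A small sketch (the column names ds and hr are assumptions for illustration):

    // Insertion order is preserved by LinkedHashMap, matching (ds, hr).
    val partSpec = new java.util.LinkedHashMap[String, String]
    partSpec.put("ds", "2015-04-01") // first partition column
    partSpec.put("hr", "12")         // second partition column
    // A plain java.util.HashMap gives no iteration-order guarantee, so the
    // columns could reach Hive out of order.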

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 36 additions & 33 deletions

@@ -27,8 +27,10 @@ import scala.language.reflectiveCalls
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.metastore.api.Database
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata._
+import org.apache.hadoop.hive.ql.metadata
+import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.Driver
@@ -65,13 +67,6 @@ class ClientWrapper(
     conf.set(k, v)
   }
 
-  private def properties = Seq(
-    "javax.jdo.option.ConnectionURL",
-    "javax.jdo.option.ConnectionDriverName",
-    "javax.jdo.option.ConnectionUserName")
-
-  properties.foreach(p => logInfo(s"Hive Configuration: $p = ${conf.get(p)}"))
-
   // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
   private val outputBuffer = new java.io.OutputStream {
     var pos: Int = 0
@@ -117,7 +112,10 @@ class ClientWrapper(
 
   private val client = Hive.get(conf)
 
-  private def withClassLoader[A](f: => A): A = synchronized {
+  /**
+   * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
+   */
+  private def withHiveState[A](f: => A): A = synchronized {
     val original = Thread.currentThread().getContextClassLoader
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
     Hive.set(client)
@@ -135,25 +133,32 @@ class ClientWrapper(
     ret
   }
 
-  def currentDatabase: String = withClassLoader {
+  override def currentDatabase: String = withHiveState {
     state.getCurrentDatabase
   }
 
-  def createDatabase(tableName: String): Unit = withClassLoader {
-    val table = new Table("default", tableName)
+  override def createDatabase(database: HiveDatabase): Unit = withHiveState {
     client.createDatabase(
-      new Database("default", "", new File("").toURI.toString, new java.util.HashMap), true)
+      new Database(
+        database.name,
+        "",
+        new File(database.location).toURI.toString,
+        new java.util.HashMap),
+        true)
   }
 
-  def getDatabaseOption(name: String): Option[HiveDatabase] = withClassLoader {
+  override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
     Option(client.getDatabase(name)).map { d =>
       HiveDatabase(
         name = d.getName,
         location = d.getLocationUri)
     }
   }
 
-  def getTableOption(dbName: String, tableName: String): Option[HiveTable] = withClassLoader {
+  override def getTableOption(
+      dbName: String,
+      tableName: String): Option[HiveTable] = withHiveState {
+
     logDebug(s"Looking up $dbName.$tableName")
 
     val hiveTable = Option(client.getTable(dbName, tableName, false))
@@ -185,8 +190,8 @@ class ClientWrapper(
     Class.forName(name)
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
-  private def toQlTable(table: HiveTable): Table = {
-    val qlTable = new Table(table.database, table.name)
+  private def toQlTable(table: HiveTable): metadata.Table = {
+    val qlTable = new metadata.Table(table.database, table.name)
 
     qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
     qlTable.setPartCols(
@@ -208,25 +213,23 @@ class ClientWrapper(
     qlTable
   }
 
-  def createTable(table: HiveTable): Unit = withClassLoader {
+  override def createTable(table: HiveTable): Unit = withHiveState {
     val qlTable = toQlTable(table)
     client.createTable(qlTable)
   }
 
-  def alterTable(table: HiveTable): Unit = withClassLoader {
+  override def alterTable(table: HiveTable): Unit = withHiveState {
     val qlTable = toQlTable(table)
     client.alterTable(table.qualifiedName, qlTable)
   }
 
-  def getTables(dbName: String): Seq[String] = withClassLoader {
-    client.getAllTables(dbName).toSeq
-  }
-
-  def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withClassLoader {
+  override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
     val qlTable = toQlTable(hTable)
     val qlPartitions = version match {
-      case hive.v12 => client.call[Table, Set[Partition]]("getAllPartitionsForPruner", qlTable)
-      case hive.v13 => client.call[Table, Set[Partition]]("getAllPartitionsOf", qlTable)
+      case hive.v12 =>
+        client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsForPruner", qlTable)
+      case hive.v13 =>
+        client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsOf", qlTable)
     }
     qlPartitions.map(_.getTPartition).map { p =>
       HivePartition(
@@ -239,14 +242,14 @@ class ClientWrapper(
     }.toSeq
   }
 
-  def listTables(dbName: String): Seq[String] = withClassLoader {
+  override def listTables(dbName: String): Seq[String] = withHiveState {
     client.getAllTables
   }
 
   /**
    * Runs the specified SQL query using Hive.
   */
-  def runSqlHive(sql: String): Seq[String] = {
+  override def runSqlHive(sql: String): Seq[String] = {
     val maxResults = 100000
     val results = runHive(sql, maxResults)
     // It is very confusing when you only get back some of the results...
@@ -258,7 +261,7 @@ class ClientWrapper(
    * Execute the command using Hive and return the results as a sequence. Each element
    * in the sequence is one row.
    */
-  protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withClassLoader {
+  protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
     logDebug(s"Running hiveql '$cmd'")
     if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") }
     try {
@@ -331,7 +334,7 @@ class ClientWrapper(
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = withClassLoader {
+      isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
 
     client.loadPartition(
       new Path(loadPath), // TODO: Use URI
@@ -347,7 +350,7 @@ class ClientWrapper(
       loadPath: String, // TODO URI
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit = withClassLoader {
+      holdDDLTime: Boolean): Unit = withHiveState {
     client.loadTable(
       new Path(loadPath),
       tableName,
@@ -362,7 +365,7 @@ class ClientWrapper(
       replace: Boolean,
      numDP: Int,
       holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = withClassLoader {
+      listBucketingEnabled: Boolean): Unit = withHiveState {
     client.loadDynamicPartitions(
       new Path(loadPath),
       tableName,
@@ -373,7 +376,7 @@ class ClientWrapper(
       listBucketingEnabled)
   }
 
-  def reset(): Unit = withClassLoader {
+  def reset(): Unit = withHiveState {
     client.getAllTables("default").foreach { t =>
       logDebug(s"Deleting table $t")
       val table = client.getTable("default", t)
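
withHiveState is a loan pattern: swap in the isolated classloader and the ThreadLocal Hive state, run `f`, then restore the original. A standalone sketch of the classloader half of that idea (the helper name is illustrative, not part of this commit):

    def withContextClassLoader[A](loader: ClassLoader)(f: => A): A = {
      val original = Thread.currentThread().getContextClassLoader
      Thread.currentThread().setContextClassLoader(loader)
      // finally guarantees the original loader is restored even if f throws.
      try f finally Thread.currentThread().setContextClassLoader(original)
    }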

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 11 additions & 11 deletions

@@ -37,23 +37,23 @@ object IsolatedClientLoader {
    * Creates isolated Hive client loaders by downloading the requested version from maven.
    */
   def forVersion(
-      version: Int,
+      version: String,
       config: Map[String, String] = Map.empty): IsolatedClientLoader = synchronized {
-    val files = resolvedVersions.getOrElseUpdate(version, downloadVersion(version))
+    val resolvedVersion = hiveVersion(version)
+    val files = resolvedVersions.getOrElseUpdate(resolvedVersion, downloadVersion(resolvedVersion))
     new IsolatedClientLoader(hiveVersion(version), files, config)
   }
 
-  def hiveVersion(version: Int): HiveVersion = version match {
-    case 12 => hive.v12
-    case 13 => hive.v13
+  def hiveVersion(version: String): HiveVersion = version match {
+    case "12" | "0.12" | "0.12.0" => hive.v12
+    case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
   }
 
-  private def downloadVersion(version: Int): Seq[File] = {
-    val v = hiveVersion(version).fullVersion
+  private def downloadVersion(version: HiveVersion): Seq[File] = {
     val hiveArtifacts =
       (Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") ++
-        (if (version <= 10) "hive-builtins" :: Nil else Nil))
-        .map(a => s"org.apache.hive:$a:$v") :+
+        (if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
+        .map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
       "com.google.guava:guava:14.0.1" :+
       "org.apache.hadoop:hadoop-client:2.4.0" :+
       "mysql:mysql-connector-java:5.1.12"
@@ -75,7 +75,7 @@ object IsolatedClientLoader {
     tempDir.listFiles()
   }
 
-  private def resolvedVersions = new scala.collection.mutable.HashMap[Int, Seq[File]]
+  private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[File]]
 }
 
 /**
@@ -136,7 +136,7 @@ class IsolatedClientLoader(
   protected val classLoader: ClassLoader = new URLClassLoader(allJars, rootClassLoader) {
     override def loadClass(name: String, resolve: Boolean): Class[_] = {
       val loaded = findLoadedClass(name)
-      if(loaded == null) doLoadClass(name, resolve) else loaded
+      if (loaded == null) doLoadClass(name, resolve) else loaded
     }
 
     def doLoadClass(name: String, resolve: Boolean): Class[_] = {
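
Taking the version as a String lets callers pass either the shorthand or a full release number; per the match arms above, these all resolve to the same case objects:

    IsolatedClientLoader.hiveVersion("13")     // hive.v13
    IsolatedClientLoader.hiveVersion("0.13")   // hive.v13
    IsolatedClientLoader.hiveVersion("0.13.1") // hive.v13
    IsolatedClientLoader.hiveVersion("12")     // hive.v12

Note the match has no catch-all arm, so an unrecognized string fails with a MatchError.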

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala

Lines changed: 6 additions & 3 deletions

@@ -19,12 +19,15 @@ package org.apache.spark.sql.hive
 
 /** Support for interacting with different versions of the HiveMetastoreClient */
 package object client {
-  private[client] abstract class HiveVersion(val fullVersion: String)
+  private[client] abstract class HiveVersion(val fullVersion: String, val hasBuiltinsJar: Boolean)
 
   // scalastyle:off
   private[client] object hive {
-    case object v12 extends HiveVersion("0.12.0")
-    case object v13 extends HiveVersion("0.13.1")
+    case object v10 extends HiveVersion("0.10.0", true)
+    case object v11 extends HiveVersion("0.11.0", false)
+    case object v12 extends HiveVersion("0.12.0", false)
+    case object v13 extends HiveVersion("0.13.1", false)
   }
   // scalastyle:on
+
 }
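
The hasBuiltinsJar flag replaces the numeric check (version <= 10) that downloadVersion used before this commit: of the case objects above, only v10 requests the extra hive-builtins artifact. A one-line sketch of how the flag is consumed (mirroring downloadVersion):

    val extraArtifacts = if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil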

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 10 additions & 8 deletions

@@ -25,7 +25,7 @@ import org.scalatest.FunSuite
 class VersionsSuite extends FunSuite with Logging {
   val testType = "derby"
 
-  private def buildConf(version: Int) = {
+  private def buildConf() = {
     lazy val warehousePath = Utils.createTempDir()
     lazy val metastorePath = Utils.createTempDir()
     metastorePath.delete()
@@ -35,8 +35,9 @@ class VersionsSuite extends FunSuite with Logging {
   }
 
   test("success sanity check") {
-    val badClient = IsolatedClientLoader.forVersion(13, buildConf(13)).client
-    badClient.createDatabase("default")
+    val badClient = IsolatedClientLoader.forVersion("13", buildConf()).client
+    val db = new HiveDatabase("default", "")
+    badClient.createDatabase(db)
   }
 
   private def getNestedMessages(e: Throwable): String = {
@@ -55,24 +56,25 @@ class VersionsSuite extends FunSuite with Logging {
   // TODO: currently only works on mysql where we manually create the schema...
   ignore("failure sanity check") {
     val e = intercept[Throwable] {
-      val badClient = quietly { IsolatedClientLoader.forVersion(13, buildConf(12)).client }
+      val badClient = quietly { IsolatedClientLoader.forVersion("13", buildConf()).client }
     }
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
 
-  private val versions = Seq(12, 13)
+  private val versions = Seq("12", "13")
 
   private var client: ClientInterface = null
 
   versions.foreach { version =>
     test(s"$version: listTables") {
       client = null
-      client = IsolatedClientLoader.forVersion(version, buildConf(version)).client
+      client = IsolatedClientLoader.forVersion(version, buildConf()).client
       client.listTables("default")
     }
 
     test(s"$version: createDatabase") {
-      client.createDatabase("default")
+      val db = HiveDatabase("default", "")
+      client.createDatabase(db)
     }
 
     test(s"$version: createTable") {
@@ -85,7 +87,7 @@ class VersionsSuite extends FunSuite with Logging {
       properties = Map.empty,
       serdeProperties = Map.empty,
       tableType = ManagedTable,
-      location = Some("/user/hive/src"),
+      location = None,
       inputFormat =
         Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
       outputFormat =
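
Since forVersion now takes a String, the suite's version list could equally use full release numbers; a hedged example of one more round trip (not part of this commit):

    val client = IsolatedClientLoader.forVersion("0.13.1", buildConf()).client
    client.listTables("default")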
