
Commit 1e271fa (parent ecc6eb5)

[SPARK-6907][SQL] Isolated client for HiveMetastore

8 files changed: +1078 −7 lines

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion

@@ -701,7 +701,7 @@ object SparkSubmit {
 }
 
 /** Provides utility functions to be used inside SparkSubmit. */
-private[deploy] object SparkSubmitUtils {
+private[spark] object SparkSubmitUtils {
 
   // Exposed for testing
   var printStream = SparkSubmit.printStream
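
Widening SparkSubmitUtils from private[deploy] to private[spark] lets code anywhere under org.apache.spark reuse it; in this commit that means the isolated Hive client can pull metastore jars through the same Ivy machinery that backs --packages. A minimal sketch of the kind of call this enables, assuming resolveMavenCoordinates keeps roughly the shape it was given in SPARK-5341 (the exact signature may differ by version):

  // Hypothetical caller in another Spark module; this compiles only because
  // SparkSubmitUtils is now visible to all of org.apache.spark.
  package org.apache.spark.sql.hive.client

  import org.apache.spark.deploy.SparkSubmitUtils

  object HiveMetastoreJars {
    // Assumed signature: resolveMavenCoordinates(coordinates,
    // remoteRepos: Option[String], ivyPath: Option[String]), returning a
    // comma-separated classpath of the downloaded jars.
    def resolve(hiveVersion: String): String =
      SparkSubmitUtils.resolveMavenCoordinates(
        s"org.apache.hive:hive-metastore:$hiveVersion", None, None)
  }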

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala

Lines changed: 31 additions & 6 deletions

@@ -17,12 +17,31 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
+import java.io._
 
 import org.apache.spark.util.Utils
 
 package object util {
 
+  /** Silences output to stderr or stdout for the duration of f */
+  def quietly[A](f: => A): A = {
+    val origErr = System.err
+    val origOut = System.out
+    try {
+      System.setErr(new PrintStream(new OutputStream {
+        def write(b: Int) = {}
+      }))
+      System.setOut(new PrintStream(new OutputStream {
+        def write(b: Int) = {}
+      }))
+
+      f
+    } finally {
+      System.setErr(origErr)
+      System.setOut(origOut)
+    }
+  }
+
   def fileToString(file: File, encoding: String = "UTF-8"): String = {
     val inStream = new FileInputStream(file)
     val outStream = new ByteArrayOutputStream
@@ -42,10 +61,9 @@ package object util {
     new String(outStream.toByteArray, encoding)
   }
 
-  def resourceToString(
-      resource: String,
-      encoding: String = "UTF-8",
-      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
+  def resourceToBytes(
+      resource: String,
+      classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
     val inStream = classLoader.getResourceAsStream(resource)
     val outStream = new ByteArrayOutputStream
     try {
@@ -61,7 +79,14 @@ package object util {
     finally {
       inStream.close()
     }
-    new String(outStream.toByteArray, encoding)
+    outStream.toByteArray
+  }
+
+  def resourceToString(
+      resource: String,
+      encoding: String = "UTF-8",
+      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
+    new String(resourceToBytes(resource, classLoader), encoding)
   }
 
   def stringToFile(file: File, str: String): File = {
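
The new quietly helper saves the current stderr/stdout, swaps in streams that discard every byte, runs the block, and restores the originals in a finally clause; resourceToBytes carries the binary-safe core that resourceToString now simply decodes. A small self-contained usage sketch of both (the resource names are hypothetical):

  import org.apache.spark.sql.catalyst.util._

  object UtilDemo extends App {
    // Anything printed inside the block is swallowed; the value still escapes.
    val answer = quietly {
      System.err.println("you will not see this")
      42
    }
    assert(answer == 42)

    // Hypothetical classpath resources:
    // val raw: Array[Byte] = resourceToBytes("data/blob.bin")
    // val text: String = resourceToString("data/notes.txt", encoding = "UTF-8")
  }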
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala (new file)

Lines changed: 149 additions & 0 deletions

@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+
+case class HiveDatabase(
+    name: String,
+    location: String)
+
+abstract class TableType { val name: String }
+case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
+case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
+case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
+case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
+
+case class HiveStorageDescriptor(
+    location: String,
+    inputFormat: String,
+    outputFormat: String,
+    serde: String)
+
+case class HivePartition(
+    values: Seq[String],
+    storage: HiveStorageDescriptor)
+
+case class HiveColumn(name: String, hiveType: String, comment: String)
+case class HiveTable(
+    specifiedDatabase: Option[String],
+    name: String,
+    schema: Seq[HiveColumn],
+    partitionColumns: Seq[HiveColumn],
+    properties: Map[String, String],
+    serdeProperties: Map[String, String],
+    tableType: TableType,
+    location: Option[String] = None,
+    inputFormat: Option[String] = None,
+    outputFormat: Option[String] = None,
+    serde: Option[String] = None) {
+
+  @transient
+  private[client] var client: ClientInterface = _
+
+  private[client] def withClient(ci: ClientInterface): this.type = {
+    client = ci
+    this
+  }
+
+  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
+
+  def isPartitioned: Boolean = partitionColumns.nonEmpty
+
+  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
+
+  // Hive does not support backticks when passing names to the client.
+  def qualifiedName: String = s"$database.$name"
+}
+
+/**
+ * An externally visible interface to the Hive client.  This interface is shared across both the
+ * internal and external classloaders for a given version of Hive and thus must expose only
+ * shared classes.
+ */
+trait ClientInterface {
+  /**
+   * Runs a HiveQL command using Hive, returning the results as a list of strings.  Each row will
+   * result in one string.
+   */
+  def runSqlHive(sql: String): Seq[String]
+
+  /** Returns the names of all tables in the given database. */
+  def listTables(dbName: String): Seq[String]
+
+  /** Returns the name of the active database. */
+  def currentDatabase: String
+
+  /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */
+  def getDatabase(name: String): HiveDatabase = {
+    getDatabaseOption(name).getOrElse(sys.error(s"No such database $name"))
+  }
+
+  /** Returns the metadata for a given database, or None if it doesn't exist. */
+  def getDatabaseOption(name: String): Option[HiveDatabase]
+
+  /** Returns the specified table, or throws [[NoSuchTableException]]. */
+  def getTable(dbName: String, tableName: String): HiveTable = {
+    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
+  }
+
+  /** Returns the metadata for the specified table, or None if it doesn't exist. */
+  def getTableOption(dbName: String, tableName: String): Option[HiveTable]
+
+  /** Creates a table with the given metadata. */
+  def createTable(table: HiveTable): Unit
+
+  /** Updates the given table with new metadata. */
+  def alterTable(table: HiveTable): Unit
+
+  /** Creates a new database with the given name. */
+  def createDatabase(databaseName: String): Unit
+
+  /** Returns all partitions for the given table. */
+  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
+
+  /** Loads a static partition into an existing table. */
+  def loadPartition(
+      loadPath: String,
+      tableName: String,
+      partSpec: java.util.LinkedHashMap[String, String],
+      replace: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit
+
+  /** Loads data into an existing table. */
+  def loadTable(
+      loadPath: String, // TODO URI
+      tableName: String,
+      replace: Boolean,
+      holdDDLTime: Boolean): Unit
+
+  /** Loads new dynamic partitions into an existing table. */
+  def loadDynamicPartitions(
+      loadPath: String,
+      tableName: String,
+      partSpec: java.util.LinkedHashMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean,
+      listBucketingEnabled: Boolean): Unit
+
+  /** Used for testing only.  Removes all metadata from this instance of Hive. */
+  def reset(): Unit
+}
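
As the doc comment notes, ClientInterface is the surface shared between Spark's classloader and the isolated Hive classloader, so everything it exposes (HiveDatabase, HiveTable, HivePartition, and friends) is plain Scala with no Hive types. A sketch of how a caller might walk the metastore through it; here `client` stands for whatever concrete implementation the loader hands back:

  import org.apache.spark.sql.hive.client._

  def describeCurrentDatabase(client: ClientInterface): Unit = {
    val db = client.getDatabase(client.currentDatabase)
    println(s"Database ${db.name} at ${db.location}")

    client.listTables(db.name).foreach { name =>
      val table = client.getTable(db.name, name)
      val cols = table.schema.map(c => s"${c.name}: ${c.hiveType}").mkString(", ")
      val parts =
        if (table.isPartitioned) s"${client.getAllPartitions(table).size} partitions"
        else "unpartitioned"
      println(s"  ${table.qualifiedName} ($parts): $cols")
    }
  }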
