
Commit 64945f8

liancheng authored and marmbrus committed
[SPARK-3971][SQL] Backport apache#2843 to branch-1.1
This PR backports apache#2843 to branch-1.1. The key difference is that this one doesn't support Hive 0.13.1 and thus always returns `0.12.0` when `spark.sql.hive.version` is queried.

Six other commits on which apache#2843 depends were also backported:

- apache#2887 for `SessionState` lifecycle control
- apache#2675, apache#2823 & apache#3060 for major test suite refactoring and bug fixes
- apache#2164 for Parquet test suite updates
- apache#2493 for reading `spark.sql.*` configurations

Author: Cheng Lian <[email protected]>
Author: Cheng Lian <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes apache#3113 from liancheng/get-info-for-1.1 and squashes the following commits:

d354161 [Cheng Lian] Provides Spark and Hive version in HiveThriftServer2 for branch-1.1
0c2a244 [Michael Armbrust] [SPARK-3646][SQL] Copy SQL configuration from SparkConf when a SQLContext is created.
3202a36 [Michael Armbrust] [SQL] Decrease partitions when testing
7f395b7 [Cheng Lian] [SQL] Fixes race condition in CliSuite
0dd28ec [Cheng Lian] [SQL] Fixes the race condition that may cause test failure
5928b39 [Cheng Lian] [SPARK-3809][SQL] Fixes test suites in hive-thriftserver
faeca62 [Cheng Lian] [SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2
1 parent: b3ef06b · commit: 64945f8

File tree

13 files changed: +307 −292 lines changed


sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 5 additions & 0 deletions
@@ -75,6 +75,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }

+  sparkContext.getConf.getAll.foreach {
+    case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
+    case _ =>
+  }
+
   /**
    * :: DeveloperApi ::
    * Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
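
The net effect of this hunk: any `spark.sql.*` entry already present on the SparkConf is copied into SQLConf when the SQLContext is constructed, so such settings placed on the SparkConf are visible through `getConf` without an explicit SET. A minimal sketch of that behavior, assuming a local master; the key `spark.sql.examplekey` is made up purely for illustration and is not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("spark-sql-conf-propagation-sketch")
  .set("spark.sql.examplekey", "some-value")   // copied: key starts with "spark.sql"
  .set("spark.executor.memory", "1g")          // not copied: no "spark.sql" prefix

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// With the hunk above applied, the new context's SQLConf already contains the key.
assert(sqlContext.getConf("spark.sql.examplekey", "<undefined>") == "some-value")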

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 28 additions & 36 deletions
@@ -48,43 +48,35 @@ case class SetCommand(
   extends LeafNode with Command with Logging {

   override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
-    // Set value for key k.
-    case (Some(k), Some(v)) =>
-      if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
-        logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+    // Configures the deprecated "mapred.reduce.tasks" property.
+    case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), Some(v)) =>
+      logWarning(
+        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-        context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
-        Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
-      } else {
-        context.setConf(k, v)
-        Array(s"$k=$v")
-      }
-
-    // Query the value bound to key k.
-    case (Some(k), _) =>
-      // TODO (lian) This is just a workaround to make the Simba ODBC driver work.
-      // Should remove this once we get the ODBC driver updated.
-      if (k == "-v") {
-        val hiveJars = Seq(
-          "hive-exec-0.12.0.jar",
-          "hive-service-0.12.0.jar",
-          "hive-common-0.12.0.jar",
-          "hive-hwi-0.12.0.jar",
-          "hive-0.12.0.jar").mkString(":")
-
-        Array(
-          "system:java.class.path=" + hiveJars,
-          "system:sun.java.command=shark.SharkServer2")
-      }
-      else {
-        Array(s"$k=${context.getConf(k, "<undefined>")}")
-      }
-
-    // Query all key-value pairs that are set in the SQLConf of the context.
-    case (None, None) =>
-      context.getAllConfs.map { case (k, v) =>
-        s"$k=$v"
-      }.toSeq
+      context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
+      Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+
+    // Configures a single property.
+    case (Some(k), Some(v)) =>
+      context.setConf(k, v)
+      Seq(s"$k=$v")
+
+    // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
+    // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
+    // while "SET -v" returns all properties.)
+    case (Some("-v") | None, None) =>
+      context.getAllConfs.map { case (k, v) => s"$k=$v" }.toSeq
+
+    // Queries the deprecated "mapred.reduce.tasks" property.
+    case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), None) =>
+      logWarning(
+        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+          s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+      Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")
+
+    // Queries a single property.
+    case (Some(k), None) =>
+      Seq(s"$k=${context.getConf(k, "<undefined>")}")

     case _ =>
       throw new IllegalArgumentException()
sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala

Lines changed: 11 additions & 2 deletions
@@ -17,9 +17,18 @@

 package org.apache.spark.sql.test

+import org.apache.spark.sql.{SQLConf, SQLContext}
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.SQLContext

 /** A SQLContext that can be used for local testing. */
 object TestSQLContext
-  extends SQLContext(new SparkContext("local", "TestSQLContext", new SparkConf()))
+  extends SQLContext(
+    new SparkContext(
+      "local[2]",
+      "TestSQLContext",
+      new SparkConf().set("spark.sql.testkey", "true"))) {
+
+  /** Fewer partitions to speed up testing. */
+  override private[spark] def numShufflePartitions: Int =
+    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+}

sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala

Lines changed: 10 additions & 1 deletion
@@ -17,16 +17,25 @@

 package org.apache.spark.sql

+import org.scalatest.FunSuiteLike
+
 import org.apache.spark.sql.test._

 /* Implicits */
 import TestSQLContext._

-class SQLConfSuite extends QueryTest {
+class SQLConfSuite extends QueryTest with FunSuiteLike {

   val testKey = "test.key.0"
   val testVal = "test.val.0"

+  test("propagate from spark conf") {
+    // We create a new context here to avoid order dependence with other tests that might call
+    // clear().
+    val newContext = new SQLContext(TestSQLContext.sparkContext)
+    assert(newContext.getConf("spark.sql.testkey", "false") == "true")
+  }
+
   test("programmatic ways of basic setting and getting") {
     clear()
     assert(getAllConfs.size === 0)
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 38 additions & 104 deletions
@@ -63,8 +63,7 @@ case class AllDataTypes(
     doubleField: Double,
     shortField: Short,
     byteField: Byte,
-    booleanField: Boolean,
-    binaryField: Array[Byte])
+    booleanField: Boolean)

 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -75,13 +74,14 @@ case class AllDataTypesWithNonPrimitiveType(
     shortField: Short,
     byteField: Byte,
     booleanField: Boolean,
-    binaryField: Array[Byte],
     array: Seq[Int],
     arrayContainsNull: Seq[Option[Int]],
     map: Map[Int, Long],
     mapValueContainsNull: Map[Int, Option[Long]],
     data: Data)

+case class BinaryData(binaryData: Array[Byte])
+
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   TestData // Load test data tables.

@@ -117,26 +117,26 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   test("Read/Write All Types") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
-    TestSQLContext.sparkContext.parallelize(range)
-      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        (0 to x).map(_.toByte).toArray))
-      .saveAsParquetFile(tempDir)
-    val result = parquetFile(tempDir).collect()
-    range.foreach {
-      i =>
-        assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
-        assert(result(i).getInt(1) === i)
-        assert(result(i).getLong(2) === i.toLong)
-        assert(result(i).getFloat(3) === i.toFloat)
-        assert(result(i).getDouble(4) === i.toDouble)
-        assert(result(i).getShort(5) === i.toShort)
-        assert(result(i).getByte(6) === i.toByte)
-        assert(result(i).getBoolean(7) === (i % 2 == 0))
-        assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
-    }
+    val data = sparkContext.parallelize(range)
+      .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
+
+    data.saveAsParquetFile(tempDir)
+
+    checkAnswer(
+      parquetFile(tempDir),
+      data.toSchemaRDD.collect().toSeq)
   }

-  test("Treat binary as string") {
+  test("read/write binary data") {
+    // Since equality for Array[Byte] is broken we test this separately.
+    val tempDir = getTempFilePath("parquetTest").getCanonicalPath
+    sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).saveAsParquetFile(tempDir)
+    parquetFile(tempDir)
+      .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8"))
+      .collect().toSeq == Seq("test")
+  }
+
+  ignore("Treat binary as string") {
     val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString

     // Create the test file.
@@ -151,37 +151,16 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       StructField("c2", BinaryType, false) :: Nil)
     val schemaRDD1 = applySchema(rowRDD, schema)
     schemaRDD1.saveAsParquetFile(path)
-    val resultWithBinary = parquetFile(path).collect
-    range.foreach {
-      i =>
-        assert(resultWithBinary(i).getInt(0) === i)
-        assert(resultWithBinary(i)(1) === s"val_$i".getBytes)
-    }
-
-    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
-    // This ParquetRelation always use Parquet types to derive output.
-    val parquetRelation = new ParquetRelation(
-      path.toString,
-      Some(TestSQLContext.sparkContext.hadoopConfiguration),
-      TestSQLContext) {
-      override val output =
-        ParquetTypesConverter.convertToAttributes(
-          ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema,
-          TestSQLContext.isParquetBinaryAsString)
-    }
-    val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation)
-    val resultWithString = schemaRDD.collect
-    range.foreach {
-      i =>
-        assert(resultWithString(i).getInt(0) === i)
-        assert(resultWithString(i)(1) === s"val_$i")
-    }
+    checkAnswer(
+      parquetFile(path).select('c1, 'c2.cast(StringType)),
+      schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)

-    schemaRDD.registerTempTable("tmp")
+    setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
+    parquetFile(path).printSchema()
     checkAnswer(
-      sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"),
-      (5, "val_5") ::
-      (7, "val_7") :: Nil)
+      parquetFile(path),
+      schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)
+

     // Set it back.
     TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
@@ -284,34 +263,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
-    TestSQLContext.sparkContext.parallelize(range)
+    val data = sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        (0 to x).map(_.toByte).toArray,
         (0 until x),
         (0 until x).map(Option(_).filter(_ % 3 == 0)),
         (0 until x).map(i => i -> i.toLong).toMap,
         (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
         Data((0 until x), Nested(x, s"$x"))))
-      .saveAsParquetFile(tempDir)
-    val result = parquetFile(tempDir).collect()
-    range.foreach {
-      i =>
-        assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
-        assert(result(i).getInt(1) === i)
-        assert(result(i).getLong(2) === i.toLong)
-        assert(result(i).getFloat(3) === i.toFloat)
-        assert(result(i).getDouble(4) === i.toDouble)
-        assert(result(i).getShort(5) === i.toShort)
-        assert(result(i).getByte(6) === i.toByte)
-        assert(result(i).getBoolean(7) === (i % 2 == 0))
-        assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
-        assert(result(i)(9) === (0 until i))
-        assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null))
-        assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap)
-        assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null))
-        assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
-    }
+    data.saveAsParquetFile(tempDir)
+
+    checkAnswer(
+      parquetFile(tempDir),
+      data.toSchemaRDD.collect().toSeq)
   }

   test("self-join parquet files") {
@@ -408,23 +372,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     }
   }

-  test("Saving case class RDD table to file and reading it back in") {
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-    rdd.saveAsParquetFile(path)
-    val readFile = parquetFile(path)
-    readFile.registerTempTable("tmpx")
-    val rdd_copy = sql("SELECT * FROM tmpx").collect()
-    val rdd_orig = rdd.collect()
-    for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
-    }
-    Utils.deleteRecursively(file)
-  }
-
   test("Read a parquet file instead of a directory") {
     val file = getTempFilePath("parquet")
     val path = file.toString
@@ -457,32 +404,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
     val rdd_copy1 = sql("SELECT * FROM dest").collect()
     assert(rdd_copy1.size === 100)
-    assert(rdd_copy1(0).apply(0) === 1)
-    assert(rdd_copy1(0).apply(1) === "val_1")
-    // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
-    // executed twice otherwise?!
+
     sql("INSERT INTO dest SELECT * FROM source")
-    val rdd_copy2 = sql("SELECT * FROM dest").collect()
+    val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
     assert(rdd_copy2.size === 200)
-    assert(rdd_copy2(0).apply(0) === 1)
-    assert(rdd_copy2(0).apply(1) === "val_1")
-    assert(rdd_copy2(99).apply(0) === 100)
-    assert(rdd_copy2(99).apply(1) === "val_100")
-    assert(rdd_copy2(100).apply(0) === 1)
-    assert(rdd_copy2(100).apply(1) === "val_1")
     Utils.deleteRecursively(dirname)
   }

   test("Insert (appending) to same table via Scala API") {
-    // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
-    // executed twice otherwise?!
     sql("INSERT INTO testsource SELECT * FROM testsource")
     val double_rdd = sql("SELECT * FROM testsource").collect()
     assert(double_rdd != null)
     assert(double_rdd.size === 30)
-    for(i <- (0 to 14)) {
-      assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} to not match")
-    }
+
     // let's restore the original test data
     Utils.deleteRecursively(ParquetTestData.testDir)
     ParquetTestData.writeFile()
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala

Lines changed: 1 addition & 16 deletions
@@ -17,11 +17,8 @@

 package org.apache.spark.sql.hive.thriftserver

-import scala.collection.JavaConversions._
-
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}

@@ -38,24 +35,12 @@ private[hive] object HiveThriftServer2 extends Logging {

   def main(args: Array[String]) {
     val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
-
     if (!optionsProcessor.process(args)) {
       System.exit(-1)
     }

-    val ss = new SessionState(new HiveConf(classOf[SessionState]))
-
-    // Set all properties specified via command line.
-    val hiveConf: HiveConf = ss.getConf
-    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
-      logDebug(s"HiveConf var: $k=$v")
-    }
-
-    SessionState.start(ss)
-
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
-    SessionState.start(ss)

     Runtime.getRuntime.addShutdownHook(
       new Thread() {
@@ -67,7 +52,7 @@ private[hive] object HiveThriftServer2 extends Logging {

     try {
       val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
-      server.init(hiveConf)
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
     } catch {