5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -75,6 +75,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

sparkContext.getConf.getAll.foreach {
case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
case _ =>
}

/**
* :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
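For reference, the foreach over sparkContext.getConf.getAll added above copies every spark.sql.* entry from the SparkConf into the SQLConf as the SQLContext is constructed. A minimal sketch of the resulting behavior, assuming the Spark 1.x API used in this patch (spark.sql.shuffle.partitions is just one example of a spark.sql.* key):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ConfPropagationDemo {
  def main(args: Array[String]): Unit = {
    // Any spark.sql.* key set on the SparkConf...
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("ConfPropagationDemo")
      .set("spark.sql.shuffle.partitions", "10")
    val sc = new SparkContext(conf)

    // ...is visible through the SQLContext's configuration right after construction.
    val sqlContext = new SQLContext(sc)
    println(sqlContext.getConf("spark.sql.shuffle.partitions", "<undefined>"))  // prints 10

    sc.stop()
  }
}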
@@ -48,43 +48,35 @@ case class SetCommand(
extends LeafNode with Command with Logging {

override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
// Configures the deprecated "mapred.reduce.tasks" property.
case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), Some(v)) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
} else {
context.setConf(k, v)
Array(s"$k=$v")
}

// Query the value bound to key k.
case (Some(k), _) =>
// TODO (lian) This is just a workaround to make the Simba ODBC driver work.
// Should remove this once we get the ODBC driver updated.
if (k == "-v") {
val hiveJars = Seq(
"hive-exec-0.12.0.jar",
"hive-service-0.12.0.jar",
"hive-common-0.12.0.jar",
"hive-hwi-0.12.0.jar",
"hive-0.12.0.jar").mkString(":")

Array(
"system:java.class.path=" + hiveJars,
"system:sun.java.command=shark.SharkServer2")
}
else {
Array(s"$k=${context.getConf(k, "<undefined>")}")
}

// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAllConfs.map { case (k, v) =>
s"$k=$v"
}.toSeq
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")

// Configures a single property.
case (Some(k), Some(v)) =>
context.setConf(k, v)
Seq(s"$k=$v")

// Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
// from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
// while "SET -v" returns all properties.)
case (Some("-v") | None, None) =>
context.getAllConfs.map { case (k, v) => s"$k=$v" }.toSeq

// Queries the deprecated "mapred.reduce.tasks" property.
case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), None) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")

// Queries a single property.
case (Some(k), None) =>
Seq(s"$k=${context.getConf(k, "<undefined>")}")

case _ =>
throw new IllegalArgumentException()
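To summarize the refactored (key, value) match above, here is a sketch of how the SET variants behave through the SQL interface, assuming the Spark 1.x SQLContext API (not part of this patch; output formatting is approximate):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SetCommandDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "SetCommandDemo", new SparkConf())
    val sqlContext = new SQLContext(sc)

    // Setting a property returns the resulting key=value pair.
    sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect().foreach(println)

    // The deprecated Hive key is rewritten to spark.sql.shuffle.partitions (with a warning).
    sqlContext.sql("SET mapred.reduce.tasks=20").collect().foreach(println)

    // Querying a single property returns key=value, or key=<undefined> if it is unset.
    sqlContext.sql("SET spark.sql.shuffle.partitions").collect().foreach(println)

    // "SET" and "SET -v" both list everything currently set in SQLConf
    // (unlike Hive, where "SET -v" also lists unchanged properties).
    sqlContext.sql("SET").collect().foreach(println)
    sqlContext.sql("SET -v").collect().foreach(println)

    sc.stop()
  }
}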
@@ -17,9 +17,18 @@

package org.apache.spark.sql.test

import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/** A SQLContext that can be used for local testing. */
object TestSQLContext
extends SQLContext(new SparkContext("local", "TestSQLContext", new SparkConf()))
extends SQLContext(
new SparkContext(
"local[2]",
"TestSQLContext",
new SparkConf().set("spark.sql.testkey", "true"))) {

/** Fewer partitions to speed up testing. */
override private[spark] def numShufflePartitions: Int =
getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
}
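The numShufflePartitions override above defaults test runs to 5 shuffle partitions unless spark.sql.shuffle.partitions is set explicitly. Outside of TestSQLContext, a comparable speed-up can be had by setting the property directly; a sketch under the same 1.x API, not part of this patch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object FastSqlTests {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "FastSqlTests", new SparkConf())
    val sqlContext = new SQLContext(sc)

    // Fewer shuffle partitions keep small test queries from spawning the default 200 tasks.
    sqlContext.setConf("spark.sql.shuffle.partitions", "5")

    sc.stop()
  }
}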
11 changes: 10 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -17,16 +17,25 @@

package org.apache.spark.sql

import org.scalatest.FunSuiteLike

import org.apache.spark.sql.test._

/* Implicits */
import TestSQLContext._

class SQLConfSuite extends QueryTest {
class SQLConfSuite extends QueryTest with FunSuiteLike {

val testKey = "test.key.0"
val testVal = "test.val.0"

test("propagate from spark conf") {
// We create a new context here to avoid order dependence with other tests that might call
// clear().
val newContext = new SQLContext(TestSQLContext.sparkContext)
assert(newContext.getConf("spark.sql.testkey", "false") == "true")
}

test("programmatic ways of basic setting and getting") {
clear()
assert(getAllConfs.size === 0)
@@ -63,8 +63,7 @@ case class AllDataTypes(
doubleField: Double,
shortField: Short,
byteField: Byte,
booleanField: Boolean,
binaryField: Array[Byte])
booleanField: Boolean)

case class AllDataTypesWithNonPrimitiveType(
stringField: String,
Expand All @@ -75,13 +74,14 @@ case class AllDataTypesWithNonPrimitiveType(
shortField: Short,
byteField: Byte,
booleanField: Boolean,
binaryField: Array[Byte],
array: Seq[Int],
arrayContainsNull: Seq[Option[Int]],
map: Map[Int, Long],
mapValueContainsNull: Map[Int, Option[Long]],
data: Data)

case class BinaryData(binaryData: Array[Byte])

class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
TestData // Load test data tables.

@@ -117,26 +117,26 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
test("Read/Write All Types") {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
val range = (0 to 255)
TestSQLContext.sparkContext.parallelize(range)
.map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
(0 to x).map(_.toByte).toArray))
.saveAsParquetFile(tempDir)
val result = parquetFile(tempDir).collect()
range.foreach {
i =>
assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
assert(result(i).getInt(1) === i)
assert(result(i).getLong(2) === i.toLong)
assert(result(i).getFloat(3) === i.toFloat)
assert(result(i).getDouble(4) === i.toDouble)
assert(result(i).getShort(5) === i.toShort)
assert(result(i).getByte(6) === i.toByte)
assert(result(i).getBoolean(7) === (i % 2 == 0))
assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
}
val data = sparkContext.parallelize(range)
.map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))

data.saveAsParquetFile(tempDir)

checkAnswer(
parquetFile(tempDir),
data.toSchemaRDD.collect().toSeq)
}

test("Treat binary as string") {
test("read/write binary data") {
// Since equality for Array[Byte] is broken we test this separately.
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).saveAsParquetFile(tempDir)
parquetFile(tempDir)
.map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8"))
.collect().toSeq == Seq("test")
}
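The "equality for Array[Byte] is broken" remark refers to JVM array semantics: Scala arrays are Java arrays, so == compares references, and two byte arrays with identical contents are not equal. A small illustration (not part of this patch):

object ArrayEqualityDemo {
  def main(args: Array[String]): Unit = {
    val a = "test".getBytes("utf8")
    val b = "test".getBytes("utf8")
    println(a == b)                         // false: reference equality on arrays
    println(a.sameElements(b))              // true: element-wise comparison
    println(java.util.Arrays.equals(a, b))  // true: element-wise comparison
  }
}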

ignore("Treat binary as string") {
val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString

// Create the test file.
@@ -151,37 +151,16 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
StructField("c2", BinaryType, false) :: Nil)
val schemaRDD1 = applySchema(rowRDD, schema)
schemaRDD1.saveAsParquetFile(path)
val resultWithBinary = parquetFile(path).collect
range.foreach {
i =>
assert(resultWithBinary(i).getInt(0) === i)
assert(resultWithBinary(i)(1) === s"val_$i".getBytes)
}

TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
// This ParquetRelation always use Parquet types to derive output.
val parquetRelation = new ParquetRelation(
path.toString,
Some(TestSQLContext.sparkContext.hadoopConfiguration),
TestSQLContext) {
override val output =
ParquetTypesConverter.convertToAttributes(
ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema,
TestSQLContext.isParquetBinaryAsString)
}
val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation)
val resultWithString = schemaRDD.collect
range.foreach {
i =>
assert(resultWithString(i).getInt(0) === i)
assert(resultWithString(i)(1) === s"val_$i")
}
checkAnswer(
parquetFile(path).select('c1, 'c2.cast(StringType)),
schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)

schemaRDD.registerTempTable("tmp")
setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
parquetFile(path).printSchema()
checkAnswer(
sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"),
(5, "val_5") ::
(7, "val_7") :: Nil)
parquetFile(path),
schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)


// Set it back.
TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
@@ -284,34 +263,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
test("Read/Write All Types with non-primitive type") {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
val range = (0 to 255)
TestSQLContext.sparkContext.parallelize(range)
val data = sparkContext.parallelize(range)
.map(x => AllDataTypesWithNonPrimitiveType(
s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
(0 to x).map(_.toByte).toArray,
(0 until x),
(0 until x).map(Option(_).filter(_ % 3 == 0)),
(0 until x).map(i => i -> i.toLong).toMap,
(0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
Data((0 until x), Nested(x, s"$x"))))
.saveAsParquetFile(tempDir)
val result = parquetFile(tempDir).collect()
range.foreach {
i =>
assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
assert(result(i).getInt(1) === i)
assert(result(i).getLong(2) === i.toLong)
assert(result(i).getFloat(3) === i.toFloat)
assert(result(i).getDouble(4) === i.toDouble)
assert(result(i).getShort(5) === i.toShort)
assert(result(i).getByte(6) === i.toByte)
assert(result(i).getBoolean(7) === (i % 2 == 0))
assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
assert(result(i)(9) === (0 until i))
assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null))
assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap)
assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null))
assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
}
data.saveAsParquetFile(tempDir)

checkAnswer(
parquetFile(tempDir),
data.toSchemaRDD.collect().toSeq)
}

test("self-join parquet files") {
@@ -408,23 +372,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
}
}

test("Saving case class RDD table to file and reading it back in") {
val file = getTempFilePath("parquet")
val path = file.toString
val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
.map(i => TestRDDEntry(i, s"val_$i"))
rdd.saveAsParquetFile(path)
val readFile = parquetFile(path)
readFile.registerTempTable("tmpx")
val rdd_copy = sql("SELECT * FROM tmpx").collect()
val rdd_orig = rdd.collect()
for(i <- 0 to 99) {
assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
}
Utils.deleteRecursively(file)
}

test("Read a parquet file instead of a directory") {
val file = getTempFilePath("parquet")
val path = file.toString
@@ -457,32 +404,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
val rdd_copy1 = sql("SELECT * FROM dest").collect()
assert(rdd_copy1.size === 100)
assert(rdd_copy1(0).apply(0) === 1)
assert(rdd_copy1(0).apply(1) === "val_1")
// TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
// executed twice otherwise?!

sql("INSERT INTO dest SELECT * FROM source")
val rdd_copy2 = sql("SELECT * FROM dest").collect()
val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
assert(rdd_copy2.size === 200)
assert(rdd_copy2(0).apply(0) === 1)
assert(rdd_copy2(0).apply(1) === "val_1")
assert(rdd_copy2(99).apply(0) === 100)
assert(rdd_copy2(99).apply(1) === "val_100")
assert(rdd_copy2(100).apply(0) === 1)
assert(rdd_copy2(100).apply(1) === "val_1")
Utils.deleteRecursively(dirname)
}

test("Insert (appending) to same table via Scala API") {
// TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
// executed twice otherwise?!
sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
assert(double_rdd != null)
assert(double_rdd.size === 30)
for(i <- (0 to 14)) {
assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} to not match")
}

// let's restore the original test data
Utils.deleteRecursively(ParquetTestData.testDir)
ParquetTestData.writeFile()
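For orientation, the Parquet tests above all follow the Spark 1.x SchemaRDD workflow of saveAsParquetFile and parquetFile. A minimal self-contained round trip under that API looks roughly like this (a sketch, not part of this patch; Record and the temporary path are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Record(key: Int, value: String)

object ParquetRoundTrip {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "ParquetRoundTrip", new SparkConf())
    val sqlContext = new SQLContext(sc)
    import sqlContext._  // brings createSchemaRDD and parquetFile into scope

    val path = java.nio.file.Files.createTempDirectory("parquetDemo").resolve("data").toString
    val data = sc.parallelize(1 to 100).map(i => Record(i, s"val_$i"))
    data.saveAsParquetFile(path)      // implicit conversion to SchemaRDD, then write
    val readBack = parquetFile(path)  // read the files back as a SchemaRDD
    assert(readBack.count() == 100)

    sc.stop()
  }
}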
@@ -17,11 +17,8 @@

package org.apache.spark.sql.hive.thriftserver

import scala.collection.JavaConversions._

import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}

@@ -38,24 +35,12 @@ private[hive] object HiveThriftServer2 extends Logging {

def main(args: Array[String]) {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")

if (!optionsProcessor.process(args)) {
System.exit(-1)
}

val ss = new SessionState(new HiveConf(classOf[SessionState]))

// Set all properties specified via command line.
val hiveConf: HiveConf = ss.getConf
hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
logDebug(s"HiveConf var: $k=$v")
}

SessionState.start(ss)

logInfo("Starting SparkContext")
SparkSQLEnv.init()
SessionState.start(ss)

Runtime.getRuntime.addShutdownHook(
new Thread() {
@@ -67,7 +52,7 @@ private[hive] object HiveThriftServer2 extends Logging {

try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
server.init(hiveConf)
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
} catch {