
Commit bb0ddc2

liancheng authored and pdeyhim committed
[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements
## Related JIRA issues

- Main issue:
  - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands
- Issues resolved as dependencies:
  - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses
  - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE
  - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to
- Other related issue:
  - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view

    Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to work around this issue.

## PR Overview

This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once. As a positive side effect, DDL statements and commands can now be turned into proper `SchemaRDD`s, letting users query the execution results.

This PR defines schemas for the following DDL/commands:

- EXPLAIN command
  - `plan`: String, the plan explanation
- SET command
  - `key`: String, the key(s) of the property (or properties) being set or queried
  - `value`: String, the value(s) of the property (or properties) being queried
- Other Hive native command
  - `result`: String, execution result returned by Hive

**NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future.

## Examples

### EXPLAIN command

Taking the "EXPLAIN" command as an example, executing the command yields a `SchemaRDD` at the same time; we can then query the `plan` field with the schema DSL:

```
scala> loadTestTable("src")
...

scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")
...
q0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExplainCommandPhysical [plan#11:0]
 Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L]
  Exchange (HashPartitioning [key#4:0], 200)
   Exchange (HashPartitioning [key#4:0], 200)
    Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L]
     HiveTableScan [key#4], (MetastoreRelation default, src, None), None

scala> q0.select('plan).collect()
...
[ExplainCommandPhysical [plan#24:0]
 Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L]
  Exchange (HashPartitioning [key#17:0], 200)
   Exchange (HashPartitioning [key#17:0], 200)
    Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L]
     HiveTableScan [key#17], (MetastoreRelation default, src, None), None]

scala>
```

### SET command

In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL:

```
scala> val q1 = hql("SET")
...
q1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[7] at RDD at SchemaRDD.scala:98
== Query Plan ==
<SET command: executed by Hive, and noted by SQLContext>

scala> q1.registerAsTable("properties")

scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println)
...
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no missing parents
14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1])
...
[datanucleus.autoCreateSchema,true]
[datanucleus.autoStartMechanismMode,checked]
[datanucleus.cache.level2,false]
[datanucleus.cache.level2.type,none]
[datanucleus.connectionPoolingType,BONECP]
[datanucleus.fixedDatastore,false]
[datanucleus.identifierFactory,datanucleus1]
[datanucleus.plugin.pluginRegistryBundleCheck,LOG]
[datanucleus.rdbms.useLegacyNativeValueStrategy,true]
[datanucleus.storeManagerType,rdbms]

scala>
```

### "Exactly once" semantics

Finally, an example of the "exactly once" semantics:

```
scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)")
...
q2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[28] at RDD at SchemaRDD.scala:98
== Query Plan ==
<Native command: executed by Hive>

scala> table("t1")
...
res9: org.apache.spark.sql.SchemaRDD =
SchemaRDD[32] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None

scala> q2.collect()
...
res10: Array[org.apache.spark.sql.Row] = Array([])

scala>
```

As we can see, the "CREATE TABLE" command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicated execution.

Author: Cheng Lian <[email protected]>

Closes apache#1071 from liancheng/exactlyOnceCommand and squashes the following commits:

d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair
f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs
1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs
5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching
48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row]
cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands
74789c1 [Cheng Lian] Fixed failing test cases
0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
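The "exactly once" behavior described above ultimately rests on Scala's `lazy val` semantics: the initializer runs at most once, on first access, and its result is cached thereafter. Below is a minimal, self-contained sketch of that pattern (toy names, not code from this patch) showing why wrapping a side effect in a lazy field makes repeated execution safe:

```scala
// Minimal sketch of the exactly-once pattern behind sideEffectResult:
// the side effect lives in a lazy val, so it runs on first access and never again.
trait ToyCommand {
  // Concrete commands override this with their side effect plus its result rows.
  protected lazy val sideEffectResult: Seq[Any] = Seq.empty
}

class CreateTable(name: String) extends ToyCommand {
  override protected lazy val sideEffectResult: Seq[Any] = {
    println(s"creating table $name") // the side effect, evaluated exactly once
    Seq.empty
  }

  // Every "execution" path goes through the lazy val.
  def execute(): Seq[Any] = sideEffectResult
}

object Demo extends App {
  val cmd = new CreateTable("t1")
  cmd.execute() // prints "creating table t1"
  cmd.execute() // prints nothing: the lazy val is already initialized
}
```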
1 parent fa9a017 commit bb0ddc2

File tree

15 files changed: +251 -167 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

Lines changed: 10 additions & 8 deletions
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
 import org.apache.spark.sql.catalyst.types.StringType
 
 /**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+  def output: Seq[Attribute] = Seq.empty
 }
 
 /**
  * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
  * commands that are passed directly to another system.
  */
-case class NativeCommand(cmd: String) extends Command
+case class NativeCommand(cmd: String) extends Command {
+  override def output =
+    Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+}
 
 /**
  * Commands of the form "SET (key) (= value)".
  */
 case class SetCommand(key: Option[String], value: Option[String]) extends Command {
   override def output = Seq(
-    AttributeReference("key", StringType, nullable = false)(),
-    AttributeReference("value", StringType, nullable = false)()
-  )
+    BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
+    BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
 }
 
 /**
  * Returned by a parser when the users only wants to see what query plan would be executed, without
  * actually performing the execution.
  */
 case class ExplainCommand(plan: LogicalPlan) extends Command {
-  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+  override def output =
+    Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
 }
 
 /**
  * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
  */
 case class CacheCommand(tableName: String, doCache: Boolean) extends Command
-
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {
 
     comparePlans(optimized, correctAnswer)
   }
-  
+
   test("joins: push down left outer join #1") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
```

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 5 additions & 34 deletions
```diff
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def sql(sqlText: String): SchemaRDD = {
-    val result = new SchemaRDD(this, parseSql(sqlText))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
@@ -259,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] val planner = new SparkPlanner
 
   @transient
-  protected[sql] lazy val emptyResult =
-    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+  protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)
 
   /**
    * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -280,35 +272,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
-    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
-      case SetCommand(key, value) =>
-        // Only this case needs to be executed eagerly. The other cases will
-        // be taken care of when the actual results are being extracted.
-        // In the case of HiveContext, sqlConf is overridden to also pass the
-        // pair into its HiveConf.
-        if (key.isDefined && value.isDefined) {
-          set(key.get, value.get)
-        }
-        // It doesn't matter what we return here, since this is only used
-        // to force the evaluation to happen eagerly. To query the results,
-        // one must use SchemaRDD operations to extract them.
-        emptyResult
-      case _ => executedPlan.execute()
-    }
-
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
    lazy val sparkPlan = planner(optimizedPlan).next()
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = {
-      logical match {
-        case s: SetCommand => eagerlyProcess(s)
-        case _ => executedPlan.execute()
-      }
-    }
+    lazy val toRdd: RDD[Row] = executedPlan.execute()
 
     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * TODO: We only support primitive types, add support for nested types.
    */
   private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
-    val schema = rdd.first.map { case (fieldName, obj) =>
+    val schema = rdd.first().map { case (fieldName, obj) =>
       val dataType = obj.getClass match {
         case c: Class[_] if c == classOf[java.lang.String] => StringType
         case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
```

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
 @AlphaComponent
 class SchemaRDD(
     @transient val sqlContext: SQLContext,
-    @transient protected[spark] val logicalPlan: LogicalPlan)
+    @transient val baseLogicalPlan: LogicalPlan)
   extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
 
   def baseSchemaRDD = this
```

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 13 additions & 2 deletions
```diff
@@ -20,13 +20,14 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkLogicalPlan
 
 /**
  * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
  */
 private[sql] trait SchemaRDDLike {
   @transient val sqlContext: SQLContext
-  @transient protected[spark] val logicalPlan: LogicalPlan
+  @transient val baseLogicalPlan: LogicalPlan
 
   private[sql] def baseSchemaRDD: SchemaRDD
 
@@ -48,7 +49,17 @@ private[sql] trait SchemaRDDLike {
    */
   @transient
   @DeveloperApi
-  lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
+
+  @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
+    // For various commands (like DDL) and queries with side effects, we force query optimization to
+    // happen right away to let these side effects take place eagerly.
+    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+      queryExecution.toRdd
+      SparkLogicalPlan(queryExecution.executedPlan)
+    case _ =>
+      baseLogicalPlan
+  }
 
   override def toString =
     s"""${super.toString}
```

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
  */
 class JavaSchemaRDD(
     @transient val sqlContext: SQLContext,
-    @transient protected[spark] val logicalPlan: LogicalPlan)
+    @transient val baseLogicalPlan: LogicalPlan)
   extends JavaRDDLike[Row, JavaRDD[Row]]
   with SchemaRDDLike {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 5 additions & 6 deletions
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLConf, SQLContext, execution}
+import org.apache.spark.sql.{SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
@@ -157,7 +157,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
-      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
+      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
           if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
             (filters: Seq[Expression]) => {
@@ -186,7 +186,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           filters,
           prunePushedDownFilters,
           ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
-      }
 
       case _ => Nil
     }
@@ -250,12 +249,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   case class CommandStrategy(context: SQLContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.SetCommand(key, value) =>
-        Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
+        Seq(execution.SetCommand(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
        val executedPlan = context.executePlan(child).executedPlan
-        Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+        Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
       case logical.CacheCommand(tableName, cache) =>
-        Seq(execution.CacheCommandPhysical(tableName, cache)(context))
+        Seq(execution.CacheCommand(tableName, cache)(context))
       case _ => Nil
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 53 additions & 28 deletions
```diff
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
 
+trait Command {
+  /**
+   * A concrete command should override this lazy field to wrap up any side effects caused by the
+   * command or any other computation that should be evaluated exactly once. The value of this field
+   * can be used as the contents of the corresponding RDD generated from the physical plan of this
+   * command.
+   *
+   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+   * so that the command can be executed eagerly right after the command query is created.
+   */
+  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
-                             (@transient context: SQLContext) extends LeafNode {
-  def execute(): RDD[Row] = (key, value) match {
-    // Set value for key k; the action itself would
-    // have been performed in QueryExecution eagerly.
-    case (Some(k), Some(v)) => context.emptyResult
+case class SetCommand(
+    key: Option[String], value: Option[String], output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+    // Set value for key k.
+    case (Some(k), Some(v)) =>
+      context.set(k, v)
+      Array(k -> v)
+
     // Query the value bound to key k.
-    case (Some(k), None) =>
-      val resultString = context.getOption(k) match {
-        case Some(v) => s"$k=$v"
-        case None => s"$k is undefined"
-      }
-      context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+    case (Some(k), _) =>
+      Array(k -> context.getOption(k).getOrElse("<undefined>"))
+
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      val pairs = context.getAll
-      val rows = pairs.map { case (k, v) =>
-        new GenericRow(Array[Any](s"$k=$v"))
-      }.toSeq
-      // Assume config parameters can fit into one split (machine) ;)
-      context.sparkContext.parallelize(rows, 1)
-    // The only other case is invalid semantics and is impossible.
-    case _ => context.emptyResult
+      context.getAll
+
+    case _ =>
+      throw new IllegalArgumentException()
   }
+
+  def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+    context.sparkContext.parallelize(rows, 1)
+  }
+
+  override def otherCopyArgs = context :: Nil
 }
 
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
-                                 (@transient context: SQLContext) extends UnaryNode {
+case class ExplainCommand(
+    child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends UnaryNode with Command {
+
+  // Actually "EXPLAIN" command doesn't cause any side effect.
+  override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
+
   def execute(): RDD[Row] = {
-    val planString = new GenericRow(Array[Any](child.toString))
-    context.sparkContext.parallelize(Seq(planString))
+    val explanation = sideEffectResult.mkString("\n")
+    context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
   }
 
   override def otherCopyArgs = context :: Nil
@@ -70,19 +94,20 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
-  extends LeafNode {
+case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode with Command {
 
-  lazy val commandSideEffect = {
+  override protected[sql] lazy val sideEffectResult = {
     if (doCache) {
       context.cacheTable(tableName)
     } else {
       context.uncacheTable(tableName)
     }
+    Seq.empty[Any]
  }
 
   override def execute(): RDD[Row] = {
-    commandSideEffect
+    sideEffectResult
     context.emptyResult
   }
 
```
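For illustration, a hypothetical additional physical command written against the `Command` trait from this diff might look like the sketch below. `ToyRefreshCommand`, its output rows, and its side effect are made up; only the `sideEffectResult`/`execute()` wiring mirrors the pattern above, and the snippet assumes the same imports and package as the surrounding file:

```scala
// Hypothetical example of a new physical command built on the Command trait above.
// The side effect is wrapped in sideEffectResult, so it runs exactly once; execute()
// merely replays the cached rows.
case class ToyRefreshCommand(tableName: String, output: Seq[Attribute])(
    @transient context: SQLContext)
  extends LeafNode with Command {

  override protected[sql] lazy val sideEffectResult: Seq[String] = {
    // Imagine some side effect here, e.g. invalidating cached metadata for tableName.
    Seq(s"refreshed $tableName")
  }

  def execute(): RDD[Row] = {
    val rows = sideEffectResult.map(msg => new GenericRow(Array[Any](msg)))
    context.sparkContext.parallelize(rows, 1)
  }

  override def otherCopyArgs = context :: Nil
}
```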