Skip to content

Commit 3c0c2d0

Browse files
Jose Torres authored and tdas committed
[SPARK-21765] Set isStreaming on leaf nodes for streaming plans.
## What changes were proposed in this pull request?

All streaming logical plans will now have isStreaming set. This involved adding isStreaming as a case class arg in a few cases, since a node might be logically streaming depending on where it came from.

## How was this patch tested?

Existing unit tests - no functional change is intended in this PR.

Author: Jose Torres <[email protected]>
Author: Tathagata Das <[email protected]>

Closes #18973 from joseph-torres/SPARK-21765.
1 parent 41bb1dd commit 3c0c2d0

File tree

46 files changed

+180
-97
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

46 files changed

+180
-97
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -310,7 +310,7 @@ private[kafka010] class KafkaSource(
310310
currentPartitionOffsets = Some(untilPartitionOffsets)
311311
}
312312

313-
sqlContext.internalCreateDataFrame(rdd, schema)
313+
sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
314314
}
315315

316316
/** Stop this source and free any resources it has allocated. */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1175,14 +1175,14 @@ object DecimalAggregates extends Rule[LogicalPlan] {
11751175
*/
11761176
object ConvertToLocalRelation extends Rule[LogicalPlan] {
11771177
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1178-
case Project(projectList, LocalRelation(output, data))
1178+
case Project(projectList, LocalRelation(output, data, isStreaming))
11791179
if !projectList.exists(hasUnevaluableExpr) =>
11801180
val projection = new InterpretedProjection(projectList, output)
11811181
projection.initialize(0)
1182-
LocalRelation(projectList.map(_.toAttribute), data.map(projection))
1182+
LocalRelation(projectList.map(_.toAttribute), data.map(projection), isStreaming)
11831183

1184-
case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
1185-
LocalRelation(output, data.take(limit))
1184+
case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
1185+
LocalRelation(output, data.take(limit), isStreaming)
11861186
}
11871187

11881188
private def hasUnevaluableExpr(expr: Expression): Boolean = {
@@ -1207,7 +1207,7 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
12071207
*/
12081208
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
12091209
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
1210-
case Deduplicate(keys, child, streaming) if !streaming =>
1210+
case Deduplicate(keys, child) if !child.isStreaming =>
12111211
val keyExprIds = keys.map(_.exprId)
12121212
val aggCols = child.output.map { attr =>
12131213
if (keyExprIds.contains(attr.exprId)) {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,10 @@ object LocalRelation {
4343
}
4444
}
4545

46-
case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
46+
case class LocalRelation(output: Seq[Attribute],
47+
data: Seq[InternalRow] = Nil,
48+
// Indicates whether this relation has data from a streaming source.
49+
override val isStreaming: Boolean = false)
4750
extends LeafNode with analysis.MultiInstanceRelation {
4851

4952
// A local relation must have resolved output.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ abstract class LogicalPlan
4747
*/
4848
def analyzed: Boolean = _analyzed
4949

50-
/** Returns true if this subtree contains any streaming data sources. */
50+
/** Returns true if this subtree has data from a streaming data source. */
5151
def isStreaming: Boolean = children.exists(_.isStreaming == true)
5252

5353
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -429,9 +429,10 @@ case class Sort(
429429

430430
/** Factory for constructing new `Range` nodes. */
431431
object Range {
432-
def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range = {
432+
def apply(start: Long, end: Long, step: Long,
433+
numSlices: Option[Int], isStreaming: Boolean = false): Range = {
433434
val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
434-
new Range(start, end, step, numSlices, output)
435+
new Range(start, end, step, numSlices, output, isStreaming)
435436
}
436437
def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
437438
Range(start, end, step, Some(numSlices))
@@ -443,7 +444,8 @@ case class Range(
443444
end: Long,
444445
step: Long,
445446
numSlices: Option[Int],
446-
output: Seq[Attribute])
447+
output: Seq[Attribute],
448+
override val isStreaming: Boolean)
447449
extends LeafNode with MultiInstanceRelation {
448450

449451
require(step != 0, s"step ($step) cannot be 0")
@@ -784,8 +786,7 @@ case class OneRowRelation() extends LeafNode {
784786
/** A logical plan for `dropDuplicates`. */
785787
case class Deduplicate(
786788
keys: Seq[Attribute],
787-
child: LogicalPlan,
788-
streaming: Boolean) extends UnaryNode {
789+
child: LogicalPlan) extends UnaryNode {
789790

790791
override def output: Seq[Attribute] = child.output
791792
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -93,7 +93,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
9393
val table = UnresolvedInlineTable(Seq("c1"),
9494
Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
9595
val withTimeZone = ResolveTimeZone(conf).apply(table)
96-
val LocalRelation(output, data) = ResolveInlineTables(conf).apply(withTimeZone)
96+
val LocalRelation(output, data, _) = ResolveInlineTables(conf).apply(withTimeZone)
9797
val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
9898
.withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
9999
assert(output.map(_.dataType) == Seq(TimestampType))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -368,18 +368,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
368368
Aggregate(
369369
Seq(attributeWithWatermark),
370370
aggExprs("c"),
371-
Deduplicate(Seq(att), streamRelation, streaming = true)),
371+
Deduplicate(Seq(att), streamRelation)),
372372
outputMode = Append)
373373

374374
assertNotSupportedInStreamingPlan(
375375
"Deduplicate - Deduplicate on streaming relation after aggregation",
376-
Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation), streaming = true),
376+
Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation)),
377377
outputMode = Complete,
378378
expectedMsgs = Seq("dropDuplicates"))
379379

380380
assertSupportedInStreamingPlan(
381381
"Deduplicate - Deduplicate on batch relation inside a streaming query",
382-
Deduplicate(Seq(att), batchRelation, streaming = false),
382+
Deduplicate(Seq(att), batchRelation),
383383
outputMode = Append
384384
)
385385

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,7 @@ class ReplaceOperatorSuite extends PlanTest {
7979
val input = LocalRelation('a.int, 'b.int)
8080
val attrA = input.output(0)
8181
val attrB = input.output(1)
82-
val query = Deduplicate(Seq(attrA), input, streaming = false) // dropDuplicates("a")
82+
val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a")
8383
val optimized = Optimize.execute(query.analyze)
8484

8585
val correctAnswer =
@@ -95,9 +95,9 @@ class ReplaceOperatorSuite extends PlanTest {
9595
}
9696

9797
test("don't replace streaming Deduplicate") {
98-
val input = LocalRelation('a.int, 'b.int)
98+
val input = LocalRelation(Seq('a.int, 'b.int), isStreaming = true)
9999
val attrA = input.output(0)
100-
val query = Deduplicate(Seq(attrA), input, streaming = true) // dropDuplicates("a")
100+
val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a")
101101
val optimized = Optimize.execute(query.analyze)
102102

103103
comparePlans(optimized, query)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -73,10 +73,8 @@ class LogicalPlanSuite extends SparkFunSuite {
7373

7474
test("isStreaming") {
7575
val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
76-
val incrementalRelation = new LocalRelation(
77-
Seq(AttributeReference("a", IntegerType, nullable = true)())) {
78-
override def isStreaming(): Boolean = true
79-
}
76+
val incrementalRelation = LocalRelation(
77+
Seq(AttributeReference("a", IntegerType, nullable = true)()), isStreaming = true)
8078

8179
case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
8280
override def output: Seq[Attribute] = left.output ++ right.output

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -410,7 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
410410

411411
Dataset.ofRows(
412412
sparkSession,
413-
LogicalRDD(schema.toAttributes, parsed)(sparkSession))
413+
LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
414414
}
415415

416416
/**
@@ -473,7 +473,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
473473

474474
Dataset.ofRows(
475475
sparkSession,
476-
LogicalRDD(schema.toAttributes, parsed)(sparkSession))
476+
LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
477477
}
478478

479479
/**

0 commit comments

Comments
 (0)