
Commit b1a732f

lw-lin authored and brkyvz committed
[SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation
## What changes were proposed in this pull request?

Within the same streaming query, when one `StreamingRelation` is referred to multiple times (e.g. `df.union(df)`), we should transform it into only one `StreamingExecutionRelation`, instead of two or more different `StreamingExecutionRelation`s, each of which would have its own source, source log, and so on.

## How was this patch tested?

Added two test cases, each of which would fail without this patch.

Author: Liwei Lin <[email protected]>

Closes #17735 from lw-lin/SPARK-20441.

(cherry picked from commit 27f543b)
Signed-off-by: Burak Yavuz <[email protected]>
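The core of the fix is to memoize the `StreamingRelation` to `StreamingExecutionRelation` rewrite so that a relation referenced twice is rewritten once. Here is a minimal, self-contained Scala sketch of that idea; `Relation`, `ExecRelation`, and `rewrite` are hypothetical stand-ins, not Spark's classes:

```scala
import scala.collection.mutable.{Map => MutableMap}

case class Relation(name: String)
case class ExecRelation(relation: Relation, sourceId: Long)

object MemoizedTransform {
  // Rewrite each leaf, memoizing per Relation so a relation that appears
  // twice in the plan (as in df.union(df)) maps to one ExecRelation.
  def rewrite(leaves: Seq[Relation]): Seq[ExecRelation] = {
    var nextSourceId = 0L
    val cache = MutableMap[Relation, ExecRelation]()
    leaves.map { r =>
      // getOrElseUpdate evaluates its body only on a cache miss, so each
      // relation is materialized (and assigned a source id) exactly once.
      cache.getOrElseUpdate(r, {
        val exec = ExecRelation(r, nextSourceId)
        nextSourceId += 1
        exec
      })
    }
  }

  def main(args: Array[String]): Unit = {
    val r = Relation("fake-source")
    val rewritten = rewrite(Seq(r, r))   // the same relation referenced twice
    assert(rewritten.size == 2)          // still two leaves in the plan...
    assert(rewritten.distinct.size == 1) // ...but only one execution relation
    println(rewritten)
  }
}
```

The actual patch applies this same `getOrElseUpdate` pattern inside `analyzedPlan.transform`, keyed by the matched `StreamingRelation` node itself, as the diff below shows.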
1 parent b5947f5 · commit b1a732f

2 files changed (+60 lines, -8 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 12 additions & 8 deletions
@@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
+import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
@@ -148,15 +149,18 @@ class StreamExecution(
       "logicalPlan must be initialized in StreamExecutionThread " +
         s"but the current thread was ${Thread.currentThread}")
     var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
     val _logicalPlan = analyzedPlan.transform {
-      case StreamingRelation(dataSource, _, output) =>
-        // Materialize source to avoid creating it in every batch
-        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
-        val source = dataSource.createSource(metadataPath)
-        nextSourceId += 1
-        // We still need to use the previous `output` instead of `source.schema` as attributes in
-        // "df.logicalPlan" has already used attributes of the previous `output`.
-        StreamingExecutionRelation(source, output)
+      case streamingRelation @ StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema` as attributes in
+          // "df.logicalPlan" has already used attributes of the previous `output`.
+          StreamingExecutionRelation(source, output)
+        })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
     uniqueSources = sources.distinct
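As a usage-level illustration of what this changes, the sketch below runs a self-union query end to end. It is a hedged sketch, not part of the patch: the `rate` source, local master, and checkpoint path are illustrative assumptions. With the fix, both union branches share one source, so the checkpoint gains a single `sources/0` metadata directory (the `$checkpointRoot/sources/$nextSourceId` layout from the diff above) instead of `sources/0` and `sources/1`.

```scala
import org.apache.spark.sql.SparkSession

object SelfUnionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")               // assumption: local test session
      .appName("self-union-example")
      .getOrCreate()

    // One streaming relation, referenced twice via union.
    val df = spark.readStream.format("rate").load()
    val query = df.union(df).writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/self-union-chk") // hypothetical path
      .start()

    query.awaitTermination(5000) // let a few micro-batches run
    query.stop()
    spark.stop()
  }
}
```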

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 48 additions & 0 deletions
@@ -71,6 +71,27 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+  test("SPARK-20432: union one stream with itself") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+    val unioned = df.union(df)
+    withTempDir { outputDir =>
+      withTempDir { checkpointDir =>
+        val query =
+          unioned
+            .writeStream.format("parquet")
+            .option("checkpointLocation", checkpointDir.getAbsolutePath)
+            .start(outputDir.getAbsolutePath)
+        try {
+          query.processAllAvailable()
+          val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
+          checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*)
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
+
   test("union two streams") {
     val inputData1 = MemoryStream[Int]
     val inputData2 = MemoryStream[Int]
@@ -122,6 +143,33 @@ class StreamSuite extends StreamTest {
     assertDF(df)
   }
 
+  test("Within the same streaming query, one StreamingRelation should only be transformed to one " +
+    "StreamingExecutionRelation") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    var query: StreamExecution = null
+    try {
+      query =
+        df.union(df)
+          .writeStream
+          .format("memory")
+          .queryName("memory")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+      query.awaitInitialization(streamingTimeout.toMillis)
+      val executionRelations =
+        query
+          .logicalPlan
+          .collect { case ser: StreamingExecutionRelation => ser }
+      assert(executionRelations.size === 2)
+      assert(executionRelations.distinct.size === 1)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
   test("unsupported queries") {
     val streamInput = MemoryStream[Int]
     val batchInput = Seq(1, 2, 3).toDS()
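One subtlety in the second test is worth spelling out: `executionRelations.distinct.size === 1` proves sharing because `StreamingExecutionRelation` is a case class (structural equality) while source implementations compare by reference, so two plan leaves compare equal exactly when they wrap the same source instance. A minimal sketch of that equality argument, with simplified stand-in types rather than Spark's classes:

```scala
// FakeSource models reference equality (like implementations of Spark's
// Source trait); StreamingExecRel models case-class structural equality.
class FakeSource
case class StreamingExecRel(source: FakeSource)

object DistinctCheck {
  def main(args: Array[String]): Unit = {
    // Patched behavior: both union branches wrap one shared source.
    val shared = new FakeSource
    val plan = Seq(StreamingExecRel(shared), StreamingExecRel(shared))
    assert(plan.size == 2)          // the leaf appears twice in the plan...
    assert(plan.distinct.size == 1) // ...but both wrap the one shared source

    // Without the patch, each branch would create its own source, and
    // distinct would no longer collapse the two leaves:
    val unpatched = Seq(StreamingExecRel(new FakeSource), StreamingExecRel(new FakeSource))
    assert(unpatched.distinct.size == 2)
    println("distinct collapses leaves only when the source is shared")
  }
}
```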
