Commit e21749d

sarutak authored and dongjoon-hyun committed
[SPARK-50748][SPARK-50889][CONNECT][4.0] Fix a race condition issue which happens when operations are interrupted
### What changes were proposed in this pull request?

This PR backports #51638 to `branch-4.0`. It fixes an issue that happens when operations are interrupted, related to SPARK-50748 and SPARK-50889.

Regarding SPARK-50889, the issue occurs if an execution thread for an operation id cleans up the corresponding `ExecuteHolder` as the result of an interruption [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L175) before a response sender thread consumes a response [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala#L183). In this case, the cleanup eventually calls `ExecuteResponseObserver.removeAll()`, all the responses are discarded, and the response sender thread cannot escape [this loop](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala#L245) because neither `gotResponse` nor `streamFinished` ever becomes true.

The solution this PR proposes is to change the definition of `streamFinished` in `ExecuteGrpcResponseSender` so that the stream is also regarded as finished when the `ExecuteResponseObserver` is marked as completed and all of its responses have been discarded. `ExecuteResponseObserver.removeAll()` is called when the corresponding `ExecuteHolder` is closed or cleaned up by an interruption, so this is a reasonable signal that the stream is finished.

### Why are the changes needed?

To fix the potential hang described above.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested manually. Without this change, the issue can easily be reproduced by inserting a sleep into the test as follows:

```
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -331,6 +331,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
     // cancel
     val operationId = result.operationId
     val canceledId = spark.interruptOperation(operationId)
+    Thread.sleep(1000)
     assert(canceledId == Seq(operationId))
     // and check that it got canceled
     val e = intercept[SparkException] {
```

After this change is applied, the test above no longer hangs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51671 from sarutak/connect-race-condition.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
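To make the failure mode concrete, below is a minimal, self-contained sketch in Scala. The names (`ToyResponseObserver`, `awaitResponse`, `interruptAndRemoveAll`) and the indexing scheme are invented for illustration and are not the real Spark Connect classes; the sketch only models the interaction the commit message describes: an interrupt discards the buffered responses before the sender reads them, and the extra `isCleaned` condition is what lets the wait loop terminate.

```scala
import scala.collection.mutable

object InterruptRaceSketch {

  // Toy observer: buffers responses by index and can be "cleaned" by an interrupt.
  final class ToyResponseObserver {
    val lock = new Object
    private val responses = mutable.Map[Long, String]()
    private var finalProducedIndex: Option[Long] = None

    def onNext(index: Long, value: String): Unit = lock.synchronized {
      responses(index) = value
      lock.notifyAll()
    }

    // Interrupt path: a final response marks the observer completed, and the holder
    // cleanup then discards the whole buffer, possibly before the sender has
    // consumed what was produced.
    def interruptAndRemoveAll(): Unit = lock.synchronized {
      val last = responses.keys.foldLeft(-1L)(_ max _) + 1
      finalProducedIndex = Some(last)
      responses.clear() // removeAll(): everything, including the final response, is gone
      lock.notifyAll()
    }

    def get(index: Long): Option[String] = lock.synchronized { responses.get(index) }
    def lastIndex: Option[Long] = lock.synchronized { finalProducedIndex }
    def isCleaned: Boolean = lock.synchronized {
      finalProducedIndex.isDefined && responses.isEmpty
    }
  }

  // Sender-side wait, shaped like the fixed predicates: without the isCleaned term
  // this loop never exits once the interrupt has emptied the buffer.
  def awaitResponse(obs: ToyResponseObserver, nextIndex: Long): Option[String] =
    obs.lock.synchronized {
      def gotResponse    = obs.get(nextIndex).nonEmpty
      def streamFinished = obs.lastIndex.exists(nextIndex > _) || obs.isCleaned
      while (!gotResponse && !streamFinished) obs.lock.wait()
      obs.get(nextIndex)
    }

  def main(args: Array[String]): Unit = {
    val obs = new ToyResponseObserver
    obs.onNext(0L, "first batch")

    // The interrupt races ahead of the sender and discards the buffered responses.
    new Thread(() => { Thread.sleep(200); obs.interruptAndRemoveAll() }).start()

    println(awaitResponse(obs, 0L)) // Some(first batch): consumed before the interrupt fires
    println(awaitResponse(obs, 1L)) // None: isCleaned lets the wait exit instead of hanging
  }
}
```

If the `|| obs.isCleaned` term is removed from `streamFinished` above, the second `awaitResponse` call blocks forever, which mirrors the hang described in the commit message.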
1 parent 097a267 commit e21749d

File tree

2 files changed (+7 −1)


sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala

Lines changed: 2 additions & 1 deletion
```
@@ -232,7 +232,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       // 2. has a response to send
       def gotResponse = response.nonEmpty
       // 3. sent everything from the stream and the stream is finished
-      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _)
+      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) ||
+        executionObserver.isCleaned()
       // 4. time deadline or size limit reached
       def deadlineLimitReached =
         sentResponsesSize > maximumResponseSize || deadlineTimeNs < System.nanoTime()
```
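For intuition about why the old predicate alone could stay false forever, here is a small sketch with assumed values; the concrete indices are hypothetical, chosen only to illustrate the stuck state described in the commit message.

```scala
// Assumed stuck state: the observer completed at index 5, the sender had only
// reached index 3, and removeAll() already emptied the buffer.
object StuckPredicateSketch {
  def main(args: Array[String]): Unit = {
    val nextIndex = 3L                              // next response the sender wants to send
    val lastResponseIndex: Option[Long] = Some(5L)  // index of the final produced response
    val responsesEmpty = true                       // removeAll() has run

    val gotResponse       = false                                    // the response at index 3 was discarded
    val oldStreamFinished = lastResponseIndex.exists(nextIndex > _)  // false: 3 > 5 does not hold
    val isCleaned         = lastResponseIndex.isDefined && responsesEmpty
    val newStreamFinished = oldStreamFinished || isCleaned           // true: the wait loop can now exit

    println(s"gotResponse=$gotResponse old=$oldStreamFinished new=$newStreamFinished")
  }
}
```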

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala

Lines changed: 5 additions & 0 deletions
```
@@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
     finalProducedIndex.isDefined
   }
 
+  // Returns if this observer has already been cleaned
+  def isCleaned(): Boolean = responseLock.synchronized {
+    completed() && responses.isEmpty
+  }
+
   // For testing.
   private[connect] def undoCompletion(): Unit = responseLock.synchronized {
     finalProducedIndex = None
```
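As a side note on the shape of `isCleaned()`: it reads two pieces of state, so taking `responseLock` while reading both matters. The toy `Guarded` class below is a hypothetical sketch of that pattern, assuming (as the surrounding code suggests) that the cleanup path mutates both fields under the same lock.

```scala
import scala.collection.mutable

// Hypothetical sketch of the locking pattern behind isCleaned(): the completion flag
// and the response buffer are read under the same lock that the cleanup path holds
// while mutating them, so a reader never observes a half-updated state.
final class Guarded {
  private val lock = new Object
  private var completed = false
  private val responses = mutable.Buffer[String]("pending")

  // Cleanup path: mark completed and discard the buffer under the lock.
  def removeAll(): Unit = lock.synchronized {
    completed = true
    responses.clear()
  }

  // Reader: true only once the stream is completed and nothing is left to send.
  def isCleaned: Boolean = lock.synchronized {
    completed && responses.isEmpty
  }
}
```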
