diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index aa78fa015dbef..2295b8dd5fe36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -87,7 +87,7 @@ class SQLAppStatusListener(
     }
 
     exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
-    exec.stages = event.stageIds.toSet
+    exec.stages ++= event.stageIds.toSet
     update(exec)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 5ebbeb4a7cb40..7d84f45d36bee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -383,6 +383,49 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }
 
+  test("handle one execution with multiple jobs") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+
+    var stageId = 0
+    def twoStageJob(jobId: Int): Unit = {
+      val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0) }
+      stageId += 2
+      listener.onJobStart(SparkListenerJobStart(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        stageInfos = stages,
+        createProperties(executionId)))
+      stages.foreach { s =>
+        listener.onStageSubmitted(SparkListenerStageSubmitted(s))
+        listener.onStageCompleted(SparkListenerStageCompleted(s))
+      }
+      listener.onJobEnd(SparkListenerJobEnd(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        JobSucceeded
+      ))
+    }
+    // submit two jobs with the same executionId
+    twoStageJob(0)
+    twoStageJob(1)
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+
+    assertJobs(statusStore.execution(0), completed = 0 to 1)
+    assert(statusStore.execution(0).get.stages === (0 to 3).toSet)
+  }
+
   test("SPARK-11126: no memory leak when running non SQL jobs") {
     val listener = spark.sharedState.statusStore.listener.get
     // At the beginning of this test case, there should be no live data in the listener.
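
Note on the one-character fix above: a single SQL execution can spawn several Spark jobs, each delivering its own SparkListenerJobStart with its own stage IDs. With `exec.stages = event.stageIds.toSet`, each new job overwrote the stages recorded for earlier jobs of the same execution; `++=` accumulates them instead (the field holds an immutable Set in a var, so `++=` desugars to reassignment with the union). Below is a minimal standalone sketch of that difference, not part of the patch and not Spark code: the object name `StageAccumulationSketch` and the `onJobStart` helper are hypothetical stand-ins for the listener's handling of SparkListenerJobStart.

object StageAccumulationSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the execution's recorded stage set before and after the fix.
    var overwritten = Set[Int]()
    var accumulated = Set[Int]()

    // Simulates one SparkListenerJobStart event carrying that job's stage IDs.
    def onJobStart(stageIds: Seq[Int]): Unit = {
      overwritten = stageIds.toSet      // old behavior: replaces prior jobs' stages
      accumulated ++= stageIds.toSet    // fixed behavior: unions in the new stages
    }

    onJobStart(Seq(0, 1)) // first job of the execution
    onJobStart(Seq(2, 3)) // second job of the same execution

    assert(overwritten == Set(2, 3))       // stages 0 and 1 were silently dropped
    assert(accumulated == Set(0, 1, 2, 3)) // all four stages retained
  }
}

The regression test in the patch exercises exactly this path: two jobs of two stages each under one executionId, asserting that all four stage IDs (0 to 3) survive in the stored execution data.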