Skip to content

Commit db4269c

Browse files
committed
address comment
1 parent af65eed commit db4269c

File tree

4 files changed

+15
-18
lines changed

4 files changed

+15
-18
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ private[thriftserver] class HiveThriftServer2EventManager(sc: SparkContext) {
3535
}
3636

3737
def onSessionClosed(sessionId: String): Unit = {
38-
postLiveListenerBus(SparkListenerSessionClosed(sessionId, System.currentTimeMillis()))
38+
postLiveListenerBus(SparkListenerThriftServerSessionClosed(sessionId,
39+
System.currentTimeMillis()))
3940
}
4041

4142
def onStatementStart(
@@ -77,7 +78,7 @@ private[thriftserver] case class SparkListenerThriftServerSessionCreated(
7778
userName: String,
7879
startTime: Long) extends SparkListenerEvent
7980

80-
private[thriftserver] case class SparkListenerSessionClosed(
81+
private[thriftserver] case class SparkListenerThriftServerSessionClosed(
8182
sessionId: String, finishTime: Long) extends SparkListenerEvent
8283

8384
private[thriftserver] case class SparkListenerThriftServerOperationStart(

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ private[thriftserver] class HiveThriftServer2Listener(
5050

5151
// How often to update live entities. -1 means "never update" when replaying applications,
5252
// meaning only the last write will happen. For live applications, this avoids a few
53-
// operations that we can live without when rapidly processing incoming task events.
53+
// operations that we can live without when rapidly processing incoming events.
5454
private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
5555

5656
// Returns true if this listener has no live data. Exposed for tests only.
@@ -88,11 +88,6 @@ private[thriftserver] class HiveThriftServer2Listener(
8888
}
8989
}
9090

91-
/**
92-
* This method is to handle out of order events. ie. if Job event come after execution end event.
93-
* @param jobId
94-
* @param groupId
95-
*/
9691
private def updateJobDetails(jobId: String, groupId: String): Unit = {
9792
val execList = executionList.values().asScala.filter(_.groupId == groupId).toSeq
9893
if (execList.nonEmpty) {
@@ -101,7 +96,7 @@ private[thriftserver] class HiveThriftServer2Listener(
10196
updateLiveStore(exec)
10297
}
10398
} else {
104-
// It may possible that event reordering happens such a way that JobStart event come after
99+
// It may possible that event reordering happens, such a way that JobStart event come after
105100
// Execution end event (Refer SPARK-27019). To handle that situation, if occurs in
106101
// Thriftserver, following code will take care. Here will come only if JobStart event comes
107102
// after Execution End event.
@@ -119,7 +114,7 @@ private[thriftserver] class HiveThriftServer2Listener(
119114
override def onOtherEvent(event: SparkListenerEvent): Unit = {
120115
event match {
121116
case e: SparkListenerThriftServerSessionCreated => onSessionCreated(e)
122-
case e: SparkListenerSessionClosed => onSessionClosed(e)
117+
case e: SparkListenerThriftServerSessionClosed => onSessionClosed(e)
123118
case e: SparkListenerThriftServerOperationStart => onOperationStart(e)
124119
case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e)
125120
case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e)
@@ -136,7 +131,7 @@ private[thriftserver] class HiveThriftServer2Listener(
136131
updateLiveStore(session)
137132
}
138133

139-
private def onSessionClosed(e: SparkListenerSessionClosed): Unit = {
134+
private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
140135
val session = sessionList.get(e.sessionId)
141136
session.finishTimestamp = e.finishTime
142137
updateStoreWithTriggerEnabled(session)
@@ -299,7 +294,6 @@ private[thriftserver] class LiveExecutionData(
299294
}
300295
}
301296

302-
303297
private[thriftserver] class LiveSessionData(
304298
val sessionId: String,
305299
val startTimeStamp: Long,

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
6565
assert(statusStore.getOnlineSessionNum === 1)
6666
}
6767

68-
listener.onOtherEvent(SparkListenerSessionClosed("sessionId", System.currentTimeMillis()))
68+
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId",
69+
System.currentTimeMillis()))
6970

7071
if (!live) {
7172
// To update history store
@@ -96,15 +97,15 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
9697
"user", time))
9798

9899
time += 1
99-
listener.onOtherEvent(SparkListenerSessionClosed("sessionId1", time))
100+
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId1", time))
100101

101102
time += 1
102-
listener.onOtherEvent(SparkListenerSessionClosed("sessionId2", time))
103+
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId2", time))
103104

104105
listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionId3",
105106
"user", time))
106107
time += 1
107-
listener.onOtherEvent(SparkListenerSessionClosed("sessionId3", 4))
108+
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId3", 4))
108109

109110
if (!live) {
110111
kvstore.close(false)
@@ -134,7 +135,8 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
134135
System.currentTimeMillis(),
135136
Nil,
136137
createProperties))
137-
listener.onOtherEvent(SparkListenerSessionClosed("sessionId", System.currentTimeMillis()))
138+
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId",
139+
System.currentTimeMillis()))
138140
val exec = statusStore.getExecution("id")
139141
assert(exec.isDefined)
140142
assert(exec.get.jobId === Seq("0"))

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class ThriftServerPageSuite extends SparkFunSuite with BeforeAndAfter {
6262
System.currentTimeMillis()))
6363
listener.onOtherEvent(SparkListenerThriftServerOperationClosed("id",
6464
System.currentTimeMillis()))
65-
listener.onOtherEvent(SparkListenerSessionClosed("sessionid", System.currentTimeMillis()))
65+
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionid", System.currentTimeMillis()))
6666

6767
statusStore
6868
}

0 commit comments

Comments
 (0)