Skip to content

Commit 4eab137

Browse files
committed
kvstore thrift
1 parent 91d9901 commit 4eab137

19 files changed

+763
-264
lines changed

sql/hive-thriftserver/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@
125125
<groupId>net.sf.jpam</groupId>
126126
<artifactId>jpam</artifactId>
127127
</dependency>
128+
<dependency>
129+
<groupId>org.mockito</groupId>
130+
<artifactId>mockito-core</artifactId>
131+
<scope>test</scope>
132+
</dependency>
128133
</dependencies>
129134
<build>
130135
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2HistoryServerPlugin

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala

Lines changed: 20 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,21 @@ package org.apache.spark.sql.hive.thriftserver
2020
import java.util.Locale
2121
import java.util.concurrent.atomic.AtomicBoolean
2222

23-
import scala.collection.mutable
24-
import scala.collection.mutable.ArrayBuffer
25-
2623
import org.apache.hadoop.hive.conf.HiveConf
2724
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
2825
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
2926
import org.apache.hive.service.server.HiveServer2
3027

31-
import org.apache.spark.SparkContext
28+
import org.apache.spark.{SparkContext, SparkException}
3229
import org.apache.spark.annotation.DeveloperApi
3330
import org.apache.spark.internal.Logging
3431
import org.apache.spark.internal.config.UI.UI_ENABLED
35-
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
3632
import org.apache.spark.sql.SQLContext
3733
import org.apache.spark.sql.hive.HiveUtils
3834
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
39-
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
40-
import org.apache.spark.sql.internal.SQLConf
35+
import org.apache.spark.sql.hive.thriftserver.ui.{HiveThriftServer2AppStatusStore, ThriftServerTab}
36+
import org.apache.spark.status.ElementTrackingStore
37+
import org.apache.spark.ui.SparkUI
4138
import org.apache.spark.util.{ShutdownHookManager, Utils}
4239

4340
/**
@@ -62,16 +59,25 @@ object HiveThriftServer2 extends Logging {
6259

6360
server.init(executionHive.conf)
6461
server.start()
65-
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
62+
val kvStore = SparkSQLEnv.sqlContext.sparkContext
63+
.statusStore.store.asInstanceOf[ElementTrackingStore]
64+
listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(sqlContext))
6665
sqlContext.sparkContext.addSparkListener(listener)
6766
uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) {
68-
Some(new ThriftServerTab(sqlContext.sparkContext))
67+
Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
68+
getSparkUI(sqlContext.sparkContext)))
6969
} else {
7070
None
7171
}
7272
server
7373
}
7474

75+
def getSparkUI(sparkContext: SparkContext): SparkUI = {
76+
sparkContext.ui.getOrElse {
77+
throw new SparkException("Parent SparkUI to attach this tab to not found!")
78+
}
79+
}
80+
7581
def main(args: Array[String]): Unit = {
7682
// If the arguments contains "-h" or "--help", print out the usage and exit.
7783
if (args.contains("-h") || args.contains("--help")) {
@@ -101,10 +107,12 @@ object HiveThriftServer2 extends Logging {
101107
server.init(executionHive.conf)
102108
server.start()
103109
logInfo("HiveThriftServer2 started")
104-
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
110+
val kvStore = SparkSQLEnv.sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
111+
listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(SparkSQLEnv.sqlContext))
105112
SparkSQLEnv.sparkContext.addSparkListener(listener)
106113
uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) {
107-
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
114+
Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
115+
getSparkUI(SparkSQLEnv.sparkContext)))
108116
} else {
109117
None
110118
}
@@ -141,161 +149,9 @@ object HiveThriftServer2 extends Logging {
141149
val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
142150
type ExecutionState = Value
143151
}
144-
145-
private[thriftserver] class ExecutionInfo(
146-
val statement: String,
147-
val sessionId: String,
148-
val startTimestamp: Long,
149-
val userName: String) {
150-
var finishTimestamp: Long = 0L
151-
var closeTimestamp: Long = 0L
152-
var executePlan: String = ""
153-
var detail: String = ""
154-
var state: ExecutionState.Value = ExecutionState.STARTED
155-
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
156-
var groupId: String = ""
157-
def totalTime(endTime: Long): Long = {
158-
if (endTime == 0L) {
159-
System.currentTimeMillis - startTimestamp
160-
} else {
161-
endTime - startTimestamp
162-
}
163-
}
164-
}
165-
166-
167-
/**
168-
* An inner sparkListener called in sc.stop to clean up the HiveThriftServer2
169-
*/
170-
private[thriftserver] class HiveThriftServer2Listener(
171-
val server: HiveServer2,
172-
val conf: SQLConf) extends SparkListener {
173-
174-
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
175-
server.stop()
176-
}
177-
private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
178-
private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
179-
private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
180-
private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
181-
182-
def getOnlineSessionNum: Int = synchronized {
183-
sessionList.count(_._2.finishTimestamp == 0)
184-
}
185-
186-
def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
187-
!(execInfo.state == ExecutionState.FAILED ||
188-
execInfo.state == ExecutionState.CANCELED ||
189-
execInfo.state == ExecutionState.CLOSED)
190-
}
191-
192-
/**
193-
* When an error or a cancellation occurs, we set the finishTimestamp of the statement.
194-
* Therefore, when we count the number of running statements, we need to exclude errors and
195-
* cancellations and count all statements that have not been closed so far.
196-
*/
197-
def getTotalRunning: Int = synchronized {
198-
executionList.count {
199-
case (_, v) => isExecutionActive(v)
200-
}
201-
}
202-
203-
def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }
204-
205-
def getSession(sessionId: String): Option[SessionInfo] = synchronized {
206-
sessionList.get(sessionId)
207-
}
208-
209-
def getExecutionList: Seq[ExecutionInfo] = synchronized { executionList.values.toSeq }
210-
211-
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
212-
for {
213-
props <- Option(jobStart.properties)
214-
groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
215-
(_, info) <- executionList if info.groupId == groupId
216-
} {
217-
info.jobId += jobStart.jobId.toString
218-
info.groupId = groupId
219-
}
220-
}
221-
222-
def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
223-
synchronized {
224-
val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
225-
sessionList.put(sessionId, info)
226-
trimSessionIfNecessary()
227-
}
228-
}
229-
230-
def onSessionClosed(sessionId: String): Unit = synchronized {
231-
sessionList(sessionId).finishTimestamp = System.currentTimeMillis
232-
trimSessionIfNecessary()
233-
}
234-
235-
def onStatementStart(
236-
id: String,
237-
sessionId: String,
238-
statement: String,
239-
groupId: String,
240-
userName: String = "UNKNOWN"): Unit = synchronized {
241-
val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
242-
info.state = ExecutionState.STARTED
243-
executionList.put(id, info)
244-
trimExecutionIfNecessary()
245-
sessionList(sessionId).totalExecution += 1
246-
executionList(id).groupId = groupId
247-
}
248-
249-
def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
250-
executionList(id).executePlan = executionPlan
251-
executionList(id).state = ExecutionState.COMPILED
252-
}
253-
254-
def onStatementCanceled(id: String): Unit = synchronized {
255-
executionList(id).finishTimestamp = System.currentTimeMillis
256-
executionList(id).state = ExecutionState.CANCELED
257-
trimExecutionIfNecessary()
258-
}
259-
260-
def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
261-
executionList(id).finishTimestamp = System.currentTimeMillis
262-
executionList(id).detail = errorMsg
263-
executionList(id).state = ExecutionState.FAILED
264-
trimExecutionIfNecessary()
265-
}
266-
267-
def onStatementFinish(id: String): Unit = synchronized {
268-
executionList(id).finishTimestamp = System.currentTimeMillis
269-
executionList(id).state = ExecutionState.FINISHED
270-
trimExecutionIfNecessary()
271-
}
272-
273-
def onOperationClosed(id: String): Unit = synchronized {
274-
executionList(id).closeTimestamp = System.currentTimeMillis
275-
executionList(id).state = ExecutionState.CLOSED
276-
}
277-
278-
private def trimExecutionIfNecessary() = {
279-
if (executionList.size > retainedStatements) {
280-
val toRemove = math.max(retainedStatements / 10, 1)
281-
executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
282-
executionList.remove(s._1)
283-
}
284-
}
285-
}
286-
287-
private def trimSessionIfNecessary() = {
288-
if (sessionList.size > retainedSessions) {
289-
val toRemove = math.max(retainedSessions / 10, 1)
290-
sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
291-
sessionList.remove(s._1)
292-
}
293-
}
294-
295-
}
296-
}
297152
}
298153

154+
299155
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
300156
extends HiveServer2
301157
with ReflectedCompositeService {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive.thriftserver
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.scheduler.SparkListener
22+
import org.apache.spark.sql.hive.thriftserver.ui.{HiveThriftServer2AppStatusStore, ThriftServerTab}
23+
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
24+
import org.apache.spark.ui.SparkUI
25+
26+
class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin {
27+
28+
override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
29+
Seq(new HiveThriftServer2Listener(store, None, None, Some(conf), false))
30+
}
31+
32+
override def setupUI(ui: SparkUI): Unit = {
33+
val store = new HiveThriftServer2AppStatusStore(ui.store.store)
34+
if (store.getExecutionCount() > 0) {
35+
new ThriftServerTab(store, ui)
36+
}
37+
}
38+
}
39+

0 commit comments

Comments
 (0)