-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-29724][SPARK-29726][WEBUI][SQL] Support JDBC/ODBC tab for HistoryServer WebUI #26378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
e965c5f
kvstore thrift
shahidki31 a93ca4e
add
shahidki31 207de3a
address comment
shahidki31 fc62162
Merge branch 'master' of https://github.com/apache/spark into ThriftK…
shahidki31 7046ea2
remove duplicate pom
shahidki31 c18ba07
update
shahidki31 0a261d1
scalastyle
shahidki31 38e7424
update
shahidki31 c608ad0
address comment
shahidki31 2a03b1b
address comment
shahidki31 9d4f379
Merge branch 'master' of https://github.com/apache/spark into ThriftK…
shahidki31 a87c851
update
shahidki31 da2ae4c
compile fix
shahidki31 0b48cc7
This commit has following changes,
shahidki31 c3df928
Merge branch 'master' of https://github.com/apache/spark into ThriftK…
shahidki31 c277183
scalastyle
shahidki31 9287118
address comment
shahidki31 4ab4e92
address comment
shahidki31 607dded
scalastyle
shahidki31 24b0cb9
Merge branch 'ThriftKVStore' of https://github.com/shahidki31/spark i…
shahidki31 1784b4f
Merge branch 'master' of https://github.com/apache/spark into ThriftK…
shahidki31 ff7267c
Address comments including refactoring of update
shahidki31 eaea6e3
refactor test
shahidki31 a55927e
address comments
shahidki31 af65eed
add comment
shahidki31 db4269c
address comment
shahidki31 f7fcc75
scalastyle
shahidki31 5db0667
add display order. Minor refactor test.
shahidki31 2bc86c5
Merge branch 'ThriftKVStore' of https://github.com/shahidki31/spark i…
shahidki31 39ddc89
address comments
shahidki31 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 change: 1 addition & 0 deletions
1
...erver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2HistoryServerPlugin |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.thriftserver | |
| import java.util.Locale | ||
| import java.util.concurrent.atomic.AtomicBoolean | ||
|
|
||
| import scala.collection.mutable | ||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.hadoop.hive.conf.HiveConf | ||
| import org.apache.hadoop.hive.conf.HiveConf.ConfVars | ||
| import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} | ||
|
|
@@ -32,12 +29,11 @@ import org.apache.spark.SparkContext | |
| import org.apache.spark.annotation.DeveloperApi | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.config.UI.UI_ENABLED | ||
| import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} | ||
| import org.apache.spark.sql.SQLContext | ||
| import org.apache.spark.sql.hive.HiveUtils | ||
| import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ | ||
| import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.hive.thriftserver.ui._ | ||
| import org.apache.spark.status.ElementTrackingStore | ||
| import org.apache.spark.util.{ShutdownHookManager, Utils} | ||
|
|
||
| /** | ||
|
|
@@ -47,6 +43,7 @@ import org.apache.spark.util.{ShutdownHookManager, Utils} | |
| object HiveThriftServer2 extends Logging { | ||
| var uiTab: Option[ThriftServerTab] = None | ||
| var listener: HiveThriftServer2Listener = _ | ||
| var eventManager: HiveThriftServer2EventManager = _ | ||
|
|
||
| /** | ||
| * :: DeveloperApi :: | ||
|
|
@@ -62,14 +59,21 @@ object HiveThriftServer2 extends Logging { | |
|
|
||
| server.init(executionHive.conf) | ||
| server.start() | ||
| listener = new HiveThriftServer2Listener(server, sqlContext.conf) | ||
| sqlContext.sparkContext.addSparkListener(listener) | ||
| uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) { | ||
| Some(new ThriftServerTab(sqlContext.sparkContext)) | ||
| createListenerAndUI(server, sqlContext.sparkContext) | ||
| server | ||
| } | ||
|
|
||
| private def createListenerAndUI(server: HiveThriftServer2, sc: SparkContext): Unit = { | ||
| val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore] | ||
| eventManager = new HiveThriftServer2EventManager(sc) | ||
| listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server)) | ||
| sc.listenerBus.addToStatusQueue(listener) | ||
| uiTab = if (sc.getConf.get(UI_ENABLED)) { | ||
| Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)), | ||
| ThriftServerTab.getSparkUI(sc))) | ||
| } else { | ||
| None | ||
| } | ||
| server | ||
| } | ||
|
|
||
| def main(args: Array[String]): Unit = { | ||
|
|
@@ -101,13 +105,7 @@ object HiveThriftServer2 extends Logging { | |
| server.init(executionHive.conf) | ||
| server.start() | ||
| logInfo("HiveThriftServer2 started") | ||
| listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf) | ||
| SparkSQLEnv.sparkContext.addSparkListener(listener) | ||
| uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) { | ||
| Some(new ThriftServerTab(SparkSQLEnv.sparkContext)) | ||
| } else { | ||
| None | ||
| } | ||
| createListenerAndUI(server, SparkSQLEnv.sparkContext) | ||
| // If application was killed before HiveThriftServer2 start successfully then SparkSubmit | ||
| // process can not exit, so check whether if SparkContext was stopped. | ||
| if (SparkSQLEnv.sparkContext.stopped.get()) { | ||
|
|
@@ -121,179 +119,10 @@ object HiveThriftServer2 extends Logging { | |
| } | ||
| } | ||
|
|
||
| private[thriftserver] class SessionInfo( | ||
| val sessionId: String, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The code here and below is moved to |
||
| val startTimestamp: Long, | ||
| val ip: String, | ||
| val userName: String) { | ||
| var finishTimestamp: Long = 0L | ||
| var totalExecution: Int = 0 | ||
| def totalTime: Long = { | ||
| if (finishTimestamp == 0L) { | ||
| System.currentTimeMillis - startTimestamp | ||
| } else { | ||
| finishTimestamp - startTimestamp | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private[thriftserver] object ExecutionState extends Enumeration { | ||
| val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value | ||
| type ExecutionState = Value | ||
| } | ||
|
|
||
| private[thriftserver] class ExecutionInfo( | ||
| val statement: String, | ||
| val sessionId: String, | ||
| val startTimestamp: Long, | ||
| val userName: String) { | ||
| var finishTimestamp: Long = 0L | ||
| var closeTimestamp: Long = 0L | ||
| var executePlan: String = "" | ||
| var detail: String = "" | ||
| var state: ExecutionState.Value = ExecutionState.STARTED | ||
| val jobId: ArrayBuffer[String] = ArrayBuffer[String]() | ||
| var groupId: String = "" | ||
| def totalTime(endTime: Long): Long = { | ||
| if (endTime == 0L) { | ||
| System.currentTimeMillis - startTimestamp | ||
| } else { | ||
| endTime - startTimestamp | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * An inner sparkListener called in sc.stop to clean up the HiveThriftServer2 | ||
| */ | ||
| private[thriftserver] class HiveThriftServer2Listener( | ||
| val server: HiveServer2, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Moved to a separate file ( |
||
| val conf: SQLConf) extends SparkListener { | ||
|
|
||
| override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { | ||
| server.stop() | ||
| } | ||
| private val sessionList = new mutable.LinkedHashMap[String, SessionInfo] | ||
| private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo] | ||
| private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT) | ||
| private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT) | ||
|
|
||
| def getOnlineSessionNum: Int = synchronized { | ||
| sessionList.count(_._2.finishTimestamp == 0) | ||
| } | ||
|
|
||
| def isExecutionActive(execInfo: ExecutionInfo): Boolean = { | ||
| !(execInfo.state == ExecutionState.FAILED || | ||
| execInfo.state == ExecutionState.CANCELED || | ||
| execInfo.state == ExecutionState.CLOSED) | ||
| } | ||
|
|
||
| /** | ||
| * When an error or a cancellation occurs, we set the finishTimestamp of the statement. | ||
| * Therefore, when we count the number of running statements, we need to exclude errors and | ||
| * cancellations and count all statements that have not been closed so far. | ||
| */ | ||
| def getTotalRunning: Int = synchronized { | ||
| executionList.count { | ||
| case (_, v) => isExecutionActive(v) | ||
| } | ||
| } | ||
|
|
||
| def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq } | ||
|
|
||
| def getSession(sessionId: String): Option[SessionInfo] = synchronized { | ||
| sessionList.get(sessionId) | ||
| } | ||
|
|
||
| def getExecutionList: Seq[ExecutionInfo] = synchronized { executionList.values.toSeq } | ||
|
|
||
| override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { | ||
| for { | ||
| props <- Option(jobStart.properties) | ||
| groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) | ||
| (_, info) <- executionList if info.groupId == groupId | ||
| } { | ||
| info.jobId += jobStart.jobId.toString | ||
| info.groupId = groupId | ||
| } | ||
| } | ||
|
|
||
| def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = { | ||
| synchronized { | ||
| val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName) | ||
| sessionList.put(sessionId, info) | ||
| trimSessionIfNecessary() | ||
| } | ||
| } | ||
|
|
||
| def onSessionClosed(sessionId: String): Unit = synchronized { | ||
| sessionList(sessionId).finishTimestamp = System.currentTimeMillis | ||
| trimSessionIfNecessary() | ||
| } | ||
|
|
||
| def onStatementStart( | ||
| id: String, | ||
| sessionId: String, | ||
| statement: String, | ||
| groupId: String, | ||
| userName: String = "UNKNOWN"): Unit = synchronized { | ||
| val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName) | ||
| info.state = ExecutionState.STARTED | ||
| executionList.put(id, info) | ||
| trimExecutionIfNecessary() | ||
| sessionList(sessionId).totalExecution += 1 | ||
| executionList(id).groupId = groupId | ||
| } | ||
|
|
||
| def onStatementParsed(id: String, executionPlan: String): Unit = synchronized { | ||
| executionList(id).executePlan = executionPlan | ||
| executionList(id).state = ExecutionState.COMPILED | ||
| } | ||
|
|
||
| def onStatementCanceled(id: String): Unit = synchronized { | ||
| executionList(id).finishTimestamp = System.currentTimeMillis | ||
| executionList(id).state = ExecutionState.CANCELED | ||
| trimExecutionIfNecessary() | ||
| } | ||
|
|
||
| def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized { | ||
| executionList(id).finishTimestamp = System.currentTimeMillis | ||
| executionList(id).detail = errorMsg | ||
| executionList(id).state = ExecutionState.FAILED | ||
| trimExecutionIfNecessary() | ||
| } | ||
|
|
||
| def onStatementFinish(id: String): Unit = synchronized { | ||
| executionList(id).finishTimestamp = System.currentTimeMillis | ||
| executionList(id).state = ExecutionState.FINISHED | ||
| trimExecutionIfNecessary() | ||
| } | ||
|
|
||
| def onOperationClosed(id: String): Unit = synchronized { | ||
| executionList(id).closeTimestamp = System.currentTimeMillis | ||
| executionList(id).state = ExecutionState.CLOSED | ||
| } | ||
|
|
||
| private def trimExecutionIfNecessary() = { | ||
| if (executionList.size > retainedStatements) { | ||
| val toRemove = math.max(retainedStatements / 10, 1) | ||
| executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s => | ||
| executionList.remove(s._1) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def trimSessionIfNecessary() = { | ||
| if (sessionList.size > retainedSessions) { | ||
| val toRemove = math.max(retainedSessions / 10, 1) | ||
| sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s => | ||
| sessionList.remove(s._1) | ||
| } | ||
| } | ||
|
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| private[hive] class HiveThriftServer2(sqlContext: SQLContext) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.