Closed

Changes from all commits (30 commits)
e965c5f  kvstore thrift (shahidki31, Nov 3, 2019)
a93ca4e  add (shahidki31, Nov 5, 2019)
207de3a  address comment (shahidki31, Nov 6, 2019)
fc62162  Merge branch 'master' of https://github.com/apache/spark into ThriftK… (shahidki31, Nov 6, 2019)
7046ea2  remove duplicate pom (shahidki31, Nov 6, 2019)
c18ba07  update (shahidki31, Nov 6, 2019)
0a261d1  scalastyle (shahidki31, Nov 6, 2019)
38e7424  update (shahidki31, Nov 6, 2019)
c608ad0  address comment (shahidki31, Nov 6, 2019)
2a03b1b  address comment (shahidki31, Nov 10, 2019)
9d4f379  Merge branch 'master' of https://github.com/apache/spark into ThriftK… (shahidki31, Nov 10, 2019)
a87c851  update (shahidki31, Nov 11, 2019)
da2ae4c  compile fix (shahidki31, Nov 11, 2019)
0b48cc7  This commit has following changes, (shahidki31, Nov 23, 2019)
c3df928  Merge branch 'master' of https://github.com/apache/spark into ThriftK… (shahidki31, Nov 23, 2019)
c277183  scalastyle (shahidki31, Nov 23, 2019)
9287118  address comment (shahidki31, Nov 24, 2019)
4ab4e92  address comment (shahidki31, Nov 25, 2019)
607dded  scalastyle (shahidki31, Nov 25, 2019)
24b0cb9  Merge branch 'ThriftKVStore' of https://github.com/shahidki31/spark i… (shahidki31, Nov 25, 2019)
1784b4f  Merge branch 'master' of https://github.com/apache/spark into ThriftK… (shahidki31, Nov 26, 2019)
ff7267c  Address comments including refactoring of update (shahidki31, Nov 26, 2019)
eaea6e3  refactor test (shahidki31, Nov 26, 2019)
a55927e  address comments (shahidki31, Nov 27, 2019)
af65eed  add comment (shahidki31, Nov 27, 2019)
db4269c  address comment (shahidki31, Nov 27, 2019)
f7fcc75  scalastyle (shahidki31, Nov 27, 2019)
5db0667  add display order. Minor refactor test. (shahidki31, Nov 28, 2019)
2bc86c5  Merge branch 'ThriftKVStore' of https://github.com/shahidki31/spark i… (shahidki31, Nov 28, 2019)
39ddc89  address comments (shahidki31, Nov 28, 2019)

FsHistoryProvider.scala
@@ -352,10 +352,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, secManager,
       app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(), attempt.info.appSparkVersion)
-    loadPlugins().foreach(_.setupUI(ui))
-
-    val loadedUI = LoadedAppUI(ui)
+    // place the tab in UI based on the display order
+    loadPlugins().toSeq.sortBy(_.displayOrder).foreach(_.setupUI(ui))
+
+    val loadedUI = LoadedAppUI(ui)
     synchronized {
       activeUIs((appId, attemptId)) = loadedUI
     }

AppHistoryServerPlugin.scala
@@ -35,4 +35,9 @@ private[spark] trait AppHistoryServerPlugin {
    * Sets up UI of this plugin to rebuild the history UI.
    */
   def setupUI(ui: SparkUI): Unit
+
+  /**
+   * The position of a plugin tab relative to the other plugin tabs in the history UI.
+   */
+  def displayOrder: Int = Integer.MAX_VALUE
 }
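
To illustrate the new hook: FsHistoryProvider now sorts plugins by ascending displayOrder before calling setupUI, so tabs with lower values are attached first and plugins that keep the Integer.MAX_VALUE default land last. A hypothetical sketch of an implementation (the trait is private[spark], so it must live in Spark's own package tree; every MyService* name is invented, while the createListeners signature is the one the trait already declares):

    package org.apache.spark.status

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.ui.SparkUI

    // Hypothetical plugin, for illustration only.
    class MyServiceHistoryServerPlugin extends AppHistoryServerPlugin {

      // Listeners that replay logged events into the history KVStore.
      override def createListeners(
          conf: SparkConf,
          store: ElementTrackingStore): Seq[SparkListener] = Seq.empty

      // Attach this plugin's tab(s) to the rebuilt history UI.
      override def setupUI(ui: SparkUI): Unit = ()

      // Placed after SQLHistoryServerPlugin (which pins itself first with 0,
      // below) but before any plugin keeping the Integer.MAX_VALUE default.
      override def displayOrder: Int = 1
    }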

SQLHistoryServerPlugin.scala
@@ -33,4 +33,7 @@ class SQLHistoryServerPlugin extends AppHistoryServerPlugin {
       new SQLTab(sqlStatusStore, ui)
     }
   }
+
+  override def displayOrder: Int = 0
 }

New file (a ServiceLoader provider registration for the history server plugin)
@@ -0,0 +1 @@
+org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2HistoryServerPlugin
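
By its shape this one-line file is a ServiceLoader provider registration: a resource under META-INF/services, named after the plugin interface, listing implementation classes one per line. That is presumably how the history server discovers the new Thrift-server plugin without a compile-time dependency on the hive-thriftserver module. A minimal sketch of the discovery side (the classloader choice here is an assumption):

    package org.apache.spark.deploy.history

    import java.util.ServiceLoader

    import scala.collection.JavaConverters._

    import org.apache.spark.status.AppHistoryServerPlugin

    object PluginDiscoverySketch {
      // Every jar that ships a
      // META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
      // resource contributes the implementations listed in it.
      def loadPlugins(): Iterable[AppHistoryServerPlugin] =
        ServiceLoader.load(
          classOf[AppHistoryServerPlugin],
          Thread.currentThread().getContextClassLoader).asScala
    }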

HiveThriftServer2.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.thriftserver
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
@@ -32,12 +29,11 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.hive.thriftserver.ui._
+import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
@@ -47,6 +43,7 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
 object HiveThriftServer2 extends Logging {
   var uiTab: Option[ThriftServerTab] = None
   var listener: HiveThriftServer2Listener = _
+  var eventManager: HiveThriftServer2EventManager = _
 
   /**
    * :: DeveloperApi ::
@@ -62,14 +59,21 @@ object HiveThriftServer2 extends Logging {
 
     server.init(executionHive.conf)
     server.start()
-    listener = new HiveThriftServer2Listener(server, sqlContext.conf)
-    sqlContext.sparkContext.addSparkListener(listener)
-    uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) {
-      Some(new ThriftServerTab(sqlContext.sparkContext))
+    createListenerAndUI(server, sqlContext.sparkContext)
+    server
+  }
+
+  private def createListenerAndUI(server: HiveThriftServer2, sc: SparkContext): Unit = {
+    val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+    eventManager = new HiveThriftServer2EventManager(sc)
+    listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server))
+    sc.listenerBus.addToStatusQueue(listener)
+    uiTab = if (sc.getConf.get(UI_ENABLED)) {
+      Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
+        ThriftServerTab.getSparkUI(sc)))
     } else {
       None
     }
-    server
   }
 
   def main(args: Array[String]): Unit = {
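
The new wiring inverts the old flow: operations no longer mutate listener-owned maps under a lock. An event manager posts events to the shared listener bus, HiveThriftServer2Listener consumes them on the status queue and writes to the application's ElementTrackingStore, and ThriftServerTab renders from HiveThriftServer2AppStatusStore over the same KVStore. A compressed sketch of the posting half; the event class name is invented, since the patch's real SparkListenerEvent subclasses are defined in files not shown in this diff:

    package org.apache.spark.sql.hive.thriftserver.ui

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.SparkListenerEvent

    // Invented event, for illustration only.
    case class OperationStartSketch(
        id: String,
        sessionId: String,
        statement: String,
        startTime: Long) extends SparkListenerEvent

    class EventManagerSketch(sc: SparkContext) {
      // A callback is now just a post; the status listener picks the event up
      // asynchronously and persists it to the KVStore.
      def onStatementStart(id: String, sessionId: String, statement: String): Unit = {
        sc.listenerBus.post(
          OperationStartSketch(id, sessionId, statement, System.currentTimeMillis()))
      }
    }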
@@ -101,13 +105,7 @@ object HiveThriftServer2 extends Logging {
       server.init(executionHive.conf)
       server.start()
       logInfo("HiveThriftServer2 started")
-      listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
-      SparkSQLEnv.sparkContext.addSparkListener(listener)
-      uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) {
-        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
-      } else {
-        None
-      }
+      createListenerAndUI(server, SparkSQLEnv.sparkContext)
       // If application was killed before HiveThriftServer2 start successfully then SparkSubmit
       // process can not exit, so check whether if SparkContext was stopped.
       if (SparkSQLEnv.sparkContext.stopped.get()) {
@@ -121,179 +119,10 @@
       }
     }
 
-  private[thriftserver] class SessionInfo(
-      val sessionId: String,

[Review comment from shahidki31 (Contributor Author): The code here and below is moved to the HiveThriftServer2Listener.scala class.]

-      val startTimestamp: Long,
-      val ip: String,
-      val userName: String) {
-    var finishTimestamp: Long = 0L
-    var totalExecution: Int = 0
-    def totalTime: Long = {
-      if (finishTimestamp == 0L) {
-        System.currentTimeMillis - startTimestamp
-      } else {
-        finishTimestamp - startTimestamp
-      }
-    }
-  }
-
-  private[thriftserver] object ExecutionState extends Enumeration {
-    val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
-    type ExecutionState = Value
-  }
-
-  private[thriftserver] class ExecutionInfo(
-      val statement: String,
-      val sessionId: String,
-      val startTimestamp: Long,
-      val userName: String) {
-    var finishTimestamp: Long = 0L
-    var closeTimestamp: Long = 0L
-    var executePlan: String = ""
-    var detail: String = ""
-    var state: ExecutionState.Value = ExecutionState.STARTED
-    val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
-    var groupId: String = ""
-    def totalTime(endTime: Long): Long = {
-      if (endTime == 0L) {
-        System.currentTimeMillis - startTimestamp
-      } else {
-        endTime - startTimestamp
-      }
-    }
-  }
-
-
-  /**
-   * An inner sparkListener called in sc.stop to clean up the HiveThriftServer2
-   */
-  private[thriftserver] class HiveThriftServer2Listener(
-      val server: HiveServer2,

[Review comment from shahidki31 (Contributor Author): Moved to a separate file (HiveThriftServer2Listener.scala).]

-      val conf: SQLConf) extends SparkListener {
-
-    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-      server.stop()
-    }
-    private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
-    private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
-    private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
-    private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
-
-    def getOnlineSessionNum: Int = synchronized {
-      sessionList.count(_._2.finishTimestamp == 0)
-    }
-
-    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
-      !(execInfo.state == ExecutionState.FAILED ||
-        execInfo.state == ExecutionState.CANCELED ||
-        execInfo.state == ExecutionState.CLOSED)
-    }
-
-    /**
-     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
-     * Therefore, when we count the number of running statements, we need to exclude errors and
-     * cancellations and count all statements that have not been closed so far.
-     */
-    def getTotalRunning: Int = synchronized {
-      executionList.count {
-        case (_, v) => isExecutionActive(v)
-      }
-    }
-
-    def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }
-
-    def getSession(sessionId: String): Option[SessionInfo] = synchronized {
-      sessionList.get(sessionId)
-    }
-
-    def getExecutionList: Seq[ExecutionInfo] = synchronized { executionList.values.toSeq }
-
-    override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
-      for {
-        props <- Option(jobStart.properties)
-        groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
-        (_, info) <- executionList if info.groupId == groupId
-      } {
-        info.jobId += jobStart.jobId.toString
-        info.groupId = groupId
-      }
-    }
-
-    def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
-      synchronized {
-        val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
-        sessionList.put(sessionId, info)
-        trimSessionIfNecessary()
-      }
-    }
-
-    def onSessionClosed(sessionId: String): Unit = synchronized {
-      sessionList(sessionId).finishTimestamp = System.currentTimeMillis
-      trimSessionIfNecessary()
-    }
-
-    def onStatementStart(
-        id: String,
-        sessionId: String,
-        statement: String,
-        groupId: String,
-        userName: String = "UNKNOWN"): Unit = synchronized {
-      val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
-      info.state = ExecutionState.STARTED
-      executionList.put(id, info)
-      trimExecutionIfNecessary()
-      sessionList(sessionId).totalExecution += 1
-      executionList(id).groupId = groupId
-    }
-
-    def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
-      executionList(id).executePlan = executionPlan
-      executionList(id).state = ExecutionState.COMPILED
-    }
-
-    def onStatementCanceled(id: String): Unit = synchronized {
-      executionList(id).finishTimestamp = System.currentTimeMillis
-      executionList(id).state = ExecutionState.CANCELED
-      trimExecutionIfNecessary()
-    }
-
-    def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
-      executionList(id).finishTimestamp = System.currentTimeMillis
-      executionList(id).detail = errorMsg
-      executionList(id).state = ExecutionState.FAILED
-      trimExecutionIfNecessary()
-    }
-
-    def onStatementFinish(id: String): Unit = synchronized {
-      executionList(id).finishTimestamp = System.currentTimeMillis
-      executionList(id).state = ExecutionState.FINISHED
-      trimExecutionIfNecessary()
-    }
-
-    def onOperationClosed(id: String): Unit = synchronized {
-      executionList(id).closeTimestamp = System.currentTimeMillis
-      executionList(id).state = ExecutionState.CLOSED
-    }
-
-    private def trimExecutionIfNecessary() = {
-      if (executionList.size > retainedStatements) {
-        val toRemove = math.max(retainedStatements / 10, 1)
-        executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
-          executionList.remove(s._1)
-        }
-      }
-    }
-
-    private def trimSessionIfNecessary() = {
-      if (sessionList.size > retainedSessions) {
-        val toRemove = math.max(retainedSessions / 10, 1)
-        sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
-          sessionList.remove(s._1)
-        }
-      }
-
-    }
-  }
 }
 
 private[hive] class HiveThriftServer2(sqlContext: SQLContext)
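
Note what replaces the deleted trimSessionIfNecessary/trimExecutionIfNecessary bookkeeping: ElementTrackingStore can watch how many instances of a class have been written and fire a cleanup action once a threshold is crossed, so retention becomes a registered trigger instead of a check inside every synchronized callback. A sketch of that idiom; addTrigger is the store's actual API, but the entity class and the cleanup body are assumed:

    import org.apache.spark.status.ElementTrackingStore

    // Invented KVStore entity; the PR's real session/execution data classes
    // live in the new HiveThriftServer2Listener.scala, not shown in this diff.
    class SessionDataSketch(val sessionId: String, val finishTimestamp: Long)

    object RetentionSketch {
      def install(store: ElementTrackingStore, retainedSessions: Int): Unit = {
        store.addTrigger(classOf[SessionDataSketch], retainedSessions) { count =>
          // Runs once the stored count exceeds the threshold: delete the
          // oldest finished sessions down to the limit (details assumed, e.g.
          // a KVStore view ordered by finish time).
          val toDelete = math.max(count - retainedSessions, 0L)
          println(s"would delete $toDelete oldest finished sessions")
        }
      }
    }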

SparkExecuteStatementOperation.scala
@@ -77,7 +77,7 @@ private[hive] class SparkExecuteStatementOperation(
     // RDDs will be cleaned automatically upon garbage collection.
     logInfo(s"Close statement with $statementId")
     cleanup(OperationState.CLOSED)
-    HiveThriftServer2.listener.onOperationClosed(statementId)
+    HiveThriftServer2.eventManager.onOperationClosed(statementId)
   }
 
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
@@ -195,7 +195,7 @@
     setState(OperationState.PENDING)
     statementId = UUID.randomUUID().toString
     logInfo(s"Submitting query '$statement' with $statementId")
-    HiveThriftServer2.listener.onStatementStart(
+    HiveThriftServer2.eventManager.onStatementStart(
       statementId,
       parentSession.getSessionHandle.getSessionId.toString,
       statement,
@@ -245,14 +245,14 @@
       case rejected: RejectedExecutionException =>
         logError("Error submitting query in background, query rejected", rejected)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
+        HiveThriftServer2.eventManager.onStatementError(
           statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
         throw new HiveSQLException("The background threadpool cannot accept" +
           " new task for execution, please retry the operation", rejected)
       case NonFatal(e) =>
         logError(s"Error executing query in background", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
+        HiveThriftServer2.eventManager.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
         throw new HiveSQLException(e)
@@ -284,7 +284,8 @@
           "in this session.")
       case _ =>
     }
-    HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
+    HiveThriftServer2.eventManager.onStatementParsed(statementId,
+      result.queryExecution.toString())
     iter = {
       if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
         resultList = None
@@ -315,12 +316,12 @@
       setState(OperationState.ERROR)
       e match {
         case hiveException: HiveSQLException =>
-          HiveThriftServer2.listener.onStatementError(
+          HiveThriftServer2.eventManager.onStatementError(
             statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
           throw hiveException
         case _ =>
           val root = ExceptionUtils.getRootCause(e)
-          HiveThriftServer2.listener.onStatementError(
+          HiveThriftServer2.eventManager.onStatementError(
             statementId, root.getMessage, SparkUtils.exceptionString(root))
           throw new HiveSQLException("Error running query: " + root.toString, root)
       }
@@ -329,7 +330,7 @@
     synchronized {
       if (!getStatus.getState.isTerminal) {
         setState(OperationState.FINISHED)
-        HiveThriftServer2.listener.onStatementFinish(statementId)
+        HiveThriftServer2.eventManager.onStatementFinish(statementId)
       }
     }
     sqlContext.sparkContext.clearJobGroup()
@@ -341,7 +342,7 @@
     if (!getStatus.getState.isTerminal) {
       logInfo(s"Cancel query with $statementId")
       cleanup(OperationState.CANCELED)
-      HiveThriftServer2.listener.onStatementCanceled(statementId)
+      HiveThriftServer2.eventManager.onStatementCanceled(statementId)
     }
   }
 }