@@ -20,37 +20,53 @@ package org.apache.spark.sql.hive.thriftserver
 import java.sql.{Date, Timestamp}
 import java.util.{Map => JMap, UUID}
 
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.Executors
+import java.util.concurrent.Future
+import java.util.concurrent.RejectedExecutionException
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.cli.thrift.TProtocolVersion
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 
 private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
     statement: String,
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
     (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
-  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
-  extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with Logging {
 
   private var result: DataFrame = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
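+  // Set per execution in runInternal(); it doubles as the Spark job group id
+  // that cancel() passes to cancelJobGroup() below.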
+  private var statementId: String = _
 
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
-    logDebug("CLOSING")
+    hiveContext.sparkContext.clearJobGroup()
+    logDebug(s"CLOSING $statementId")
+    cleanup(OperationState.CLOSED)
   }
 
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
@@ -114,20 +130,76 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getResultSetSchema: TableSchema = {
-    logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-    if (result.queryExecution.analyzed.output.size == 0) {
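+    // result stays null until runInternal() assigns it, e.g. while an
+    // asynchronous run is still pending.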
+    if (result == null || result.queryExecution.analyzed.output.size == 0) {
       new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
     } else {
+      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
       val schema = result.queryExecution.analyzed.output.map { attr =>
         new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
       }
       new TableSchema(schema)
     }
   }
 
-  def run(): Unit = {
-    val statementId = UUID.randomUUID().toString
-    logInfo(s"Running query '$statement'")
+  override def run(): Unit = {
+    setState(OperationState.PENDING)
+    setHasResultSet(true) // avoid no resultset for async run
+
+    if (!runInBackground) {
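+      // Synchronous path: execute directly on the calling Thrift handler thread.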
+      runInternal()
+    } else {
+      val parentSessionState = SessionState.get()
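+      // SessionState is thread-local in Hive, so capture the parent session's
+      // state here and re-install it on the background thread before executing.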
+      val hiveConf = new HiveConf(getParentSession().getHiveConf())
+      val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
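+      // UGI of the user the Thrift server runs as; the background task is
+      // executed under it through doAs below.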
+
+      // Runnable impl to call runInternal asynchronously,
+      // from a different thread
+      val backgroundOperation = new Runnable() {
+
+        override def run(): Unit = {
+          val doAsAction = new PrivilegedExceptionAction[Object]() {
+            override def run(): Object = {
+
+              // User information is part of the metastore client member in Hive
+              SessionState.setCurrentSessionState(parentSessionState)
+              try {
+                runInternal()
+              } catch {
+                case e: HiveSQLException =>
+                  setOperationException(e)
+                  log.error("Error running hive query: ", e)
+              }
+              return null
+            }
+          }
+
+          try {
+            ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+          } catch {
+            case e: Exception =>
+              setOperationException(new HiveSQLException(e))
+              logError("Error running hive query as user : " +
+                sparkServiceUGI.getShortUserName(), e)
+          }
+        }
+      }
+      try {
+        // This submit blocks if no background threads are available to run this operation
+        val backgroundHandle =
+          getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+        setBackgroundHandle(backgroundHandle)
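+        // Keep the handle so cleanup() can cancel the task if the operation
+        // is closed or cancelled before it completes.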
+      } catch {
+        case rejected: RejectedExecutionException =>
+          setState(OperationState.ERROR);
+          throw new HiveSQLException("The background threadpool cannot accept" +
+            " new task for execution, please retry the operation", rejected)
+      }
+    }
+  }
+
+  private def runInternal(): Unit = {
+    statementId = UUID.randomUUID().toString
+    logInfo(s"Running query '$statement' with $statementId")
     setState(OperationState.RUNNING)
     HiveThriftServer2.listener.onStatementStart(
       statementId,
@@ -159,18 +231,43 @@ private[hive] class SparkExecuteStatementOperation(
         }
       }
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-      setHasResultSet(true)
     } catch {
+      case e: HiveSQLException =>
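+        // cancel() has already moved the operation to CANCELED; don't
+        // overwrite that terminal state with ERROR.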
+        if (getStatus().getState() == OperationState.CANCELED) {
+          return
+        } else {
+          setState(OperationState.ERROR);
+          throw e
+        }
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        val currentState = getStatus().getState()
+        logError(s"Error executing query, currentState $currentState, ", e)
         setState(OperationState.ERROR)
         HiveThriftServer2.listener.onStatementError(
           statementId, e.getMessage, e.getStackTraceString)
-        logError("Error executing query:", e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
+
+  override def cancel(): Unit = {
+    logInfo(s"Cancel '$statement' with $statementId")
+    if (statementId != null) {
+      hiveContext.sparkContext.cancelJobGroup(statementId)
+    }
+    cleanup(OperationState.CANCELED)
+  }
+
+  private def cleanup(state: OperationState) {
+    setState(state)
+    if (runInBackground) {
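+      // Interrupt the background worker if the task has not finished yet.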
+      val backgroundHandle = getBackgroundHandle()
+      if (backgroundHandle != null) {
+        backgroundHandle.cancel(true)
+      }
+    }
+  }
 }