From 8536e814bbd5c84594f49cd728bc03955c3a8f43 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Thu, 18 Jun 2015 05:12:50 -0700
Subject: [PATCH 1/6] disable the error message redirection for stderr

---
 .../hive/execution/ScriptTransformation.scala | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 611888055d6cf..a9dbc9fe820b3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -59,15 +59,13 @@ case class ScriptTransformation(
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)
-      // redirectError(Redirect.INHERIT) would consume the error output from buffer and
-      // then print it to stderr (inherit the target from the current Scala process).
-      // If without this there would be 2 issues:
+      // We need to start threads connected to the process pipeline:
       // 1) The error msg generated by the script process would be hidden.
       // 2) If the error msg is too big to chock up the buffer, the input logic would be hung
-      builder.redirectError(Redirect.INHERIT)
       val proc = builder.start()
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
+      val errorStream = proc.getErrorStream
       val reader = new BufferedReader(new InputStreamReader(inputStream))
 
       val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
@@ -176,6 +174,19 @@ case class ScriptTransformation(
         }
       }).start()
 
+      // Consume the error stream from the pipeline, otherwise it will be blocked if
+      // the pipeline is full.
+      new Thread(new Runnable() {
+        override def run(): Unit = {
+          var value = -1
+          do {
+            value = errorStream.read() // consume the error message stream.
+          } while (value != -1)
+
+          errorStream.close()
+        }
+      }).start()
+
       iterator
     }
   }
@@ -278,3 +289,4 @@ case class HiveScriptIOSchema (
     }
   }
 }
+
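Note on the approach in PATCH 1 (editorial sketch, not part of the series): a forked script shares fixed-size OS pipe buffers with the JVM. If nothing reads the script's stderr, that pipe can fill up and the script blocks on its next write, which stalls the whole transformation; draining stderr on a dedicated thread keeps the child making progress. The stand-alone Scala sketch below shows the same pattern outside Spark; the object and method names are illustrative only.

    import java.io.InputStream

    object StderrDrainSketch {
      // Start a daemon thread that reads and discards a stream until EOF.
      def drain(in: InputStream): Thread = {
        val t = new Thread(new Runnable {
          override def run(): Unit = {
            try {
              while (in.read() != -1) {} // keep the stderr pipe from filling up
            } finally {
              in.close()
            }
          }
        }, "stderr-drainer")
        t.setDaemon(true)
        t.start()
        t
      }

      def main(args: Array[String]): Unit = {
        val proc = new ProcessBuilder("/bin/bash", "-c", "echo oops 1>&2; echo ok").start()
        drain(proc.getErrorStream)
        val out = scala.io.Source.fromInputStream(proc.getInputStream).mkString
        proc.waitFor()
        println(out.trim) // prints "ok"; stderr was consumed and discarded
      }
    }
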
From 1de771d26b9790d808244ecd486864c83f270528 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Fri, 19 Jun 2015 08:32:38 +0800
Subject: [PATCH 2/6] naming the threads in ScriptTransformation

---
 .../spark/sql/hive/execution/ScriptTransformation.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index a9dbc9fe820b3..6401c0610e146 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -172,7 +172,7 @@ case class ScriptTransformation(
           }
           outputStream.close()
         }
-      }).start()
+      }, "Thread-ScriptTransformation-Feed").start()
 
       // Consume the error stream from the pipeline, otherwise it will be blocked if
       // the pipeline is full.
@@ -185,7 +185,7 @@ case class ScriptTransformation(
 
           errorStream.close()
         }
-      }).start()
+      }, "Thread-ScriptTransformation-STDERR-Consumer").start()
 
       iterator
     }
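Note (sketch, not from the patch): naming the two worker threads costs nothing and makes them easy to pick out in jstack output and log lines when a transformation hangs. The snippet below only illustrates the Thread(Runnable, name) constructor the patch switches to.

    object NamedThreadSketch {
      def main(args: Array[String]): Unit = {
        val feeder = new Thread(new Runnable {
          override def run(): Unit =
            println(s"running on ${Thread.currentThread().getName}")
        }, "Thread-ScriptTransformation-Feed")
        feeder.start()
        feeder.join() // prints: running on Thread-ScriptTransformation-Feed
      }
    }
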
From 47e09704ffbbd0115c02e98ec47f548ee8979812 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Fri, 19 Jun 2015 23:18:21 +0800
Subject: [PATCH 3/6] Use the RedirectThread instead

---
 .../scala/org/apache/spark/util/Utils.scala   | 31 +++++++++++++++++++
 .../spark/sql/hive/client/ClientWrapper.scala | 29 ++-----------------
 .../hive/execution/ScriptTransformation.scala | 15 +++------
 3 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 19157af5b6f4d..6080522c4939c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,34 @@ private[spark] class RedirectThread(
     }
   }
 }
+
+/**
+ * Circular buffer, which consume all of the data write to it.
+ */
+private[spark] class CircularBuffer extends java.io.OutputStream {
+  var pos: Int = 0
+  var buffer = new Array[Int](10240)
+
+  def write(i: Int): Unit = {
+    buffer(pos) = i
+    pos = (pos + 1) % buffer.size
+  }
+
+  override def toString: String = {
+    val (end, start) = buffer.splitAt(pos)
+    val input = new java.io.InputStream {
+      val iterator = (start ++ end).iterator
+
+      def read(): Int = if (iterator.hasNext) iterator.next() else -1
+    }
+    val reader = new BufferedReader(new InputStreamReader(input))
+    val stringBuilder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      stringBuilder.append(line)
+      stringBuilder.append("\n")
+      line = reader.readLine()
+    }
+    stringBuilder.toString()
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 2f771d76793e5..256b725bb7833 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -22,6 +22,8 @@ import java.net.URI
 import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
 
+import org.apache.spark.util.CircularBuffer
+
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
 
@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
   with Logging {
 
   // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
-  private val outputBuffer = new java.io.OutputStream {
-    var pos: Int = 0
-    var buffer = new Array[Int](10240)
-    def write(i: Int): Unit = {
-      buffer(pos) = i
-      pos = (pos + 1) % buffer.size
-    }
-
-    override def toString: String = {
-      val (end, start) = buffer.splitAt(pos)
-      val input = new java.io.InputStream {
-        val iterator = (start ++ end).iterator
-
-        def read(): Int = if (iterator.hasNext) iterator.next() else -1
-      }
-      val reader = new BufferedReader(new InputStreamReader(input))
-      val stringBuilder = new StringBuilder
-      var line = reader.readLine()
-      while(line != null) {
-        stringBuilder.append(line)
-        stringBuilder.append("\n")
-        line = reader.readLine()
-      }
-      stringBuilder.toString()
-    }
-  }
+  private val outputBuffer = new CircularBuffer()
 
   private val shim = version match {
     case hive.v12 => new Shim_v0_12()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 6401c0610e146..b372fff3aa015 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
 
 /**
  * Transforms the input by forking and running the specified script.
@@ -176,16 +176,9 @@ case class ScriptTransformation(
       // Consume the error stream from the pipeline, otherwise it will be blocked if
       // the pipeline is full.
-      new Thread(new Runnable() {
-        override def run(): Unit = {
-          var value = -1
-          do {
-            value = errorStream.read() // consume the error message stream.
-          } while (value != -1)
-
-          errorStream.close()
-        }
-      }, "Thread-ScriptTransformation-STDERR-Consumer").start()
+      new RedirectThread(errorStream, // input stream from the pipeline
+        new CircularBuffer(), // output to a circular buffer
+        "Thread-ScriptTransformation-STDERR-Consumer").start()
 
       iterator
     }
   }
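Note on PATCH 3 (editorial sketch): instead of a hand-rolled reader loop, the stderr pipe is now drained by Spark's existing RedirectThread, and the bytes land in the new CircularBuffer so only a bounded tail is kept in memory. The sketch below shows the same copy-into-a-bounded-sink wiring with plain JDK types; the helper names are illustrative, and a ByteArrayOutputStream stands in for CircularBuffer.

    import java.io.{ByteArrayOutputStream, InputStream, OutputStream}

    object RedirectSketch {
      // Copy everything from `in` to `out` on a named daemon thread.
      def copyInBackground(in: InputStream, out: OutputStream, name: String): Thread = {
        val t = new Thread(name) {
          override def run(): Unit = {
            val buf = new Array[Byte](1024)
            var n = in.read(buf)
            while (n != -1) {
              out.write(buf, 0, n)
              n = in.read(buf)
            }
          }
        }
        t.setDaemon(true)
        t.start()
        t
      }

      def main(args: Array[String]): Unit = {
        val proc = new ProcessBuilder("/bin/bash", "-c", "echo oops 1>&2").start()
        val sink = new ByteArrayOutputStream() // stand-in for CircularBuffer
        val copier = copyInBackground(proc.getErrorStream, sink, "stderr-consumer")
        proc.waitFor()
        copier.join(1000) // let the copier hit EOF
        println(sink.toString("UTF-8").trim) // prints "oops"
      }
    }
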
From 692b19e44c8f6e10ced1d8f0e494ff83e8ad3577 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 22 Jun 2015 21:46:12 -0700
Subject: [PATCH 4/6] check the process exitValue for ScriptTransform

---
 .../scala/org/apache/spark/util/Utils.scala   |  8 ++--
 .../hive/execution/ScriptTransformation.scala | 46 +++++++++++--------
 .../sql/hive/execution/SQLQuerySuite.scala    |  2 +-
 3 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6080522c4939c..c9c97ba847856 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2335,11 +2335,13 @@ private[spark] class RedirectThread(
 }
 
 /**
- * Circular buffer, which consume all of the data write to it.
+ * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
+ * in a circular buffer. The current contents of the buffer can be accessed using
+ * the toString method.
  */
-private[spark] class CircularBuffer extends java.io.OutputStream {
+private[spark] class CircularBuffer(sizeInByte: Int = 10240) extends java.io.OutputStream {
   var pos: Int = 0
-  var buffer = new Array[Int](10240)
+  var buffer = new Array[Int](sizeInByte / 4)
 
   def write(i: Int): Unit = {
     buffer(pos) = i
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index b372fff3aa015..b967e191c5855 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -150,36 +150,44 @@ case class ScriptTransformation(
       val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)
 
+      // TODO make the 2048 configurable?
+      val stderrBuffer = new CircularBuffer(2048)
+      // Consume the error stream from the pipeline, otherwise it will be blocked if
+      // the pipeline is full.
+      new RedirectThread(errorStream, // input stream from the pipeline
+        stderrBuffer, // output to a circular buffer
+        "Thread-ScriptTransformation-STDERR-Consumer").start()
+
       // Put the write(output to the pipeline) into a single thread
       // and keep the collector as remain in the main thread.
       // otherwise it will causes deadlock if the data size greater than
      // the pipeline / buffer capacity.
       new Thread(new Runnable() {
         override def run(): Unit = {
-          iter
-            .map(outputProjection)
-            .foreach { row =>
-              if (inputSerde == null) {
-                val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-                  ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
-                outputStream.write(data)
-              } else {
-                val writable = inputSerde.serialize(
-                  row.asInstanceOf[GenericInternalRow].values, inputSoi)
-                prepareWritable(writable).write(dataOutputStream)
+          Utils.tryWithSafeFinally {
+            iter
+              .map(outputProjection)
+              .foreach { row =>
+                if (inputSerde == null) {
+                  val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                    ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+                  outputStream.write(data)
+                } else {
+                  val writable = inputSerde.serialize(
+                    row.asInstanceOf[GenericInternalRow].values, inputSoi)
+                  prepareWritable(writable).write(dataOutputStream)
+                }
+              }
+            outputStream.close()
+          } {
+            if (proc.waitFor() != 0) {
+              logError(stderrBuffer.toString) // log the stderr circular buffer
             }
           }
-          outputStream.close()
         }
       }, "Thread-ScriptTransformation-Feed").start()
 
-      // Consume the error stream from the pipeline, otherwise it will be blocked if
-      // the pipeline is full.
-      new RedirectThread(errorStream, // input stream from the pipeline
-        new CircularBuffer(), // output to a circular buffer
-        "Thread-ScriptTransformation-STDERR-Consumer").start()
-
       iterator
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f0aad8dbbe64d..9f7e58f890241 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -653,7 +653,7 @@ class SQLQuerySuite extends QueryTest {
       .queryExecution.toRdd.count())
   }
 
-  ignore("test script transform for stderr") {
+  test("test script transform for stderr") {
     val data = (1 to 100000).map { i => (i, i, i) }
    data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
     assert(0 ===
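Note on PATCH 4 (editorial sketch): the feeder thread now wraps its work in Utils.tryWithSafeFinally so that, once writing finishes or fails, the script's exit code is checked and the captured stderr tail is logged on a non-zero exit. The stand-alone sketch below shows the same check-and-report shape with a plain try/finally; a ByteArrayOutputStream again stands in for CircularBuffer and the names are illustrative.

    import java.io.ByteArrayOutputStream

    object ExitCheckSketch {
      def main(args: Array[String]): Unit = {
        val proc = new ProcessBuilder("/bin/bash", "-c", "echo boom 1>&2; exit 2").start()

        // Drain stderr into an in-memory sink so the child never blocks.
        val stderrSink = new ByteArrayOutputStream()
        val drainer = new Thread(new Runnable {
          override def run(): Unit = {
            val err = proc.getErrorStream
            var b = err.read()
            while (b != -1) { stderrSink.write(b); b = err.read() }
          }
        }, "stderr-consumer")
        drainer.start()

        try {
          proc.getOutputStream.close() // nothing to feed in this sketch
        } finally {
          val exit = proc.waitFor()
          drainer.join()
          if (exit != 0) {
            // The patch does logError(stderrBuffer.toString) here.
            System.err.println(s"script exited with $exit: ${stderrSink.toString("UTF-8").trim}")
          }
        }
      }
    }
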
From 76ff46bde356367a2ca3d5c39cdf37dcbe9f00d9 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Tue, 23 Jun 2015 22:10:26 -0700
Subject: [PATCH 5/6] update the CircularBuffer

---
 .../scala/org/apache/spark/util/Utils.scala | 22 ++++++++++++++-----
 .../org/apache/spark/util/UtilsSuite.scala  |  8 ++++++++
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c9c97ba847856..3f01983eb7379 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2339,13 +2339,25 @@ private[spark] class RedirectThread(
  * in a circular buffer. The current contents of the buffer can be accessed using
  * the toString method.
  */
-private[spark] class CircularBuffer(sizeInByte: Int = 10240) extends java.io.OutputStream {
+private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
   var pos: Int = 0
-  var buffer = new Array[Int](sizeInByte / 4)
+  var buffer = new Array[Byte](sizeInBytes)
 
-  def write(i: Int): Unit = {
-    buffer(pos) = i
-    pos = (pos + 1) % buffer.size
+  /**
+   * Writes the specified byte to this output stream. The general
+   * contract for [[write]] is that one byte is written
+   * to the output stream. The byte to be written is the eight
+   * low-order bits of the argument `i`. The 24
+   * high-order bits of `i` are ignored.
+   *
+   * Subclasses of [[OutputStream]] must provide an
+   * implementation for this method.
+   *
+   * @param i the byte to be written.
+   */
+  override def write(i: Int): Unit = {
+    buffer(pos) = i.asInstanceOf[Byte]
+    pos = (pos + 1) % buffer.length
   }
 
   override def toString: String = {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a61ea3918f46a..baa4c661cc21e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(!Utils.isInDirectory(nullFile, parentDir))
     assert(!Utils.isInDirectory(nullFile, childFile3))
   }
+
+  test("circular buffer") {
+    val buffer = new CircularBuffer(25)
+    val stream = new java.io.PrintStream(buffer, true, "UTF-8")
+
+    stream.println("test circular test circular test circular test circular test circular")
+    assert(buffer.toString === "t circular test circular\n")
+  }
 }
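Note on the new test in PATCH 5 (worked example, not from the patch): the printed sentence is 69 characters and println appends one '\n', so 70 bytes pass through the 25-byte buffer. Only the last 25 bytes survive the wrap-around, which is exactly the expected string in the assertion. A quick way to confirm the expected value:

    object CircularBufferTestMath {
      def main(args: Array[String]): Unit = {
        val line = "test circular test circular test circular test circular test circular" + "\n"
        println(line.length)      // 70 bytes are written through the PrintStream
        print(line.takeRight(25)) // "t circular test circular" plus '\n', the retained tail
      }
    }
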
+ */ + override def write(i: Int): Unit = { + buffer(pos) = i.asInstanceOf[Byte] + pos = (pos + 1) % buffer.length } override def toString: String = { diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index a61ea3918f46a..baa4c661cc21e 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(!Utils.isInDirectory(nullFile, parentDir)) assert(!Utils.isInDirectory(nullFile, childFile3)) } + + test("circular buffer") { + val buffer = new CircularBuffer(25) + val stream = new java.io.PrintStream(buffer, true, "UTF-8") + + stream.println("test circular test circular test circular test circular test circular") + assert(buffer.toString === "t circular test circular\n") + } } From bfedd77d210310022d4a83b41f3de5ae53080df4 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Fri, 26 Jun 2015 01:40:23 -0700 Subject: [PATCH 6/6] revert the write --- .../scala/org/apache/spark/util/Utils.scala | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3f01983eb7379..a7fc749a2b0c6 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2341,22 +2341,10 @@ private[spark] class RedirectThread( */ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream { var pos: Int = 0 - var buffer = new Array[Byte](sizeInBytes) + var buffer = new Array[Int](sizeInBytes) - /** - * Writes the specified byte to this output stream. The general - * contract for [[write]] is that one byte is written - * to the output stream. The byte to be written is the eight - * low-order bits of the argument `i`. The 24 - * high-order bits of `i` are ignored. - * - * Subclasses of [[OutputStream]] must provide an - * implementation for this method. - * - * @param i the byte to be written. - */ - override def write(i: Int): Unit = { - buffer(pos) = i.asInstanceOf[Byte] + def write(i: Int): Unit = { + buffer(pos) = i pos = (pos + 1) % buffer.length }