From c3f24705c8b4371baa4021493652c91fba485756 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 15 Aug 2017 18:22:58 +0900 Subject: [PATCH] Remove wrapper code for SnappyOutputStream --- .../apache/spark/io/CompressionCodec.scala | 47 +------------------ 1 file changed, 1 insertion(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 27f2e429395db..a0fbc6551980f 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -156,7 +156,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt - new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize)) + new SnappyOutputStream(s, blockSize) } override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) @@ -174,48 +174,3 @@ private final object SnappyCompressionCodec { case e: Error => throw new IllegalArgumentException(e) } } - -/** - * Wrapper over `SnappyOutputStream` which guards against write-after-close and double-close - * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version - * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107. - */ -private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream { - - private[this] var closed: Boolean = false - - override def write(b: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte]): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte], off: Int, len: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b, off, len) - } - - override def flush(): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.flush() - } - - override def close(): Unit = { - if (!closed) { - closed = true - os.close() - } - } -}