diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 7722db56ee297..0664c5ac752c1 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -154,72 +154,19 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { */ @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { - val version = SnappyCompressionCodec.version - override def compressedOutputStream(s: OutputStream): OutputStream = { - val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt - new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize)) - } - - override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) -} - -/** - * Object guards against memory leak bug in snappy-java library: - * (https://github.com/xerial/snappy-java/issues/131). - * Before a new version of the library, we only call the method once and cache the result. - */ -private final object SnappyCompressionCodec { - private lazy val version: String = try { + try { Snappy.getNativeLibraryVersion } catch { case e: Error => throw new IllegalArgumentException(e) } -} -/** - * Wrapper over `SnappyOutputStream` which guards against write-after-close and double-close - * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version - * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107. - */ -private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream { - - private[this] var closed: Boolean = false - - override def write(b: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte]): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte], off: Int, len: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b, off, len) - } - - override def flush(): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.flush() + override def compressedOutputStream(s: OutputStream): OutputStream = { + val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt + new SnappyOutputStream(s, blockSize) } - override def close(): Unit = { - if (!closed) { - closed = true - os.close() - } - } + override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } /** diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 0b074fbf64eda..bf85fe0b4512c 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,7 @@ object MimaExcludes { // Exclude rules for 3.0.x lazy val v30excludes = v24excludes ++ Seq( + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version") ) // Exclude rules for 2.4.x