diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index ef9c43ecf14f6..3a697a7d38d90 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -19,6 +19,8 @@ package org.apache.spark.io import java.io.{InputStream, OutputStream} + +import com.google.common.io.Files import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} @@ -118,6 +120,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { */ @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { + SnappyCompressionCodec // reference companion object to trigger static block override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) @@ -126,3 +129,11 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } + +object SnappyCompressionCodec { + // See SPARK-2881. A best-effort attempt to isolate Snappy's staging directory to an + // app-specific location. Since snappy reads this property in a static block, this will not + // work if the user has referenced Snappy classes on their own before calling into Snappy + // via Spark. + System.setProperty("org.xerial.snappy.tempdir", Files.createTempDir().getAbsolutePath) +}