From 96e5e6a9e86f38b5b2f45f4832d6b5b572e82373 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 16 Aug 2014 19:24:18 -0700 Subject: [PATCH] SPARK-2881: Avoid collisions in Snappy staging directory. By default Snappy uses java.io.tmpdir for copying the Snappy native library. If two users run Spark jobs on the same machine it can cause an exception when the second user tries to access or overwrite the snappy file created by the first user. This will fail Spark jobs out-of-the-box if they are run on a machine shared by different users. Snappy does expose a mechanism to customize the temp directory via a system property. This system property is read in a static block inside of Snappy code. I've added a "best effort" fix for this where we try to set the system property in a static block before Snappy reads it. I've tested it and it does work, but it relies on static initialization order which is brittle. I.e., if user code accesses Snappy libraries first this might not work. An alternative workaround for users is to explicitly set org.xerial.snappy.tempdir themselves through Spark's java options. I also filed a bug upstream with Snappy-java to ask them for better behavior here: https://github.com/xerial/snappy-java/issues/84 I think this is worth merging because in many cases it will fix the issue and at worst it's a no-op. 
--- .../scala/org/apache/spark/io/CompressionCodec.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index ef9c43ecf14f6..3a697a7d38d90 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -19,6 +19,8 @@ package org.apache.spark.io import java.io.{InputStream, OutputStream} + +import com.google.common.io.Files import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} @@ -118,6 +120,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { */ @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { + SnappyCompressionCodec // reference companion object to trigger static block override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) @@ -126,3 +129,11 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } + +object SnappyCompressionCodec { + // See SPARK-2881. A best-effort attempt to isolate Snappy's staging directory to an + // app-specific location. Since snappy reads this property in a static block, this will not + // work if the user has referenced Snappy classes on their own before calling into Snappy + // via Spark. + System.setProperty("org.xerial.snappy.tempdir", Files.createTempDir().getAbsolutePath) +}