Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.io

import java.io.{InputStream, OutputStream}


import com.google.common.io.Files
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
Expand Down Expand Up @@ -118,6 +120,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
*/
@DeveloperApi
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
SnappyCompressionCodec // reference companion object to trigger static block

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
Expand All @@ -126,3 +129,11 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

object SnappyCompressionCodec {
// See SPARK-2881. A best-effort attempt to isolate Snappy's staging directory to an
// app-specific location. Since snappy reads this property in a static block, this will not
// work if the user has referenced Snappy classes on their own before calling into Snappy
// via Spark.
System.setProperty("org.xerial.snappy.tempdir", Files.createTempDir().getAbsolutePath)
}