Commit 8b77aae

Wrap SnappyOutputStream to fix SPARK-7660
1 parent: 7da33ce

File tree: 2 files changed (+47, -10 lines)

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 47 additions & 2 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.flush()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      os.close()
+    }
+  }
+}
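
For context, here is a minimal sketch (not part of the commit) of the behavior the wrapper provides: a second close() becomes a harmless no-op, and any write or flush after close fails fast with an IOException instead of reaching into snappy-java's recycled buffers. It assumes a spark-core build containing this patch and snappy-java on the classpath; the object name WrapperBehaviorSketch is invented for illustration.

import java.io.{ByteArrayOutputStream, IOException}

import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

object WrapperBehaviorSketch {
  def main(args: Array[String]): Unit = {
    // Build the codec the same way Spark does, with default configuration.
    val codec = new SnappyCompressionCodec(new SparkConf())
    val sink = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(sink)

    out.write("hello".getBytes("UTF-8"))
    out.close()
    out.close()  // second close is now a no-op rather than a double buffer release

    try {
      out.write(42)  // write-after-close is rejected instead of touching pooled buffers
    } catch {
      case e: IOException => println(s"rejected as expected: ${e.getMessage}")
    }
  }
}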

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 0 additions & 8 deletions
@@ -35,7 +35,6 @@
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
@@ -97,13 +96,6 @@ public OutputStream apply(OutputStream stream) {
   @After
   public void tearDown() {
     Utils.deleteRecursively(tempDir);
-    // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
-    // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
-    // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
-    // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
-    synchronized (CachedBufferAllocator.class) {
-      CachedBufferAllocator.queueTable.clear();
-    }
     final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
     if (leakedMemory != 0) {
       fail("Test leaked " + leakedMemory + " bytes of managed memory");
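
The deleted tearDown() block was only masking the underlying problem: per the removed comment and xerial/snappy-java#107, the affected snappy-java versions return a closed stream's internal buffers to CachedBufferAllocator's shared pool on every close() call, so a double-close can hand the same buffer to two live streams. A rough sketch of that failure mode, hedged as an assumption about the buggy snappy-java internals (the object name DoubleCloseSketch is invented for illustration):

import java.io.ByteArrayOutputStream

import org.xerial.snappy.SnappyOutputStream

object DoubleCloseSketch {
  def main(args: Array[String]): Unit = {
    // With an affected snappy-java version, each close() unconditionally
    // returns the stream's internal buffers to the shared allocator pool,
    // so this double-close releases the same buffer twice.
    val first = new SnappyOutputStream(new ByteArrayOutputStream())
    first.close()
    first.close()

    // Two streams created afterwards can then be handed the same pooled
    // buffer and silently corrupt each other's compressed output.
    val second = new SnappyOutputStream(new ByteArrayOutputStream())
    val third = new SnappyOutputStream(new ByteArrayOutputStream())
    second.write(Array.fill[Byte](1024)(1))
    third.write(Array.fill[Byte](1024)(2))
    second.close()
    third.close()
  }
}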
