Skip to content

Commit fd4bb9e

Browse files
committed
Use own ByteBufferOutputStream rather than Kryo's
1 parent 67d25ba commit fd4bb9e

File tree

4 files changed

+48
-4
lines changed

4 files changed

+48
-4
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.unsafe;
19+
20+
import java.io.IOException;
21+
import java.io.OutputStream;
22+
import java.nio.ByteBuffer;
23+
24+
class ByteBufferOutputStream extends OutputStream {
25+
26+
private final ByteBuffer byteBuffer;
27+
28+
public ByteBufferOutputStream(ByteBuffer byteBuffer) {
29+
this.byteBuffer = byteBuffer;
30+
}
31+
32+
@Override
33+
public void write(int b) throws IOException {
34+
byteBuffer.put((byte) b);
35+
}
36+
37+
@Override
38+
public void write(byte[] b) throws IOException {
39+
byteBuffer.put(b);
40+
}
41+
42+
@Override
43+
public void write(byte[] b, int off, int len) throws IOException {
44+
byteBuffer.put(b, off, len);
45+
}
46+
}

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
* spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
5454
* specialized merge procedure that avoids extra serialization/deserialization.
5555
*/
56-
public final class UnsafeShuffleExternalSorter {
56+
final class UnsafeShuffleExternalSorter {
5757

5858
private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
5959

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import org.apache.spark.util.collection.Sorter;
2323

24-
public final class UnsafeShuffleSorter {
24+
final class UnsafeShuffleSorter {
2525

2626
private final Sorter<PackedRecordPointer, long[]> sorter;
2727
private static final class SortComparator implements Comparator<PackedRecordPointer> {

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import scala.reflect.ClassTag;
2929
import scala.reflect.ClassTag$;
3030

31-
import com.esotericsoftware.kryo.io.ByteBufferOutputStream;
3231
import com.google.common.annotations.VisibleForTesting;
3332
import com.google.common.io.ByteStreams;
3433
import com.google.common.io.Closeables;
@@ -75,7 +74,6 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
7574
private UnsafeShuffleExternalSorter sorter = null;
7675
private byte[] serArray = null;
7776
private ByteBuffer serByteBuffer;
78-
// TODO: we should not depend on this class from Kryo; copy its source or find an alternative
7977
private SerializationStream serOutputStream;
8078

8179
/**

0 commit comments

Comments
 (0)