File tree Expand file tree Collapse file tree 1 file changed +11
-1
lines changed
core/src/main/scala/org/apache/spark/serializer Expand file tree Collapse file tree 1 file changed +11
-1
lines changed Original file line number Diff line number Diff line change @@ -24,8 +24,18 @@ import org.apache.spark.util.ByteBufferInputStream
2424
2525private [spark] class JavaSerializationStream (out : OutputStream ) extends SerializationStream {
2626 val objOut = new ObjectOutputStream (out)
27+ var counter = 0 ;
2728 // Calling reset to avoid memory leak: http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
28- def writeObject [T ](t : T ): SerializationStream = { objOut.writeObject(t); objOut.reset(); this }
29+ def writeObject [T ](t : T ): SerializationStream = {
30+ objOut.writeObject(t);
31+ if (counter >= 1000 ) {
32+ objOut.reset();
33+ counter = 0 ;
34+ } else {
35+ counter+= 1 ;
36+ }
37+ this
38+ }
2939 def flush () { objOut.flush() }
3040 def close () { objOut.close() }
3141}
You can’t perform that action at this time.
0 commit comments