From efe1102c8a7436b2fe112d3bece9f35fedea0dc8 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 12 Nov 2013 16:32:54 -0800 Subject: [PATCH 01/23] Changing CacheManager and BlockManager to pass iterators directly to the serializer when a 'DISK_ONLY' persist is called. This is in response to SPARK-942. --- .../scala/org/apache/spark/CacheManager.scala | 19 +++++++++++++++---- .../spark/serializer/JavaSerializer.scala | 3 ++- .../apache/spark/storage/BlockManager.scala | 12 +++++------- .../org/apache/spark/storage/BlockStore.scala | 2 +- .../org/apache/spark/storage/DiskStore.scala | 4 ++-- .../apache/spark/storage/MemoryStore.scala | 6 +++--- 6 files changed, 28 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 519ecde50a163..4aa8e4f7abb5e 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -71,10 +71,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } - val elements = new ArrayBuffer[Any] - elements ++= computedValues - blockManager.put(key, elements, storageLevel, tellMaster = true) - return elements.iterator.asInstanceOf[Iterator[T]] + if (storageLevel == StorageLevel.DISK_ONLY || storageLevel == StorageLevel.DISK_ONLY_2) { + blockManager.put(key, computedValues, storageLevel, tellMaster = true) + return blockManager.get(key) match { + case Some(values) => + return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) + case None => + logInfo("Failure to store %s".format(key)); + return null; + } + } else { + val elements = new ArrayBuffer[Any] + elements ++= computedValues + blockManager.put(key, elements, storageLevel, tellMaster = true) + return elements.iterator.asInstanceOf[Iterator[T]] + } } finally { loading.synchronized { loading.remove(key) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 4de81617b1dd8..fccc12bda9428 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,7 +24,8 @@ import org.apache.spark.util.ByteBufferInputStream private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) - def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this } + //Calling reset to avoid memory leak: http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api + def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); objOut.reset(); this } def flush() { objOut.flush() } def close() { objOut.close() } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a34c95b6f07b6..26b51d6079363 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -356,7 +356,7 @@ private[spark] class BlockManager( // TODO: Consider creating a putValues that also takes in a iterator? 
val valuesBuffer = new ArrayBuffer[Any] valuesBuffer ++= values - memoryStore.putValues(blockId, valuesBuffer, level, true).data match { + memoryStore.putValues(blockId, valuesBuffer.toIterator, level, true).data match { case Left(values2) => return Some(values2) case _ => @@ -451,9 +451,7 @@ private[spark] class BlockManager( def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) : Long = { - val elements = new ArrayBuffer[Any] - elements ++= values - put(blockId, elements, level, tellMaster) + doPut(blockId, Left(values), level, tellMaster) } /** @@ -474,7 +472,7 @@ private[spark] class BlockManager( def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, tellMaster: Boolean = true) : Long = { require(values != null, "Values is null") - doPut(blockId, Left(values), level, tellMaster) + doPut(blockId, Left(values.toIterator), level, tellMaster) } /** @@ -486,7 +484,7 @@ private[spark] class BlockManager( doPut(blockId, Right(bytes), level, tellMaster) } - private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer], + private def doPut(blockId: BlockId, data: Either[Iterator[Any], ByteBuffer], level: StorageLevel, tellMaster: Boolean = true): Long = { require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") @@ -691,7 +689,7 @@ private[spark] class BlockManager( logInfo("Writing block " + blockId + " to disk") data match { case Left(elements) => - diskStore.putValues(blockId, elements, level, false) + diskStore.putValues(blockId, elements.toIterator, level, false) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index ea426562402ae..f12438fd0de41 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -36,7 +36,7 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { * @return a PutResult that contains the size of the data, as well as the values put if * returnValues is true (if not, the result's data field can be null) */ - def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, + def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean) : PutResult /** diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 5a1e7b44440fd..86523695aab7e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -57,7 +57,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage override def putValues( blockId: BlockId, - values: ArrayBuffer[Any], + values: Iterator[Any], level: StorageLevel, returnValues: Boolean) : PutResult = { @@ -66,7 +66,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) val outputStream = new FileOutputStream(file) - blockManager.dataSerializeStream(blockId, outputStream, values.iterator) + blockManager.dataSerializeStream(blockId, outputStream, values) val length = file.length val timeTaken = System.currentTimeMillis - startTime diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 05f676c6e2249..7dffb052bc18f 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -65,7 +65,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def putValues( blockId: BlockId, - values: ArrayBuffer[Any], + values: Iterator[Any], level: StorageLevel, returnValues: Boolean) : PutResult = { @@ -73,9 +73,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) tryToPut(blockId, values, sizeEstimate, true) - PutResult(sizeEstimate, Left(values.iterator)) + PutResult(sizeEstimate, Left(values)) } else { - val bytes = blockManager.dataSerialize(blockId, values.iterator) + val bytes = blockManager.dataSerialize(blockId, values) tryToPut(blockId, bytes, bytes.limit, false) PutResult(bytes.limit(), Right(bytes.duplicate())) } From cac1fadeec964cfc254ee1f02b82665aac9a5690 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Wed, 13 Nov 2013 13:49:50 -0800 Subject: [PATCH 02/23] Fixing MemoryStore, so that it converts incoming iterators to ArrayBuffer objects. This was previously done higher up the stack. --- .../main/scala/org/apache/spark/storage/MemoryStore.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7dffb052bc18f..9191852488aa5 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -71,9 +71,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) : PutResult = { if (level.deserialized) { - val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - tryToPut(blockId, values, sizeEstimate, true) - PutResult(sizeEstimate, Left(values)) + val valueEntries = new ArrayBuffer[Any](); + valueEntries ++= values; + val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef]) + tryToPut(blockId, valueEntries, sizeEstimate, true) + PutResult(sizeEstimate, Left(valueEntries.toIterator)) } else { val bytes = blockManager.dataSerialize(blockId, values) tryToPut(blockId, bytes, bytes.limit, false) From 81d670cb9ad9d2e2635a0eb6ecc74f117554a708 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Wed, 13 Nov 2013 17:20:17 -0800 Subject: [PATCH 03/23] Adding unit test for straight to disk iterator methods. --- .../spark/storage/LargeIteratorSuite.scala | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala diff --git a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala new file mode 100644 index 0000000000000..0fd46aaef5ee1 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage + +import org.scalatest.FunSuite +import org.apache.spark.{LocalSparkContext, SparkContext} + +class Expander(base:String, count:Int) extends Iterator[String] { + var i = 0; + def next() : String = { + i += 1; + return base + i.toString; + } + def hasNext() : Boolean = i < count; +} + +object Expander { + def expand(s:String, i:Int) : Iterator[String] = { + return new Expander(s,i) + } +} + +class LargeIteratorSuite extends FunSuite with LocalSparkContext { + + val clusterUrl = "local-cluster[1,1,512]" + test("Flatmap iterator") { + sc = new SparkContext(clusterUrl, "mem_test"); + val seeds = sc.parallelize( Array( + "This is the first sentence that we will test:", + "This is the second sentence that we will test:", + "This is the third sentence that we will test:" + ) ); + val out = seeds.flatMap(Expander.expand(_,10000000)); + out.map(_ + "...").persist(StorageLevel.DISK_ONLY).saveAsTextFile("./test.out") + } +} From 5eb2b7e53d5290fdf71a7addd672c7f4ffbf6ec7 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Sat, 16 Nov 2013 22:19:19 -0800 Subject: [PATCH 04/23] Changing the JavaSerializer reset to occur every 1000 objects. --- .../org/apache/spark/serializer/JavaSerializer.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index fccc12bda9428..f78ac4983212f 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -24,8 +24,18 @@ import org.apache.spark.util.ByteBufferInputStream private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) + var counter = 0; //Calling reset to avoid memory leak: http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api - def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); objOut.reset(); this } + def writeObject[T](t: T): SerializationStream = { + objOut.writeObject(t); + if (counter >= 1000) { + objOut.reset(); + counter = 0; + } else { + counter+=1; + } + this + } def flush() { objOut.flush() } def close() { objOut.close() } } From 44ec35a3733a25df6038827f480e8cf6991f9344 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Sat, 16 Nov 2013 22:35:51 -0800 Subject: [PATCH 05/23] Adding some comments. 
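
Background on the reset call introduced in patch 04 and explained by the comment added here: ObjectOutputStream.writeObject records every object it writes in an internal handle table, so a long-lived stream keeps all of those objects reachable until reset() is called. The following is a minimal standalone sketch of that pattern only, not code from these patches; the ResetDemo object and the discard-everything devNull stream are illustrative names.

    import java.io.{ObjectOutputStream, OutputStream}

    object ResetDemo {
      // An OutputStream that discards all bytes, so only the serializer's own
      // bookkeeping (its handle table) can grow.
      private val devNull = new OutputStream { override def write(b: Int): Unit = () }

      def main(args: Array[String]): Unit = {
        val resetInterval = 1000 // mirrors the hard-coded interval from patch 04
        val objOut = new ObjectOutputStream(devNull)
        var counter = 0
        for (i <- 1 to 1000000) {
          // Each written object is remembered in the stream's handle table and
          // cannot be garbage collected...
          objOut.writeObject("record %d".format(i))
          counter += 1
          if (counter >= resetInterval) {
            objOut.reset() // ...until reset() clears the table.
            counter = 0
          }
        }
        objOut.close()
      }
    }
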
--- .../scala/org/apache/spark/serializer/JavaSerializer.scala | 5 ++++- .../org/apache/spark/storage/LargeIteratorSuite.scala | 7 ++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index f78ac4983212f..0d9a251a121df 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -25,7 +25,10 @@ import org.apache.spark.util.ByteBufferInputStream private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) var counter = 0; - //Calling reset to avoid memory leak: http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api + /* Calling reset to avoid memory leak: http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api + * But only call it every 1000th time to avoid bloated serialization streams (when + * the stream 'resets' object class descriptions have to be re-written) + */ def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); if (counter >= 1000) { diff --git a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala index 0fd46aaef5ee1..cb973268be737 100644 --- a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala @@ -35,7 +35,12 @@ object Expander { } class LargeIteratorSuite extends FunSuite with LocalSparkContext { - + /* Tests the ability of Spark to deal with user provided iterators that + * generate more data then available memory. In any memory based persistance + * Spark will unroll the iterator into an ArrayBuffer for caching, however in + * the case that the use defines DISK_ONLY persistance, the iterator will be + * fed directly to the serializer and written to disk. 
+ */ val clusterUrl = "local-cluster[1,1,512]" test("Flatmap iterator") { sc = new SparkContext(clusterUrl, "mem_test"); From 95c7f67b131496de51587afa373eee9da1a5d46b Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 13:35:05 -0800 Subject: [PATCH 06/23] Simplifying StorageLevel checks --- core/src/main/scala/org/apache/spark/CacheManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index a7d10e08568f7..566674dd68414 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -71,7 +71,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } - if (storageLevel == StorageLevel.DISK_ONLY || storageLevel == StorageLevel.DISK_ONLY_2) { + if (storageLevel.useDisk || !storageLevel.useMemory) { blockManager.put(key, computedValues, storageLevel, tellMaster = true) return blockManager.get(key) match { case Some(values) => From 0e6f8084fe2e7cfb5129a016fcd65d62e4005031 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 15:56:09 -0800 Subject: [PATCH 07/23] Deleting temp output directory when done --- .../scala/org/apache/spark/storage/LargeIteratorSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala index cb973268be737..a3214a903b4ad 100644 --- a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.storage import org.scalatest.FunSuite import org.apache.spark.{LocalSparkContext, SparkContext} +import org.apache.commons.io.FileUtils +import java.io.File class Expander(base:String, count:Int) extends Iterator[String] { var i = 0; @@ -51,5 +53,6 @@ class LargeIteratorSuite extends FunSuite with LocalSparkContext { ) ); val out = seeds.flatMap(Expander.expand(_,10000000)); out.map(_ + "...").persist(StorageLevel.DISK_ONLY).saveAsTextFile("./test.out") + FileUtils.deleteDirectory(new File("./test.out")) } } From 2eeda75621eb1d60f10d1f4ab805acae75edd7c5 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 16:20:20 -0800 Subject: [PATCH 08/23] Fixing dumb mistake ("||" instead of "&&") --- core/src/main/scala/org/apache/spark/CacheManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 566674dd68414..9167b59243ecc 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -71,7 +71,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally if (context.runningLocally) { return computedValues } - if (storageLevel.useDisk || !storageLevel.useMemory) { + if (storageLevel.useDisk && !storageLevel.useMemory) { blockManager.put(key, computedValues, storageLevel, tellMaster = true) return 
blockManager.get(key) match { case Some(values) => From a6424ba6b2551d4366a57cd1d5d32ffe5a4a3fd0 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 16:24:03 -0800 Subject: [PATCH 09/23] Wrapping long line --- .../scala/org/apache/spark/serializer/JavaSerializer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 93b3959715bee..ec7437ad36635 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -26,7 +26,8 @@ import org.apache.spark.SparkConf private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) var counter = 0; - /* Calling reset to avoid memory leak: http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api + /* Calling reset to avoid memory leak: + * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api * But only call it every 1000th time to avoid bloated serialization streams (when * the stream 'resets' object class descriptions have to be re-written) */ From 9df02765528d57935a9aed8daf754a065f5d0ef5 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 17:21:48 -0800 Subject: [PATCH 10/23] Added check to make sure that streamed-to-dist RDD actually returns good data in the LargeIteratorSuite --- .../org/apache/spark/storage/LargeIteratorSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala index a3214a903b4ad..a22f81e60b5b0 100644 --- a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala @@ -51,8 +51,11 @@ class LargeIteratorSuite extends FunSuite with LocalSparkContext { "This is the second sentence that we will test:", "This is the third sentence that we will test:" ) ); - val out = seeds.flatMap(Expander.expand(_,10000000)); - out.map(_ + "...").persist(StorageLevel.DISK_ONLY).saveAsTextFile("./test.out") + val expand_size = 10000000; + val out = seeds.flatMap(Expander.expand(_,expand_size)); + val expanded = out.map(_ + "...").persist(StorageLevel.DISK_ONLY) + assert( expanded.filter( _.startsWith("This is the first sentence that we will test")).count() == expand_size ) + expanded.saveAsTextFile("./test.out") FileUtils.deleteDirectory(new File("./test.out")) } } From 31fe08ed356c5fb37a985ea72a10d6e3e165c80b Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 17:34:27 -0800 Subject: [PATCH 11/23] Removing un-needed semi-colons --- .../src/main/scala/org/apache/spark/CacheManager.scala | 4 ++-- .../org/apache/spark/serializer/JavaSerializer.scala | 10 +++++----- .../scala/org/apache/spark/storage/MemoryStore.scala | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 9167b59243ecc..ce3cbf3258c92 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -77,8 +77,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(values) 
=> return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => - logInfo("Failure to store %s".format(key)); - return null; + logInfo("Failure to store %s".format(key)) + return null } } else { val elements = new ArrayBuffer[Any] diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index ec7437ad36635..3ac38c4139225 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -25,19 +25,19 @@ import org.apache.spark.SparkConf private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) - var counter = 0; + var counter = 0 /* Calling reset to avoid memory leak: * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api * But only call it every 1000th time to avoid bloated serialization streams (when * the stream 'resets' object class descriptions have to be re-written) */ def writeObject[T](t: T): SerializationStream = { - objOut.writeObject(t); + objOut.writeObject(t) if (counter >= 1000) { - objOut.reset(); - counter = 0; + objOut.reset() + counter = 0 } else { - counter+=1; + counter += 1 } this } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 3b842eb1370f8..6d0ca12aae01e 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -71,8 +71,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) : PutResult = { if (level.deserialized) { - val valueEntries = new ArrayBuffer[Any](); - valueEntries ++= values; + val valueEntries = new ArrayBuffer[Any]() + valueEntries ++= values val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef]) tryToPut(blockId, valueEntries, sizeEstimate, true) PutResult(sizeEstimate, Left(valueEntries.toIterator)) From 40fe1d7cf83ce2fe29a061ec2d6e6e54bd18a6ff Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 19:34:43 -0800 Subject: [PATCH 12/23] Removing rouge space --- core/src/main/scala/org/apache/spark/CacheManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index ce3cbf3258c92..5dc8791dc5f0d 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -73,7 +73,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { if (context.runningLocally) { return computedValues } if (storageLevel.useDisk && !storageLevel.useMemory) { blockManager.put(key, computedValues, storageLevel, tellMaster = true) - return blockManager.get(key) match { + return blockManager.get(key) match { case Some(values) => return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => From 00c98e07334dac20085f51977d015cab6e2242bb Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 24 Feb 2014 22:22:12 -0800 Subject: [PATCH 13/23] Making the Java ObjectStreamSerializer reset rate configurable by the system variable 'spark.serializer.objectStreamReset', default is not 10000. 
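
Note on the final shape of this setting: this patch reads it with System.getProperty, patch 14 below moves the lookup onto SparkConf, and patch 20 documents it in docs/configuration.md. A hedged usage sketch of that end state, in spark-shell style with illustrative master and app-name values:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")                 // illustrative
      .setAppName("objectStreamReset-demo")  // illustrative
      // Reset the JavaSerializer's ObjectOutputStream every 100 objects instead
      // of the default 10000; a value <= 0 disables the periodic reset entirely.
      .set("spark.serializer.objectStreamReset", "100")
    val sc = new SparkContext(conf)
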
--- .../scala/org/apache/spark/serializer/JavaSerializer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 3ac38c4139225..4a23db06eb486 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -26,6 +26,8 @@ import org.apache.spark.SparkConf private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) var counter = 0 + val counterReset = System.getProperty("spark.serializer.objectStreamReset", "10000").toLong + /* Calling reset to avoid memory leak: * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api * But only call it every 1000th time to avoid bloated serialization streams (when @@ -33,7 +35,7 @@ private[spark] class JavaSerializationStream(out: OutputStream) extends Serializ */ def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t) - if (counter >= 1000) { + if (counterReset > 0 && counter >= counterReset) { objOut.reset() counter = 0 } else { From 656c33e800a0f3c7926dd0857105e12e0cf5fb25 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 25 Feb 2014 10:58:20 -0800 Subject: [PATCH 14/23] Fixing the JavaSerializer to read from the SparkConf rather then the System property. --- .../org/apache/spark/serializer/JavaSerializer.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 696066bfb0617..2e3b527e3de24 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -23,10 +23,10 @@ import java.nio.ByteBuffer import org.apache.spark.SparkConf import org.apache.spark.util.ByteBufferInputStream -private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { +private[spark] class JavaSerializationStream(out: OutputStream, conf: SparkConf) extends SerializationStream { val objOut = new ObjectOutputStream(out) var counter = 0 - val counterReset = System.getProperty("spark.serializer.objectStreamReset", "10000").toLong + val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000) /* Calling reset to avoid memory leak: * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api @@ -58,7 +58,7 @@ extends DeserializationStream { def close() { objIn.close() } } -private[spark] class JavaSerializerInstance extends SerializerInstance { +private[spark] class JavaSerializerInstance(conf: SparkConf) extends SerializerInstance { def serialize[T](t: T): ByteBuffer = { val bos = new ByteArrayOutputStream() val out = serializeStream(bos) @@ -80,7 +80,7 @@ private[spark] class JavaSerializerInstance extends SerializerInstance { } def serializeStream(s: OutputStream): SerializationStream = { - new JavaSerializationStream(s) + new JavaSerializationStream(s, conf) } def deserializeStream(s: InputStream): DeserializationStream = { @@ -96,5 +96,5 @@ private[spark] class JavaSerializerInstance extends SerializerInstance { * A Spark serializer that uses Java's built-in serialization. 
*/ class JavaSerializer(conf: SparkConf) extends Serializer { - def newInstance(): SerializerInstance = new JavaSerializerInstance + def newInstance(): SerializerInstance = new JavaSerializerInstance(conf) } From 0f28ec70853a6a5ab198bc113f6af77b78d34d51 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 25 Feb 2014 11:03:23 -0800 Subject: [PATCH 15/23] Adding second putValues to BlockStore interface that accepts an ArrayBuffer (rather then an Iterator). This will allow BlockStores to have slightly different behaviors dependent on whether they get an Iterator or ArrayBuffer. In the case of the MemoryStore, it needs to duplicate and cache an Iterator into an ArrayBuffer, but if handed a ArrayBuffer, it can skip the duplication. --- .../apache/spark/storage/BlockManager.scala | 18 +++++++++++++----- .../org/apache/spark/storage/BlockStore.scala | 3 +++ .../org/apache/spark/storage/DiskStore.scala | 9 +++++++++ .../org/apache/spark/storage/MemoryStore.scala | 17 +++++++++++++++++ 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a44aa47fde57c..c5c3ace5de95c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -455,7 +455,7 @@ private[spark] class BlockManager( def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) : Long = { - doPut(blockId, Left(values), level, tellMaster) + doPut(blockId, Left(Left(values)), level, tellMaster) } /** @@ -477,7 +477,7 @@ private[spark] class BlockManager( def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, tellMaster: Boolean = true) : Long = { require(values != null, "Values is null") - doPut(blockId, Left(values.toIterator), level, tellMaster) + doPut(blockId, Left(Right(values)), level, tellMaster) } /** @@ -489,7 +489,7 @@ private[spark] class BlockManager( doPut(blockId, Right(bytes), level, tellMaster) } - private def doPut(blockId: BlockId, data: Either[Iterator[Any], ByteBuffer], + private def doPut(blockId: BlockId, data: Either[Either[Iterator[Any],ArrayBuffer[Any]], ByteBuffer], level: StorageLevel, tellMaster: Boolean = true): Long = { require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") @@ -552,7 +552,10 @@ private[spark] class BlockManager( if (level.useMemory) { // Save it just to memory first, even if it also has useDisk set to true; we will // drop it to disk later if the memory store can't hold it. - val res = memoryStore.putValues(blockId, values, level, true) + val res = values match { + case Left(values_i) => memoryStore.putValues(blockId, values_i, level, true) + case Right(values_a) => memoryStore.putValues(blockId, values_a, level, true) + } size = res.size res.data match { case Right(newBytes) => bytesAfterPut = newBytes @@ -562,7 +565,12 @@ private[spark] class BlockManager( // Save directly to disk. // Don't get back the bytes unless we replicate them. 
val askForBytes = level.replication > 1 - val res = diskStore.putValues(blockId, values, level, askForBytes) + + val res = values match { + case Left(values_i) => diskStore.putValues(blockId, values_i, level, askForBytes) + case Right(values_a) => diskStore.putValues(blockId, values_a, level, askForBytes) + } + size = res.size res.data match { case Right(newBytes) => bytesAfterPut = newBytes diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index 3b37288361a1a..4986aaec59f01 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -40,6 +40,9 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean) : PutResult + def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, + returnValues: Boolean) : PutResult + /** * Return the size of a block in bytes. */ diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 44541f83d67b0..adf42ed697ad6 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -54,6 +54,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime))) } + override def putValues( + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean) + : PutResult = { + return putValues(blockId, values.toIterator, level, returnValues) + } + override def putValues( blockId: BlockId, values: Iterator[Any], diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 18a851016ae9d..91904538558bf 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -64,6 +64,23 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } + override def putValues( + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean) + : PutResult = { + if (level.deserialized) { + val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) + tryToPut(blockId, values, sizeEstimate, true) + PutResult(sizeEstimate, Left(values.toIterator)) + } else { + val bytes = blockManager.dataSerialize(blockId, values.toIterator) + tryToPut(blockId, bytes, bytes.limit, false) + PutResult(bytes.limit(), Right(bytes.duplicate())) + } + } + override def putValues( blockId: BlockId, values: Iterator[Any], From 627a8b79d760103674f3c5b108900e911a6a7eeb Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 25 Feb 2014 13:20:30 -0800 Subject: [PATCH 16/23] Wrapping a few long lines --- .../scala/org/apache/spark/serializer/JavaSerializer.scala | 3 ++- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 2e3b527e3de24..d11d4df035f55 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -23,7 +23,8 @@ import java.nio.ByteBuffer import org.apache.spark.SparkConf import org.apache.spark.util.ByteBufferInputStream -private[spark] class JavaSerializationStream(out: OutputStream, conf: SparkConf) extends SerializationStream { +private[spark] class JavaSerializationStream(out: OutputStream, + conf: SparkConf) extends SerializationStream { val objOut = new ObjectOutputStream(out) var counter = 0 val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c5c3ace5de95c..ea7101c168f31 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -489,7 +489,8 @@ private[spark] class BlockManager( doPut(blockId, Right(bytes), level, tellMaster) } - private def doPut(blockId: BlockId, data: Either[Either[Iterator[Any],ArrayBuffer[Any]], ByteBuffer], + private def doPut(blockId: BlockId, + data: Either[Either[Iterator[Any],ArrayBuffer[Any]], ByteBuffer], level: StorageLevel, tellMaster: Boolean = true): Long = { require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") From c2fb43056c836ebb520bd076da2b576c32e794cf Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 25 Feb 2014 17:09:29 -0800 Subject: [PATCH 17/23] Removing more un-needed array-buffer to iterator conversions --- .../main/scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index ea7101c168f31..988d5fdbf6576 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -360,7 +360,7 @@ private[spark] class BlockManager( // TODO: Consider creating a putValues that also takes in a iterator? val valuesBuffer = new ArrayBuffer[Any] valuesBuffer ++= values - memoryStore.putValues(blockId, valuesBuffer.toIterator, level, true).data match { + memoryStore.putValues(blockId, valuesBuffer, level, true).data match { case Left(values2) => return Some(values2) case _ => @@ -703,7 +703,7 @@ private[spark] class BlockManager( logInfo("Writing block " + blockId + " to disk") data match { case Left(elements) => - diskStore.putValues(blockId, elements.toIterator, level, false) + diskStore.putValues(blockId, elements, level, false) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } From 16a4ceae706c3458e5a2721f8c27eebbf2cf4c89 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Wed, 26 Feb 2014 11:05:27 -0800 Subject: [PATCH 18/23] Streamlined the LargeIteratorSuite unit test. It should now run in ~25 seconds. Confirmed that it still crashes an unpatched copy of Spark. 
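
For readers who want the failure mode outside the test harness: the user-facing pattern is a flatMap whose output is far larger than available memory, persisted with DISK_ONLY so that, with this series applied, the iterator streams straight to the serializer instead of being unrolled into an ArrayBuffer. A standalone sketch with illustrative sizes and names, distinct from the test in the diff that follows:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object DiskOnlyDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[1]").setAppName("disk-only-demo"))
        // Each of the four seeds expands to ten million strings, far more than
        // a small executor could buffer in memory at once.
        val expanded = sc.parallelize(1 to 4)
          .flatMap(x => Iterator.range(0, 10000000).map(y => "%d: string test %d".format(x, y)))
          .persist(StorageLevel.DISK_ONLY)
        println(expanded.count())
        sc.stop()
      }
    }
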
--- .../spark/storage/LargeIteratorSuite.scala | 62 +++++++------------ 1 file changed, 24 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala index a22f81e60b5b0..52c2d01f02cf4 100644 --- a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala @@ -17,45 +17,31 @@ package org.apache.spark.storage import org.scalatest.FunSuite -import org.apache.spark.{LocalSparkContext, SparkContext} -import org.apache.commons.io.FileUtils -import java.io.File - -class Expander(base:String, count:Int) extends Iterator[String] { - var i = 0; - def next() : String = { - i += 1; - return base + i.toString; - } - def hasNext() : Boolean = i < count; -} - -object Expander { - def expand(s:String, i:Int) : Iterator[String] = { - return new Expander(s,i) - } -} +import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext} class LargeIteratorSuite extends FunSuite with LocalSparkContext { - /* Tests the ability of Spark to deal with user provided iterators that - * generate more data then available memory. In any memory based persistance - * Spark will unroll the iterator into an ArrayBuffer for caching, however in - * the case that the use defines DISK_ONLY persistance, the iterator will be - * fed directly to the serializer and written to disk. - */ - val clusterUrl = "local-cluster[1,1,512]" - test("Flatmap iterator") { - sc = new SparkContext(clusterUrl, "mem_test"); - val seeds = sc.parallelize( Array( - "This is the first sentence that we will test:", - "This is the second sentence that we will test:", - "This is the third sentence that we will test:" - ) ); - val expand_size = 10000000; - val out = seeds.flatMap(Expander.expand(_,expand_size)); - val expanded = out.map(_ + "...").persist(StorageLevel.DISK_ONLY) - assert( expanded.filter( _.startsWith("This is the first sentence that we will test")).count() == expand_size ) - expanded.saveAsTextFile("./test.out") - FileUtils.deleteDirectory(new File("./test.out")) +/* Tests the ability of Spark to deal with user provided iterators that + * generate more data then available memory. In any memory based persistance + * Spark will unroll the iterator into an ArrayBuffer for caching, however in + * the case that the use defines DISK_ONLY persistance, the iterator will be + * fed directly to the serializer and written to disk. + */ +test("Flatmap iterator") { + //create a local spark cluster, but limit it to 128mb of RAM + val sconf = new SparkConf().setMaster("local-cluster[1,1,128]") + .setAppName("mem_test") + .set("spark.executor.memory", "128m") + sc = new SparkContext(sconf) + try { + val expand_size = 1000000 + val data = sc.parallelize( (1 to 4).toSeq ). + flatMap( x => Stream.range(1, expand_size). + map( y => "%d: string test %d".format(y,x) ) ) + // If we did persist(StorageLevel.MEMORY_ONLY), it would cause OutOfMemoryError + var persisted = data.persist(StorageLevel.DISK_ONLY) + assert( persisted.filter( _.startsWith("1:") ).count() == 4 ) + } catch { + case _ : OutOfMemoryError => assert(false) } } +} From 7ccc74b7f7a2c58739cde2e4e83950e07e7fd3eb Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Thu, 27 Feb 2014 13:34:56 -0800 Subject: [PATCH 19/23] Moving the 'LargeIteratorSuite' to simply test persistance of iterators. 
It doesn't try to invoke a OOM error any more --- .../spark/storage/FlatmapIteratorSuite.scala | 83 +++++++++++++++++++ .../spark/storage/LargeIteratorSuite.scala | 61 -------------- 2 files changed, 83 insertions(+), 61 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala delete mode 100644 core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala new file mode 100644 index 0000000000000..3887cb3e9321a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.storage + +import org.scalatest.FunSuite +import org.apache.spark.{SharedSparkContext, SparkConf, LocalSparkContext, SparkContext} + + +class FlatmapIteratorSuite extends FunSuite with LocalSparkContext { + /* Tests the ability of Spark to deal with user provided iterators from flatMap + * calls, that may generate more data then available memory. In any + * memory based persistance Spark will unroll the iterator into an ArrayBuffer + * for caching, however in the case that the use defines DISK_ONLY persistance, + * the iterator will be fed directly to the serializer and written to disk. + * + * This also tests the ObjectOutputStream reset rate. When serializing using the + * Java serialization system, the serializer caches objects to prevent writing redundant + * data, however that stops GC of those objects. By calling 'reset' you flush that + * info from the serializer, and allow old objects to be GC'd + */ + test("Flatmap Iterator to Disk") { + val sconf = new SparkConf().setMaster("local-cluster[1,1,512]") + .setAppName("iterator_to_disk_test") + sc = new SparkContext(sconf) + try { + val expand_size = 100 + val data = sc.parallelize( (1 to 5).toSeq ). + flatMap( x => Stream.range(0, expand_size) ) + var persisted = data.persist(StorageLevel.DISK_ONLY) + println(persisted.count()) + assert( persisted.count() == 500) + assert( persisted.filter( _==1 ).count() == 5 ) + } catch { + case _ : OutOfMemoryError => assert(false) + } + } + + test("Flatmap Iterator to Memory") { + val sconf = new SparkConf().setMaster("local-cluster[1,1,512]") + .setAppName("iterator_to_disk_test") + sc = new SparkContext(sconf) + try { + val expand_size = 100 + val data = sc.parallelize( (1 to 5).toSeq ). 
+ flatMap( x => Stream.range(0, expand_size) ) + var persisted = data.persist(StorageLevel.MEMORY_ONLY) + println(persisted.count()) + assert( persisted.count() == 500) + assert( persisted.filter( _==1 ).count() == 5 ) + } catch { + case _ : OutOfMemoryError => assert(false) + } + } + + test("Serializer Reset") { + val sconf = new SparkConf().setMaster("local-cluster[1,1,512]") + .setAppName("serializer_reset_test") + .set("spark.serializer.objectStreamReset", "10") + + sc = new SparkContext(sconf) + val expand_size = 500 + val data = sc.parallelize( Seq(1,2) ). + flatMap( x => Stream.range(1, expand_size). + map( y => "%d: string test %d".format(y,x) ) ) + var persisted = data.persist(StorageLevel.MEMORY_ONLY_SER) + assert( persisted.filter( _.startsWith("1:") ).count() == 2 ) + } + +} diff --git a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala deleted file mode 100644 index a22f81e60b5b0..0000000000000 --- a/core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.storage - -import org.scalatest.FunSuite -import org.apache.spark.{LocalSparkContext, SparkContext} -import org.apache.commons.io.FileUtils -import java.io.File - -class Expander(base:String, count:Int) extends Iterator[String] { - var i = 0; - def next() : String = { - i += 1; - return base + i.toString; - } - def hasNext() : Boolean = i < count; -} - -object Expander { - def expand(s:String, i:Int) : Iterator[String] = { - return new Expander(s,i) - } -} - -class LargeIteratorSuite extends FunSuite with LocalSparkContext { - /* Tests the ability of Spark to deal with user provided iterators that - * generate more data then available memory. In any memory based persistance - * Spark will unroll the iterator into an ArrayBuffer for caching, however in - * the case that the use defines DISK_ONLY persistance, the iterator will be - * fed directly to the serializer and written to disk. 
- */ - val clusterUrl = "local-cluster[1,1,512]" - test("Flatmap iterator") { - sc = new SparkContext(clusterUrl, "mem_test"); - val seeds = sc.parallelize( Array( - "This is the first sentence that we will test:", - "This is the second sentence that we will test:", - "This is the third sentence that we will test:" - ) ); - val expand_size = 10000000; - val out = seeds.flatMap(Expander.expand(_,expand_size)); - val expanded = out.map(_ + "...").persist(StorageLevel.DISK_ONLY) - assert( expanded.filter( _.startsWith("This is the first sentence that we will test")).count() == expand_size ) - expanded.saveAsTextFile("./test.out") - FileUtils.deleteDirectory(new File("./test.out")) - } -} From f70d06939bb9c164a0a6c9af42f663bc882c3211 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Thu, 27 Feb 2014 13:36:06 -0800 Subject: [PATCH 20/23] Adding docs for spark.serializer.objectStreamReset configuration --- docs/configuration.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 8e4c48c81f8be..260d38dd14da6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -237,6 +237,17 @@ Apart from these, the following properties are also available, and may be useful exceeded" exception inside Kryo. Note that there will be one buffer per core on each worker. + + spark.serializer.objectStreamReset + 10000 + + When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches + objects to prevent writing redundant data, however that stops garbage collection of those + objects. By calling 'reset' you flush that info from the serializer, and allow old + objects to be collected. To turn off this periodic reset set it to a value of <= 0. + By default it will reset the serializer every 10,000 objects. + + spark.broadcast.factory org.apache.spark.broadcast.
HttpBroadcastFactory From 2f684ea15053d1ad934d60c58e082c1edf57b3a0 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Thu, 27 Feb 2014 13:36:42 -0800 Subject: [PATCH 21/23] Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now using trait 'Values'. Also modified BlockStore.putBytes call to return PutResult, so that it behaves like putValues. --- .../apache/spark/storage/BlockManager.scala | 92 ++++++++++--------- .../org/apache/spark/storage/BlockStore.scala | 2 +- .../org/apache/spark/storage/DiskStore.scala | 3 +- .../apache/spark/storage/MemoryStore.scala | 4 +- 4 files changed, 56 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 988d5fdbf6576..cc12bd2f9e026 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -35,6 +35,12 @@ import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.util._ +sealed trait Values + +case class ByteBufferValues(buffer: ByteBuffer) extends Values +case class IteratorValues(iterator: Iterator[Any]) extends Values +case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values + private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, @@ -455,7 +461,7 @@ private[spark] class BlockManager( def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) : Long = { - doPut(blockId, Left(Left(values)), level, tellMaster) + doPut(blockId, IteratorValues(values), level, tellMaster) } /** @@ -477,7 +483,7 @@ private[spark] class BlockManager( def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, tellMaster: Boolean = true) : Long = { require(values != null, "Values is null") - doPut(blockId, Left(Right(values)), level, tellMaster) + doPut(blockId, ArrayBufferValues(values), level, tellMaster) } /** @@ -486,11 +492,11 @@ private[spark] class BlockManager( def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) { require(bytes != null, "Bytes is null") - doPut(blockId, Right(bytes), level, tellMaster) + doPut(blockId, ByteBufferValues(bytes), level, tellMaster) } private def doPut(blockId: BlockId, - data: Either[Either[Iterator[Any],ArrayBuffer[Any]], ByteBuffer], + data: Values, level: StorageLevel, tellMaster: Boolean = true): Long = { require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") @@ -533,8 +539,9 @@ private[spark] class BlockManager( // If we're storing bytes, then initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. 
- val replicationFuture = if (data.isRight && level.replication > 1) { - val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper + val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) { + //Duplicate doesn't copy the bytes, just creates a wrapper + val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate() Future { replicate(blockId, bufferView, level) } @@ -548,42 +555,43 @@ private[spark] class BlockManager( var marked = false try { - data match { - case Left(values) => { - if (level.useMemory) { - // Save it just to memory first, even if it also has useDisk set to true; we will - // drop it to disk later if the memory store can't hold it. - val res = values match { - case Left(values_i) => memoryStore.putValues(blockId, values_i, level, true) - case Right(values_a) => memoryStore.putValues(blockId, values_a, level, true) - } - size = res.size - res.data match { - case Right(newBytes) => bytesAfterPut = newBytes - case Left(newIterator) => valuesAfterPut = newIterator - } - } else { - // Save directly to disk. - // Don't get back the bytes unless we replicate them. - val askForBytes = level.replication > 1 - - val res = values match { - case Left(values_i) => diskStore.putValues(blockId, values_i, level, askForBytes) - case Right(values_a) => diskStore.putValues(blockId, values_a, level, askForBytes) - } - - size = res.size - res.data match { - case Right(newBytes) => bytesAfterPut = newBytes - case _ => - } + if (level.useMemory) { + // Save it just to memory first, even if it also has useDisk set to true; we will + // drop it to disk later if the memory store can't hold it. + val res = data match { + case IteratorValues(values_i) => + memoryStore.putValues(blockId, values_i, level, true) + case ArrayBufferValues(values_a) => + memoryStore.putValues(blockId, values_a, level, true) + case ByteBufferValues(value_bytes) => { + value_bytes.rewind(); + memoryStore.putBytes(blockId, value_bytes, level) + } + } + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes + case Left(newIterator) => valuesAfterPut = newIterator + } + } else { + // Save directly to disk. + // Don't get back the bytes unless we replicate them. 
+ val askForBytes = level.replication > 1 + + val res = data match { + case IteratorValues(values_i) => + diskStore.putValues(blockId, values_i, level, askForBytes) + case ArrayBufferValues(values_a) => + diskStore.putValues(blockId, values_a, level, askForBytes) + case ByteBufferValues(value_bytes) => { + value_bytes.rewind(); + diskStore.putBytes(blockId, value_bytes, level) } } - case Right(bytes) => { - bytes.rewind() - // Store it only in memory at first, even if useDisk is also set to true - (if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level) - size = bytes.limit + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes + case _ => } } @@ -612,8 +620,8 @@ private[spark] class BlockManager( // values and need to serialize and replicate them now: if (level.replication > 1) { data match { - case Right(bytes) => Await.ready(replicationFuture, Duration.Inf) - case Left(values) => { + case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf) + case _ => { val remoteStartTime = System.currentTimeMillis // Serialize the block if not already done if (bytesAfterPut == null) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index 4986aaec59f01..a27eae129e14d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -28,7 +28,7 @@ import org.apache.spark.Logging */ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging { - def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) + def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : PutResult /** * Put in a block and, possibly, also return its content as either bytes or another Iterator. diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index adf42ed697ad6..b5e8504d892b1 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -37,7 +37,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage diskManager.getBlockLocation(blockId).length } - override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) { + override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = { // So that we do not modify the input offsets ! 
     // duplicate does not copy buffer, so inexpensive
     val bytes = _bytes.duplicate()
@@ -52,6 +52,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
+    return PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
   override def putValues(
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 91904538558bf..cfec636791c8f 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
@@ -59,8 +59,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       elements ++= values
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
       tryToPut(blockId, elements, sizeEstimate, true)
+      PutResult(sizeEstimate, Left(values.toIterator))
     } else {
       tryToPut(blockId, bytes, bytes.limit, false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
 

From 60e0c578181d4cf011283aff689d0000b649dbf6 Mon Sep 17 00:00:00 2001
From: Kyle Ellrott
Date: Mon, 3 Mar 2014 17:14:51 -0800
Subject: [PATCH 22/23] Fixing issues (formatting, variable names, etc.) from review comments

---
 .../scala/org/apache/spark/CacheManager.scala |  11 ++++-
 .../apache/spark/storage/BlockManager.scala   |  30 ++++++-------
 .../org/apache/spark/storage/BlockStore.scala |   2 +-
 .../org/apache/spark/storage/DiskStore.scala  |   8 ++--
 .../apache/spark/storage/MemoryStore.scala    |   8 ++--
 .../spark/storage/FlatmapIteratorSuite.scala  |  45 ++++++++-----------
 6 files changed, 52 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 6e2348af1fce9..872e892c04fe6 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -72,15 +72,24 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     // Persist the result, so long as the task is not running locally
     if (context.runningLocally) { return computedValues }
     if (storageLevel.useDisk && !storageLevel.useMemory) {
+      // In the case that this RDD is to be persisted using DISK_ONLY
+      // the iterator will be passed directly to the blockManager (rather than
+      // caching it to an ArrayBuffer first), then the resulting block data iterator
+      // will be passed back to the user. If the iterator generates a lot of data,
+      // this means that it doesn't all have to be held in memory at one time.
+      // This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
+      // blocks aren't dropped by the block store before enabling that.
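From the application side, the behaviour described in the comment above is what makes a pattern like the following workable even when a single partition expands to far more records than comfortably fit in memory. This is an illustrative driver program, not code from the patch; the master URL and fan-out factor are made up:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object DiskOnlyExpandExample {
      def main(args: Array[String]) {
        val conf = new SparkConf().setMaster("local[1]").setAppName("disk_only_expand")
        val sc = new SparkContext(conf)
        // Each element fans out to many records; with DISK_ONLY the expanded
        // iterator is streamed to the serializer instead of being buffered first.
        val expanded = sc.parallelize(1 to 5)
          .flatMap(x => Stream.range(0, 1000000).map(i => (x, i)))
          .persist(StorageLevel.DISK_ONLY)
        println(expanded.count())
        sc.stop()
      }
    }

Under MEMORY_ONLY the same job would still buffer each partition's expanded iterator in an ArrayBuffer, which is exactly the copy the DISK_ONLY path now avoids.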
       blockManager.put(key, computedValues, storageLevel, tellMaster = true)
       return blockManager.get(key) match {
         case Some(values) =>
           return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
         case None =>
           logInfo("Failure to store %s".format(key))
-          return null
+          throw new Exception("Block manager failed to return persisted value")
       }
     } else {
+      // In this case the RDD is cached to an array buffer. This will save the results
+      // if we're dealing with a 'one-time' iterator.
       val elements = new ArrayBuffer[Any]
       elements ++= computedValues
       blockManager.put(key, elements, storageLevel, tellMaster = true)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index cc12bd2f9e026..977c24687cc5f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -540,7 +540,7 @@ private[spark] class BlockManager(
     // If we're storing bytes, then initiate the replication before storing them locally.
     // This is faster as data is already serialized and ready to send.
     val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
-      //Duplicate doesn't copy the bytes, just creates a wrapper
+      // Duplicate doesn't copy the bytes, just creates a wrapper
       val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
       Future {
         replicate(blockId, bufferView, level)
@@ -559,13 +559,13 @@ private[spark] class BlockManager(
         // Save it just to memory first, even if it also has useDisk set to true; we will
         // drop it to disk later if the memory store can't hold it.
         val res = data match {
-          case IteratorValues(values_i) =>
-            memoryStore.putValues(blockId, values_i, level, true)
-          case ArrayBufferValues(values_a) =>
-            memoryStore.putValues(blockId, values_a, level, true)
-          case ByteBufferValues(value_bytes) => {
-            value_bytes.rewind();
-            memoryStore.putBytes(blockId, value_bytes, level)
+          case IteratorValues(iterator) =>
+            memoryStore.putValues(blockId, iterator, level, true)
+          case ArrayBufferValues(array) =>
+            memoryStore.putValues(blockId, array, level, true)
+          case ByteBufferValues(bytes) => {
+            bytes.rewind();
+            memoryStore.putBytes(blockId, bytes, level)
           }
         }
         size = res.size
@@ -579,13 +579,13 @@ private[spark] class BlockManager(
         val askForBytes = level.replication > 1
 
         val res = data match {
-          case IteratorValues(values_i) =>
-            diskStore.putValues(blockId, values_i, level, askForBytes)
-          case ArrayBufferValues(values_a) =>
-            diskStore.putValues(blockId, values_a, level, askForBytes)
-          case ByteBufferValues(value_bytes) => {
-            value_bytes.rewind();
-            diskStore.putBytes(blockId, value_bytes, level)
+          case IteratorValues(iterator) =>
+            diskStore.putValues(blockId, iterator, level, askForBytes)
+          case ArrayBufferValues(array) =>
+            diskStore.putValues(blockId, array, level, askForBytes)
+          case ByteBufferValues(bytes) => {
+            bytes.rewind();
+            diskStore.putBytes(blockId, bytes, level)
           }
         }
         size = res.size
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index a27eae129e14d..9a9be047c7245 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -41,7 +41,7 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
     returnValues: Boolean) : PutResult
 
   def putValues(blockId: BlockId, values: ArrayBuffer[Any],
level: StorageLevel, - returnValues: Boolean) : PutResult + returnValues: Boolean) : PutResult /** * Return the size of a block in bytes. diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index b5e8504d892b1..36ee4bcc41c66 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -56,10 +56,10 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage } override def putValues( - blockId: BlockId, - values: ArrayBuffer[Any], - level: StorageLevel, - returnValues: Boolean) + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean) : PutResult = { return putValues(blockId, values.toIterator, level, returnValues) } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index cfec636791c8f..b89212eaabf6c 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -67,10 +67,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def putValues( - blockId: BlockId, - values: ArrayBuffer[Any], - level: StorageLevel, - returnValues: Boolean) + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean) : PutResult = { if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala index 3887cb3e9321a..b843b4c629e84 100644 --- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala @@ -36,48 +36,39 @@ class FlatmapIteratorSuite extends FunSuite with LocalSparkContext { val sconf = new SparkConf().setMaster("local-cluster[1,1,512]") .setAppName("iterator_to_disk_test") sc = new SparkContext(sconf) - try { - val expand_size = 100 - val data = sc.parallelize( (1 to 5).toSeq ). - flatMap( x => Stream.range(0, expand_size) ) - var persisted = data.persist(StorageLevel.DISK_ONLY) - println(persisted.count()) - assert( persisted.count() == 500) - assert( persisted.filter( _==1 ).count() == 5 ) - } catch { - case _ : OutOfMemoryError => assert(false) - } + val expand_size = 100 + val data = sc.parallelize((1 to 5).toSeq). + flatMap( x => Stream.range(0, expand_size)) + var persisted = data.persist(StorageLevel.DISK_ONLY) + println(persisted.count()) + assert(persisted.count()===500) + assert(persisted.filter(_==1).count()===5) } test("Flatmap Iterator to Memory") { val sconf = new SparkConf().setMaster("local-cluster[1,1,512]") .setAppName("iterator_to_disk_test") sc = new SparkContext(sconf) - try { - val expand_size = 100 - val data = sc.parallelize( (1 to 5).toSeq ). - flatMap( x => Stream.range(0, expand_size) ) - var persisted = data.persist(StorageLevel.MEMORY_ONLY) - println(persisted.count()) - assert( persisted.count() == 500) - assert( persisted.filter( _==1 ).count() == 5 ) - } catch { - case _ : OutOfMemoryError => assert(false) - } + val expand_size = 100 + val data = sc.parallelize((1 to 5).toSeq). 
+ flatMap(x => Stream.range(0, expand_size)) + var persisted = data.persist(StorageLevel.MEMORY_ONLY) + println(persisted.count()) + assert(persisted.count()===500) + assert(persisted.filter(_==1).count()===5) } test("Serializer Reset") { val sconf = new SparkConf().setMaster("local-cluster[1,1,512]") .setAppName("serializer_reset_test") .set("spark.serializer.objectStreamReset", "10") - sc = new SparkContext(sconf) val expand_size = 500 - val data = sc.parallelize( Seq(1,2) ). - flatMap( x => Stream.range(1, expand_size). - map( y => "%d: string test %d".format(y,x) ) ) + val data = sc.parallelize(Seq(1,2)). + flatMap(x => Stream.range(1, expand_size). + map(y => "%d: string test %d".format(y,x))) var persisted = data.persist(StorageLevel.MEMORY_ONLY_SER) - assert( persisted.filter( _.startsWith("1:") ).count() == 2 ) + assert(persisted.filter(_.startsWith("1:")).count()===2) } } From 9ef7cb84a060e6618766f47695ae8e41cfb15246 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 3 Mar 2014 22:26:10 -0800 Subject: [PATCH 23/23] Fixing formatting issues. --- .../org/apache/spark/serializer/JavaSerializer.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index d11d4df035f55..bfa647f7f0516 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -23,16 +23,17 @@ import java.nio.ByteBuffer import org.apache.spark.SparkConf import org.apache.spark.util.ByteBufferInputStream -private[spark] class JavaSerializationStream(out: OutputStream, - conf: SparkConf) extends SerializationStream { +private[spark] class JavaSerializationStream(out: OutputStream, conf: SparkConf) + extends SerializationStream { val objOut = new ObjectOutputStream(out) var counter = 0 val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000) - /* Calling reset to avoid memory leak: + /** + * Calling reset to avoid memory leak: * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api - * But only call it every 1000th time to avoid bloated serialization streams (when - * the stream 'resets' object class descriptions have to be re-written) + * But only call it every 10,000th time to avoid bloated serialization streams (when + * the stream 'resets' object class descriptions have to be re-written) */ def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t)
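The reset-every-N-writes idea documented in this last hunk can be sketched on its own as below, with the threshold passed in directly instead of read from spark.serializer.objectStreamReset. This is a standalone illustration, not the patch's final code:

    import java.io.{ObjectOutputStream, OutputStream}

    class ResettingObjectStream(out: OutputStream, resetEvery: Int = 10000) {
      private val objOut = new ObjectOutputStream(out)
      private var counter = 0

      // ObjectOutputStream remembers every object it has written so it can emit
      // back-references; reset() clears that table, trading re-written class
      // descriptions for bounded memory use.
      def writeObject[T](t: T): this.type = {
        objOut.writeObject(t)
        counter += 1
        if (counter >= resetEvery) {
          objOut.reset()
          counter = 0
        }
        this
      }

      def flush() { objOut.flush() }
      def close() { objOut.close() }
    }

Resetting too often bloats the stream with repeated class descriptions, while never resetting reproduces the reference buildup the reset is meant to avoid, which is why the interval is configurable.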