From 86f78237b7623e4efa06c5feb053e0c304979c73 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 03:05:58 -0700 Subject: [PATCH 01/23] Implement transitive cleaning + add missing documentation See in-code comments for more detail on what this means. --- .../apache/spark/util/ClosureCleaner.scala | 249 +++++++++++++++--- 1 file changed, 208 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index e3f52f6ff1e63..18c4e4b87cc83 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -19,17 +19,20 @@ package org.apache.spark.util import java.io.{ByteArrayInputStream, ByteArrayOutputStream} -import scala.collection.mutable.Map -import scala.collection.mutable.Set +import scala.collection.mutable.{Map, Set} import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type} import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ import org.apache.spark.{Logging, SparkEnv, SparkException} +/** + * A cleaner that renders closures serializable if they can be done so safely. + */ private[spark] object ClosureCleaner extends Logging { + // Get an ASM class reader for a given class from the JAR that loaded it - private def getClassReader(cls: Class[_]): ClassReader = { + def getClassReader(cls: Class[_]): ClassReader = { // Copy data over, before delegating to ClassReader - else we can run out of open file handles. val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" val resourceStream = cls.getResourceAsStream(className) @@ -77,6 +80,9 @@ private[spark] object ClosureCleaner extends Logging { Nil } + /** + * Return a list of classes that represent closures enclosed in the given closure object. + */ private def getInnerClasses(obj: AnyRef): List[Class[_]] = { val seen = Set[Class[_]](obj.getClass) var stack = List[Class[_]](obj.getClass) @@ -101,21 +107,110 @@ private[spark] object ClosureCleaner extends Logging { } } - def clean(func: AnyRef, checkSerializable: Boolean = true) { + /** + * Clean the given closure in place. + * + * More specifically, this renders the given closure serializable as long as it does not + * explicitly reference unserializable objects. + * + * @param closure the closure to clean + * @param checkSerializable whether to verify that the closure is serializable after cleaning + * @param cleanTransitively whether to clean enclosing closures transitively + */ + def clean( + closure: AnyRef, + checkSerializable: Boolean = true, + cleanTransitively: Boolean = true): Unit = { + clean(closure, checkSerializable, cleanTransitively, Map.empty) + } + + /** + * Helper method to clean the given closure in place. + * + * The mechanism is to traverse the hierarchy of enclosing closures and null out any + * references along the way that are not actually used by the starting closure, but are + * nevertheless included in the compiled anonymous classes. Note that it is unsafe to + * simply mutate the enclosing closures, as other code paths may depend on them. Instead, + * we clone each enclosing closure and set the parent pointers accordingly. + * + * By default, closures are cleaned transitively. This means we detect whether enclosing + * objects are actually referenced by the starting one, either directly or transitively, + * and, if not, sever these closures from the hierarchy. In other words, in addition to + * nulling out unused field references, we also null out any parent pointers that refer + * to enclosing objects not actually needed by the starting closure. + * + * For instance, transitive cleaning is necessary in the following scenario: + * + * class SomethingNotSerializable { + * def someValue = 1 + * def someMethod(): Unit = scope("one") { + * def x = someValue + * def y = 2 + * scope("two") { println(y + 1) } + * } + * def scope(name: String)(body: => Unit) = body + * } + * + * In this example, scope "two" is not serializable because it references scope "one", which + * references SomethingNotSerializable. Note that, however, scope "two" does not actually + * depend on SomethingNotSerializable. This means we can null out the parent pointer of + * a cloned scope "one" and set it the parent of scope "two", such that scope "two" no longer + * references SomethingNotSerializable transitively. + * + * @param func the starting closure to clean + * @param checkSerializable whether to verify that the closure is serializable after cleaning + * @param cleanTransitively whether to clean enclosing closures transitively + * @param accessedFields a map from a class to a set of its fields that are accessed by + * the starting closure + */ + private def clean( + func: AnyRef, + checkSerializable: Boolean, + cleanTransitively: Boolean, + accessedFields: Map[Class[_], Set[String]]) { + + // TODO: clean all inner closures first. This requires us to find the inner objects. // TODO: cache outerClasses / innerClasses / accessedFields - val outerClasses = getOuterClasses(func) + + logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}}) +++") + + // A list of classes that represents closures enclosed in the given one val innerClasses = getInnerClasses(func) + + // A list of enclosing objects and their respective classes, from innermost to outermost + // An outer object at a given index is of type outer class at the same index + val outerClasses = getOuterClasses(func) val outerObjects = getOuterObjects(func) - val accessedFields = Map[Class[_], Set[String]]() - + logDebug(s" + inner classes: " + innerClasses.size) + innerClasses.foreach { c => logDebug(" " + c.getName) } + logDebug(s" + outer classes: " + outerClasses.size) + outerClasses.foreach { c => logDebug(" " + c.getName) } + logDebug(s" + outer objects: " + outerObjects.size) + outerObjects.foreach { o => logDebug(" " + o) } + + // Fail fast if we detect return statements in closures getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0) - - for (cls <- outerClasses) - accessedFields(cls) = Set[String]() - for (cls <- func.getClass :: innerClasses) - getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0) - // logInfo("accessedFields: " + accessedFields) + + // If accessed fields is not populated yet, we assume that + // the closure we are trying to clean is the starting one + if (accessedFields.isEmpty) { + logDebug(s" + populating accessed fields because this is the starting closure") + // Initialize accessed fields with the outer classes first + // This step is needed to associate the fields to the correct classes later + for (cls <- outerClasses) { + accessedFields(cls) = Set[String]() + } + // Populate accessed fields by visiting all fields and methods accessed by this and + // all of its inner closures. If transitive cleaning is enabled, this may recursively + // visits methods that belong to other classes in search of transitively referenced fields. + for (cls <- func.getClass :: innerClasses) { + getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0) + } + } + + logDebug(s" + fields accessed by starting closure: " + accessedFields.size) + accessedFields.foreach { f => logDebug(" " + f) } val inInterpreter = { try { @@ -126,34 +221,66 @@ private[spark] object ClosureCleaner extends Logging { } } + // List of outer (class, object) pairs, ordered from outermost to innermost + // Note that all outer objects but the outermost one (first one in this list) must be closures var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse - var outer: AnyRef = null + var parent: AnyRef = null if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) { // The closure is ultimately nested inside a class; keep the object of that // class without cloning it since we don't want to clone the user's objects. - outer = outerPairs.head._2 + // Note that we still need to keep around the outermost object itself because + // we need it to clone its child closure later (see below). + logDebug(s" + outermost object is not a closure, so do not clone it: ${outerPairs.head}") + parent = outerPairs.head._2 // e.g. SparkContext outerPairs = outerPairs.tail + } else if (outerPairs.size > 0) { + logDebug(s" + outermost object is a closure, so we just keep it: ${outerPairs.head}") + } else { + logDebug(" + there are no enclosing objects!") } + // Clone the closure objects themselves, nulling out any fields that are not // used in the closure we're working on or any of its inner closures. for ((cls, obj) <- outerPairs) { - outer = instantiateClass(cls, outer, inInterpreter) + logDebug(s" + cloning the object $obj of class ${cls.getName}") + // We null out these unused references by cloning each object and then filling in all + // required fields from the original object. We need the parent here because the Java + // language specification requires the first constructor parameter of any closure to be + // its enclosing object. + val clone = instantiateClass(cls, parent, inInterpreter) for (fieldName <- accessedFields(cls)) { val field = cls.getDeclaredField(fieldName) field.setAccessible(true) val value = field.get(obj) - // logInfo("1: Setting " + fieldName + " on " + cls + " to " + value); - field.set(outer, value) + field.set(clone, value) } + // If transitive cleaning is enabled, we recursively clean any enclosing closure using + // the already populated accessed fields map of the starting closure + if (cleanTransitively && isClosure(clone.getClass)) { + logDebug(s" + cleaning cloned closure $clone recursively (${cls.getName})") + clean(clone, checkSerializable, cleanTransitively, accessedFields) + } + parent = clone } - if (outer != null) { - // logInfo("2: Setting $outer on " + func.getClass + " to " + outer); + // Update the parent pointer ($outer) of this closure + if (parent != null) { val field = func.getClass.getDeclaredField("$outer") field.setAccessible(true) - field.set(func, outer) + // If the starting closure doesn't actually need our enclosing object, then just null it out + if (accessedFields.contains(func.getClass) && + !accessedFields(func.getClass).contains("$outer")) { + logDebug(s" + the starting closure doesn't actually need $parent, so we null it out") + field.set(func, null) + } else { + // Update this closure's parent pointer to point to our enclosing object, + // which could either be a cloned closure or the original user object + field.set(func, parent) + } } - + + logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned +++") + if (checkSerializable) { ensureSerializable(func) } @@ -167,15 +294,17 @@ private[spark] object ClosureCleaner extends Logging { } } - private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = { - // logInfo("Creating a " + cls + " with outer = " + outer) + private def instantiateClass( + cls: Class[_], + enclosingObject: AnyRef, + inInterpreter: Boolean): AnyRef = { if (!inInterpreter) { // This is a bona fide closure class, whose constructor has no effects // other than to set its fields, so use its constructor val cons = cls.getConstructors()(0) val params = cons.getParameterTypes.map(createNullValue).toArray - if (outer != null) { - params(0) = outer // First param is always outer object + if (enclosingObject!= null) { + params(0) = enclosingObject // First param is always enclosing object } return cons.newInstance(params: _*).asInstanceOf[AnyRef] } else { @@ -184,11 +313,10 @@ private[spark] object ClosureCleaner extends Logging { val parentCtor = classOf[java.lang.Object].getDeclaredConstructor() val newCtor = rf.newConstructorForSerialization(cls, parentCtor) val obj = newCtor.newInstance().asInstanceOf[AnyRef] - if (outer != null) { - // logInfo("3: Setting $outer on " + cls + " to " + outer); + if (enclosingObject != null) { val field = cls.getDeclaredField("$outer") field.setAccessible(true) - field.set(obj, outer) + field.set(obj, enclosingObject) } obj } @@ -213,29 +341,68 @@ class ReturnStatementFinder extends ClassVisitor(ASM4) { } } +/** + * Find the fields accessed by a given class. + * + * The fields are stored in the mutable map passed in by the class that contains them. + * This map is assumed to have its keys already populated by the classes of interest. + * + * @param fields the mutable map that stores the fields to return + * @param specificMethodNames if not empty, only visit methods whose names are in this set + * @param findTransitively if true, find fields indirectly referenced in other classes + */ private[spark] -class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) { - override def visitMethod(access: Int, name: String, desc: String, - sig: String, exceptions: Array[String]): MethodVisitor = { +class FieldAccessFinder( + fields: Map[Class[_], Set[String]], + specificMethodNames: Set[String] = Set.empty, + findTransitively: Boolean = true) + extends ClassVisitor(ASM4) { + + override def visitMethod( + access: Int, + name: String, + desc: String, + sig: String, + exceptions: Array[String]): MethodVisitor = { + + // Ignore this method if we don't want to visit it + if (specificMethodNames.nonEmpty && !specificMethodNames.contains(name)) { + return new MethodVisitor(ASM4) { } + } + new MethodVisitor(ASM4) { override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) { if (op == GETFIELD) { - for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { - output(cl) += name + for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) { + fields(cl) += name } } } - override def visitMethodInsn(op: Int, owner: String, name: String, - desc: String) { - // Check for calls a getter method for a variable in an interpreter wrapper object. - // This means that the corresponding field will be accessed, so we should save it. - if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { - for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { - output(cl) += name + override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { + if (isInvoke(op)) { + for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) { + // Check for calls a getter method for a variable in an interpreter wrapper object. + // This means that the corresponding field will be accessed, so we should save it. + if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { + fields(cl) += name + } + // Visit other methods to find fields that are transitively referenced + if (findTransitively) { + ClosureCleaner.getClassReader(cl) + .accept(new FieldAccessFinder(fields, Set(name), findTransitively), 0) + } } } } + + private def isInvoke(op: Int): Boolean = { + op == INVOKEVIRTUAL || + op == INVOKESPECIAL || + op == INVOKEDYNAMIC || + op == INVOKEINTERFACE || + op == INVOKESTATIC + } } } } From 2390a608ed74a9703d3763d040421dccb51242ec Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 03:08:11 -0700 Subject: [PATCH 02/23] Feature flag this new behavior ... in case anything breaks, we should be able to resort to old behavior. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- .../test/scala/org/apache/spark/util/ClosureCleanerSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 86269eac52db0..fa5840ec77210 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1740,7 +1740,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * serializable */ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) + val cleanTransitively = conf.getBoolean("spark.closureCleaner.transitive", true) + ClosureCleaner.clean(f, checkSerializable, cleanTransitively) f } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index c47162779bbba..26961883cb2ec 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -50,7 +50,7 @@ class ClosureCleanerSuite extends FunSuite { val obj = new TestClassWithNesting(1) assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1 } - + test("toplevel return statements in closures are identified at cleaning time") { val ex = intercept[SparkException] { TestObjectWithBogusReturns.run() From 438c68f82902c0b6899a4a8bb54783c1aef8a7dd Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 12:17:09 -0700 Subject: [PATCH 03/23] Minor changes --- .../src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 4 ++-- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 18c4e4b87cc83..1ebeab87aedba 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -303,7 +303,7 @@ private[spark] object ClosureCleaner extends Logging { // other than to set its fields, so use its constructor val cons = cls.getConstructors()(0) val params = cons.getParameterTypes.map(createNullValue).toArray - if (enclosingObject!= null) { + if (enclosingObject != null) { params(0) = enclosingObject // First param is always enclosing object } return cons.newInstance(params: _*).asInstanceOf[AnyRef] @@ -345,7 +345,7 @@ class ReturnStatementFinder extends ClassVisitor(ASM4) { * Find the fields accessed by a given class. * * The fields are stored in the mutable map passed in by the class that contains them. - * This map is assumed to have its keys already populated by the classes of interest. + * This map is assumed to have its keys already populated with the classes of interest. * * @param fields the mutable map that stores the fields to return * @param specificMethodNames if not empty, only visit methods whose names are in this set diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 26961883cb2ec..e38a31fcc0c3d 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -61,7 +61,7 @@ class ClosureCleanerSuite extends FunSuite { test("return statements from named functions nested in closures don't raise exceptions") { val result = TestObjectWithNestedReturns.run() - assert(result == 1) + assert(result === 1) } } From a4866e3387ff5341280753909e0e1ed9a66502f2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 16:18:18 -0700 Subject: [PATCH 04/23] Add tests (still WIP) The existing ones are not passing yet because cleaning closures is not idempotent. This will be added in a future commit. --- .../apache/spark/util/ClosureCleaner.scala | 17 +- .../spark/util/ClosureCleanerSuite.scala | 9 +- .../spark/util/ClosureCleanerSuite2.scala | 291 ++++++++++++++++++ 3 files changed, 312 insertions(+), 5 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 1ebeab87aedba..b4e0d819e6149 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException} /** * A cleaner that renders closures serializable if they can be done so safely. */ -private[spark] object ClosureCleaner extends Logging { +object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it def getClassReader(cls: Class[_]): ClassReader = { @@ -182,11 +182,19 @@ private[spark] object ClosureCleaner extends Logging { val outerClasses = getOuterClasses(func) val outerObjects = getOuterObjects(func) - logDebug(s" + inner classes: " + innerClasses.size) + // For logging purposes only + val declaredFields = func.getClass.getDeclaredFields + val declaredMethods = func.getClass.getDeclaredMethods + + logDebug(" + declared fields: " + declaredFields.size) + declaredFields.foreach { f => logDebug(" " + f) } + logDebug(" + declared methods: " + declaredMethods.size) + declaredMethods.foreach { m => logDebug(" " + m) } + logDebug(" + inner classes: " + innerClasses.size) innerClasses.foreach { c => logDebug(" " + c.getName) } - logDebug(s" + outer classes: " + outerClasses.size) + logDebug(" + outer classes: " + outerClasses.size) outerClasses.foreach { c => logDebug(" " + c.getName) } - logDebug(s" + outer objects: " + outerObjects.size) + logDebug(" + outer objects: " + outerObjects.size) outerObjects.foreach { o => logDebug(" " + o) } // Fail fast if we detect return statements in closures @@ -388,6 +396,7 @@ class FieldAccessFinder( fields(cl) += name } // Visit other methods to find fields that are transitively referenced + // FIXME: This could lead to infinite cycles!! if (findTransitively) { ClosureCleaner.getClassReader(cl) .accept(new FieldAccessFinder(fields, Set(name), findTransitively), 0) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index e38a31fcc0c3d..ff1bfe0774a2f 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -67,7 +67,14 @@ class ClosureCleanerSuite extends FunSuite { // A non-serializable class we create in closures to make sure that we aren't // keeping references to unneeded variables from our outer closures. -class NonSerializable {} +class NonSerializable(val id: Int = -1) { + override def equals(other: Any): Boolean = { + other match { + case o: NonSerializable => id == o.id + case _ => false + } + } +} object TestObject { def run(): Int = { diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala new file mode 100644 index 0000000000000..d498731f451d3 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.NotSerializableException + +import org.scalatest.{BeforeAndAfterAll, FunSuite} + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.serializer.SerializerInstance + +// TODO: REMOVE ME +import java.util.Properties +import org.apache.log4j.PropertyConfigurator + +/** + * Another test suite for the closure cleaner that is finer-grained. + * For tests involving end-to-end Spark jobs, see {{ClosureCleanerSuite}}. + */ +class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { + + // Start a SparkContext so that SparkEnv.get.closureSerializer is accessible + // We do not actually use this explicitly except to stop it later + private var sc: SparkContext = null + private var closureSerializer: SerializerInstance = null + + override def beforeAll(): Unit = { + sc = new SparkContext("local", "test") + closureSerializer = sc.env.closureSerializer.newInstance() + } + + override def afterAll(): Unit = { + sc.stop() + sc = null + closureSerializer = null + } + + // Some fields and methods that belong to this class, which is itself not serializable + private val someSerializableValue = 1 + private val someNonSerializableValue = new NonSerializable + private def someSerializableMethod() = 1 + private def someNonSerializableMethod() = new NonSerializable + + private def assertSerializable(closure: AnyRef, serializable: Boolean): Unit = { + if (serializable) { + closureSerializer.serialize(closure) + } else { + intercept[NotSerializableException] { + closureSerializer.serialize(closure) + } + } + } + + private def testClean( + closure: AnyRef, + serializableBefore: Boolean, + serializableAfter: Boolean): Unit = { + testClean(closure, serializableBefore, serializableAfter, transitive = true) + testClean(closure, serializableBefore, serializableAfter, transitive = false) + } + + private def testClean( + closure: AnyRef, + serializableBefore: Boolean, + serializableAfter: Boolean, + transitive: Boolean): Unit = { + assertSerializable(closure, serializableBefore) + // If the resulting closure is not serializable even after + // cleaning, we expect ClosureCleaner to throw a SparkException + intercept[SparkException] { + ClosureCleaner.clean(closure, checkSerializable = true, transitive) + // Otherwise, if we do expect the closure to be serializable after the + // clean, throw the SparkException ourselves so scalatest is happy + if (serializableAfter) { throw new SparkException("no-op") } + } + assertSerializable(closure, serializableAfter) + } + + test("clean basic serializable closures") { + val localSerializableVal = someSerializableValue + val closure1 = () => 1 + val closure2 = () => Array[String]("a", "b", "c") + val closure3 = (s: String, arr: Array[Long]) => s + arr.mkString(", ") + val closure4 = () => localSerializableVal + val closure5 = () => new NonSerializable(5) // we're just serializing the class information + val closure1r = closure1() + val closure2r = closure2() + val closure3r = closure3("g", Array(1, 5, 8)) + val closure4r = closure4() + val closure5r = closure5() + + testClean(closure1, serializableBefore = true, serializableAfter = true) + testClean(closure2, serializableBefore = true, serializableAfter = true) + testClean(closure3, serializableBefore = true, serializableAfter = true) + testClean(closure4, serializableBefore = true, serializableAfter = true) + testClean(closure5, serializableBefore = true, serializableAfter = true) + + // Verify that closures can still be invoked and the result still the same + assert(closure1() === closure1r) + assert(closure2() === closure2r) + assert(closure3("g", Array(1, 5, 8)) === closure3r) + assert(closure4() === closure4r) + assert(closure5() === closure5r) + } + + test("clean basic non-serializable closures") { + val closure1 = () => this // ClosureCleanerSuite2 is not serializable + val closure5 = () => someSerializableValue + val closure3 = () => someSerializableMethod() + val closure4 = () => someNonSerializableValue + val closure2 = () => someNonSerializableMethod() + + // These are not cleanable because they ultimately reference the `this` pointer + testClean(closure1, serializableBefore = false, serializableAfter = false) + testClean(closure2, serializableBefore = false, serializableAfter = false) + testClean(closure3, serializableBefore = false, serializableAfter = false) + testClean(closure4, serializableBefore = false, serializableAfter = false) + testClean(closure5, serializableBefore = false, serializableAfter = false) + } + + test("clean basic nested serializable closures") { + val localSerializableValue = someSerializableValue + val closure1 = (i: Int) => { + (1 to i).map { x => x + localSerializableValue } // 1 level of nesting + } + val closure2 = (j: Int) => { + (1 to j).flatMap { x => + (1 to x).map { y => y + localSerializableValue } // 2 levels + } + } + val closure3 = (k: Int, l: Int, m: Int) => { + (1 to k).flatMap(closure2) ++ // 4 levels + (1 to l).flatMap(closure1) ++ // 3 levels + (1 to m).map { x => x + 1 } // 2 levels + } + val closure1r = closure1(1) + val closure2r = closure2(2) + val closure3r = closure3(3, 4, 5) + + testClean(closure1, serializableBefore = true, serializableAfter = true) + testClean(closure2, serializableBefore = true, serializableAfter = true) + testClean(closure3, serializableBefore = true, serializableAfter = true) + + assert(closure1(1) === closure1r) + assert(closure2(2) === closure2r) + assert(closure3(3, 4, 5) === closure3r) + } + + test("clean basic nested non-serializable closures") { + def localSerializableMethod() = someSerializableValue + val localNonSerializableValue = someNonSerializableValue + val closure1 = (i: Int) => { (1 to i).map { x => x + someSerializableValue } } + val closure2 = (j: Int) => { (1 to j).map { x => x + someSerializableMethod() } } + val closure4 = (k: Int) => { (1 to k).map { x => x + localSerializableMethod() } } + val closure3 = (l: Int) => { (1 to l).map { x => localNonSerializableValue } } + // This is non-serializable no matter how many levels we nest it + val closure5 = (m: Int) => { + (1 to m).foreach { x => + (1 to x).foreach { y => + (1 to y).foreach { z => + someSerializableValue + } + } + } + } + + testClean(closure1, serializableBefore = false, serializableAfter = false) + testClean(closure2, serializableBefore = false, serializableAfter = false) + testClean(closure3, serializableBefore = false, serializableAfter = false) + testClean(closure4, serializableBefore = false, serializableAfter = false) + testClean(closure5, serializableBefore = false, serializableAfter = false) + } + + test("clean complicated nested serializable closures") { + val localSerializableValue = someSerializableValue + + // Reference local fields from all levels + val closure1 = (i: Int) => { + val a = 1 + (1 to i).flatMap { x => + val b = a + 1 + (1 to x).map { y => + y + a + b + localSerializableValue + } + } + } + + // Reference local fields and methods from all levels within the outermost closure + val closure2 = (i: Int) => { + val a1 = 1 + def a2 = 2 + (1 to i).flatMap { x => + val b1 = a1 + 1 + def b2 = a2 + 1 + (1 to x).map { y => + // If this references a method outside the outermost closure, then it will try to pull + // in the ClosureCleanerSuite2. This is why `localSerializableValue` here must be a val. + y + a1 + a2 + b1 + b2 + localSerializableValue + } + } + } + + val closure1r = closure1(1) + val closure2r = closure2(2) + testClean(closure1, serializableBefore = true, serializableAfter = true) + testClean(closure2, serializableBefore = true, serializableAfter = true) + assert(closure1(1) == closure1r) + assert(closure2(2) == closure2r) + } + + test("clean complicated nested non-serializable closures") { + val localSerializableValue = someSerializableValue + + // Note that we are not interested in cleaning the outer closures here + // The only reason why they exist is to nest the inner closures + + val test1 = () => { + val a = localSerializableValue + val b = sc + val inner1 = (x: Int) => x + a + b.hashCode() + val inner2 = (x: Int) => x + a + + // This closure explicitly references a non-serializable field + // There is no way to clean it + testClean(inner1, serializableBefore = false, serializableAfter = false) + + // This closure is serializable to begin with since + // it does not have a pointer to the outer closure + testClean(inner2, serializableBefore = true, serializableAfter = true) + } + + // Same as above, but the `val a` becomes `def a` + // The difference here is that all inner closures now have pointers to the outer closure + val test2 = () => { + def a = localSerializableValue + val b = sc + val inner1 = (x: Int) => x + a + b.hashCode() + val inner2 = (x: Int) => x + a + + // As before, this closure is neither serializable nor cleanable + testClean(inner1, serializableBefore = false, serializableAfter = false) + + // This closure is no longer serializable because it now has a pointer to the outer closure, + // which is itself not serializable because it has a pointer to the ClosureCleanerSuite. + // If we do not clean transitively, we will not null out this parent pointer. + testClean(inner2, serializableBefore = false, serializableAfter = false, transitive = false) + + // If we clean transitively, we will find that method `a` does not actually reference the + // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out + // the outer closure's parent pointer. This will make `inner2` serializable. + testClean(inner2, serializableBefore = false, serializableAfter = true, transitive = true) + } + + test1() + test2() + } + + + + + + // TODO: REMOVE ME + configureLog4j() + private def configureLog4j(): Unit = { + val pro = new Properties() + pro.put("log4j.rootLogger", "WARN, console") + pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender") + pro.put("log4j.appender.console.target", "System.err") + pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout") + pro.put("log4j.appender.console.layout.ConversionPattern", + "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n") + pro.put("log4j.logger.org.apache.spark.util.ClosureCleaner", "DEBUG") + PropertyConfigurator.configure(pro) + } + +} From 06fd668eeec6ff773db8cf9e38c66937abf8ca5a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 16:29:09 -0700 Subject: [PATCH 05/23] Make closure cleaning idempotent We need this for tests because we clean the same closure many times there. Outside of tests this is probably not important. --- .../apache/spark/util/ClosureCleaner.scala | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index b4e0d819e6149..0dfa1444e703b 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -58,10 +58,13 @@ object ClosureCleaner extends Logging { private def getOuterClasses(obj: AnyRef): List[Class[_]] = { for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { f.setAccessible(true) - if (isClosure(f.getType)) { - return f.getType :: getOuterClasses(f.get(obj)) - } else { - return f.getType :: Nil // Stop at the first $outer that is not a closure + val outer = f.get(obj) + if (outer != null) { + if (isClosure(f.getType)) { + return f.getType :: getOuterClasses(f.get(obj)) + } else { + return f.getType :: Nil // Stop at the first $outer that is not a closure + } } } Nil @@ -71,10 +74,13 @@ object ClosureCleaner extends Logging { private def getOuterObjects(obj: AnyRef): List[AnyRef] = { for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { f.setAccessible(true) - if (isClosure(f.getType)) { - return f.get(obj) :: getOuterObjects(f.get(obj)) - } else { - return f.get(obj) :: Nil // Stop at the first $outer that is not a closure + val outer = f.get(obj) + if (outer != null) { + if (isClosure(f.getType)) { + return f.get(obj) :: getOuterObjects(f.get(obj)) + } else { + return f.get(obj) :: Nil // Stop at the first $outer that is not a closure + } } } Nil @@ -167,11 +173,15 @@ object ClosureCleaner extends Logging { func: AnyRef, checkSerializable: Boolean, cleanTransitively: Boolean, - accessedFields: Map[Class[_], Set[String]]) { + accessedFields: Map[Class[_], Set[String]]): Unit = { // TODO: clean all inner closures first. This requires us to find the inner objects. // TODO: cache outerClasses / innerClasses / accessedFields + if (func == null) { + return + } + logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}}) +++") // A list of classes that represents closures enclosed in the given one From 263593ddc9224774b7af76ceb7364d2ee82aef2c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 16:58:13 -0700 Subject: [PATCH 06/23] Finalize tests --- .../spark/util/ClosureCleanerSuite2.scala | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index d498731f451d3..fb65efa9f6013 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -24,10 +24,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.serializer.SerializerInstance -// TODO: REMOVE ME -import java.util.Properties -import org.apache.log4j.PropertyConfigurator - /** * Another test suite for the closure cleaner that is finer-grained. * For tests involving end-to-end Spark jobs, see {{ClosureCleanerSuite}}. @@ -56,6 +52,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { private def someSerializableMethod() = 1 private def someNonSerializableMethod() = new NonSerializable + /** Assert that the given closure is serializable (or not). */ private def assertSerializable(closure: AnyRef, serializable: Boolean): Unit = { if (serializable) { closureSerializer.serialize(closure) @@ -66,6 +63,10 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { } } + /** + * Helper method for testing whether closure cleaning works as expected. + * This cleans the given closure twice, with and without transitive cleaning. + */ private def testClean( closure: AnyRef, serializableBefore: Boolean, @@ -74,6 +75,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { testClean(closure, serializableBefore, serializableAfter, transitive = false) } + /** Helper method for testing whether closure cleaning works as expected. */ private def testClean( closure: AnyRef, serializableBefore: Boolean, @@ -266,26 +268,18 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { testClean(inner2, serializableBefore = false, serializableAfter = true, transitive = true) } + // Same as above, but with more levels of nesting + val test3 = () => { () => test1() } + val test4 = () => { () => test2() } + val test5 = () => { () => { () => test3() } } + val test6 = () => { () => { () => test4() } } + test1() test2() - } - - - - - - // TODO: REMOVE ME - configureLog4j() - private def configureLog4j(): Unit = { - val pro = new Properties() - pro.put("log4j.rootLogger", "WARN, console") - pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender") - pro.put("log4j.appender.console.target", "System.err") - pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout") - pro.put("log4j.appender.console.layout.ConversionPattern", - "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n") - pro.put("log4j.logger.org.apache.spark.util.ClosureCleaner", "DEBUG") - PropertyConfigurator.configure(pro) + test3()() + test4()() + test5()()() + test6()()() } } From 6d36f385a7783aea22152b9937cb685081a7c020 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 17:41:27 -0700 Subject: [PATCH 07/23] Fix closure cleaner visibility --- .../scala/org/apache/spark/util/ClosureCleaner.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 0dfa1444e703b..916ec25b9deab 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException} /** * A cleaner that renders closures serializable if they can be done so safely. */ -object ClosureCleaner extends Logging { +private[spark] object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it def getClassReader(cls: Class[_]): ClassReader = { @@ -341,8 +341,7 @@ object ClosureCleaner extends Logging { } } -private[spark] -class ReturnStatementFinder extends ClassVisitor(ASM4) { +private class ReturnStatementFinder extends ClassVisitor(ASM4) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { if (name.contains("apply")) { @@ -369,8 +368,7 @@ class ReturnStatementFinder extends ClassVisitor(ASM4) { * @param specificMethodNames if not empty, only visit methods whose names are in this set * @param findTransitively if true, find fields indirectly referenced in other classes */ -private[spark] -class FieldAccessFinder( +private class FieldAccessFinder( fields: Map[Class[_], Set[String]], specificMethodNames: Set[String] = Set.empty, findTransitively: Boolean = true) @@ -426,7 +424,7 @@ class FieldAccessFinder( } } -private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) { +private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) { var myName: String = null override def visit(version: Int, access: Int, name: String, sig: String, From e6721706ac5e82b638062c3ae9f6dc35bf4e7e2d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 24 Apr 2015 19:31:08 -0700 Subject: [PATCH 08/23] Guard against potential infinite cycles in method visitor Now we keep track of the methods that we visited to avoid visiting the same method twice. --- .../apache/spark/util/ClosureCleaner.scala | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 916ec25b9deab..6a9300cd8960b 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -223,7 +223,7 @@ private[spark] object ClosureCleaner extends Logging { // all of its inner closures. If transitive cleaning is enabled, this may recursively // visits methods that belong to other classes in search of transitively referenced fields. for (cls <- func.getClass :: innerClasses) { - getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0) + getClassReader(cls).accept(new FieldAccessFinder(accessedFields, cleanTransitively), 0) } } @@ -358,6 +358,9 @@ private class ReturnStatementFinder extends ClassVisitor(ASM4) { } } +/** Helper class to identify a method. */ +private case class MethodIdentifier(cls: Class[_], name: String, desc: String) + /** * Find the fields accessed by a given class. * @@ -365,13 +368,15 @@ private class ReturnStatementFinder extends ClassVisitor(ASM4) { * This map is assumed to have its keys already populated with the classes of interest. * * @param fields the mutable map that stores the fields to return - * @param specificMethodNames if not empty, only visit methods whose names are in this set * @param findTransitively if true, find fields indirectly referenced in other classes + * @param specificMethod if not empty, visit only this method + * @param visitedMethods a list of visited methods to avoid cycles */ private class FieldAccessFinder( fields: Map[Class[_], Set[String]], - specificMethodNames: Set[String] = Set.empty, - findTransitively: Boolean = true) + findTransitively: Boolean, + specificMethod: Option[MethodIdentifier] = None, + visitedMethods: Set[MethodIdentifier] = Set.empty) extends ClassVisitor(ASM4) { override def visitMethod( @@ -381,9 +386,10 @@ private class FieldAccessFinder( sig: String, exceptions: Array[String]): MethodVisitor = { - // Ignore this method if we don't want to visit it - if (specificMethodNames.nonEmpty && !specificMethodNames.contains(name)) { - return new MethodVisitor(ASM4) { } + // Ignore this method unless we are told to visit it + if (specificMethod.nonEmpty && + specificMethod.get.name != name || specificMethod.get.desc != desc) { + return null } new MethodVisitor(ASM4) { @@ -396,30 +402,24 @@ private class FieldAccessFinder( } override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { - if (isInvoke(op)) { - for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) { - // Check for calls a getter method for a variable in an interpreter wrapper object. - // This means that the corresponding field will be accessed, so we should save it. - if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { - fields(cl) += name - } - // Visit other methods to find fields that are transitively referenced - // FIXME: This could lead to infinite cycles!! - if (findTransitively) { - ClosureCleaner.getClassReader(cl) - .accept(new FieldAccessFinder(fields, Set(name), findTransitively), 0) + for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) { + // Check for calls a getter method for a variable in an interpreter wrapper object. + // This means that the corresponding field will be accessed, so we should save it. + if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { + fields(cl) += name + } + // Visit other methods to find fields that are transitively referenced + if (findTransitively) { + val m = MethodIdentifier(cl, name, desc) + if (!visitedMethods.contains(m)) { + // Keep track of visited methods to avoid potential infinite cycles + visitedMethods += m + ClosureCleaner.getClassReader(cl).accept( + new FieldAccessFinder(fields, findTransitively, Some(m), visitedMethods), 0) } } } } - - private def isInvoke(op: Int): Boolean = { - op == INVOKEVIRTUAL || - op == INVOKESPECIAL || - op == INVOKEDYNAMIC || - op == INVOKEINTERFACE || - op == INVOKESTATIC - } } } } From a3aa465e35753e0ee9b70f97bb1f41fc61b0f5aa Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 25 Apr 2015 01:57:48 -0700 Subject: [PATCH 09/23] Add more tests for individual closure cleaner operations --- .../apache/spark/util/ClosureCleaner.scala | 18 +- .../spark/util/ClosureCleanerSuite2.scala | 246 ++++++++++++++++++ 2 files changed, 256 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 6a9300cd8960b..decf25d4be7fb 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -32,7 +32,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException} private[spark] object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it - def getClassReader(cls: Class[_]): ClassReader = { + private[util] def getClassReader(cls: Class[_]): ClassReader = { // Copy data over, before delegating to ClassReader - else we can run out of open file handles. val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" val resourceStream = cls.getResourceAsStream(className) @@ -45,7 +45,7 @@ private[spark] object ClosureCleaner extends Logging { } // Check whether a class represents a Scala closure - private def isClosure(cls: Class[_]): Boolean = { + private[util] def isClosure(cls: Class[_]): Boolean = { cls.getName.contains("$anonfun$") } @@ -55,10 +55,11 @@ private[spark] object ClosureCleaner extends Logging { // for outer objects beyond that because cloning the user's object is probably // not a good idea (whereas we can clone closure objects just fine since we // understand how all their fields are used). - private def getOuterClasses(obj: AnyRef): List[Class[_]] = { + private[util] def getOuterClasses(obj: AnyRef): List[Class[_]] = { for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { f.setAccessible(true) val outer = f.get(obj) + // The outer pointer may be null if we have cleaned this closure before if (outer != null) { if (isClosure(f.getType)) { return f.getType :: getOuterClasses(f.get(obj)) @@ -71,10 +72,11 @@ private[spark] object ClosureCleaner extends Logging { } // Get a list of the outer objects for a given closure object. - private def getOuterObjects(obj: AnyRef): List[AnyRef] = { + private[util] def getOuterObjects(obj: AnyRef): List[AnyRef] = { for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { f.setAccessible(true) val outer = f.get(obj) + // The outer pointer may be null if we have cleaned this closure before if (outer != null) { if (isClosure(f.getType)) { return f.get(obj) :: getOuterObjects(f.get(obj)) @@ -89,7 +91,7 @@ private[spark] object ClosureCleaner extends Logging { /** * Return a list of classes that represent closures enclosed in the given closure object. */ - private def getInnerClasses(obj: AnyRef): List[Class[_]] = { + private[util] def getInnerClasses(obj: AnyRef): List[Class[_]] = { val seen = Set[Class[_]](obj.getClass) var stack = List[Class[_]](obj.getClass) while (!stack.isEmpty) { @@ -372,7 +374,7 @@ private case class MethodIdentifier(cls: Class[_], name: String, desc: String) * @param specificMethod if not empty, visit only this method * @param visitedMethods a list of visited methods to avoid cycles */ -private class FieldAccessFinder( +private[util] class FieldAccessFinder( fields: Map[Class[_], Set[String]], findTransitively: Boolean, specificMethod: Option[MethodIdentifier] = None, @@ -387,8 +389,8 @@ private class FieldAccessFinder( exceptions: Array[String]): MethodVisitor = { // Ignore this method unless we are told to visit it - if (specificMethod.nonEmpty && - specificMethod.get.name != name || specificMethod.get.desc != desc) { + if (specificMethod.isDefined && + (specificMethod.get.name != name || specificMethod.get.desc != desc)) { return null } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index fb65efa9f6013..13995db5e2064 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -19,6 +19,8 @@ package org.apache.spark.util import java.io.NotSerializableException +import scala.collection.mutable + import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.spark.{SparkContext, SparkException} @@ -93,6 +95,250 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { assertSerializable(closure, serializableAfter) } + /** + * Return the fields accessed by the given closure by class. + * This also optionally finds the fields transitively referenced through methods + * that belong to other classes. + */ + private def findAccessedFields( + closure: AnyRef, + outerClasses: Seq[Class[_]], + findTransitively: Boolean): Map[Class[_], Set[String]] = { + val fields = new mutable.HashMap[Class[_], mutable.Set[String]] + outerClasses.foreach { c => fields(c) = new mutable.HashSet[String] } + ClosureCleaner.getClassReader(closure.getClass) + .accept(new FieldAccessFinder(fields, findTransitively), 0) + fields.mapValues(_.toSet).toMap + } + + test("get inner classes") { + val closure1 = () => 1 + val closure2 = () => { () => 1 } + val closure3 = (i: Int) => { + (1 to i).map { x => x + 1 }.filter { x => x > 5 } + } + val closure4 = (j: Int) => { + (1 to j).flatMap { x => + (1 to x).flatMap { y => + (1 to y).map { z => z + 1 } + } + } + } + val inner1 = ClosureCleaner.getInnerClasses(closure1) + val inner2 = ClosureCleaner.getInnerClasses(closure2) + val inner3 = ClosureCleaner.getInnerClasses(closure3) + val inner4 = ClosureCleaner.getInnerClasses(closure4) + assert(inner1.isEmpty) + assert(inner2.size === 1) + assert(inner3.size === 2) + assert(inner4.size === 3) + assert(inner2.forall(ClosureCleaner.isClosure)) + assert(inner3.forall(ClosureCleaner.isClosure)) + assert(inner4.forall(ClosureCleaner.isClosure)) + } + + test("get outer classes and objects") { + val localValue = someSerializableValue + val closure1 = () => 1 + val closure2 = () => localValue + val closure3 = () => someSerializableValue + val closure4 = () => someSerializableMethod() + val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) + val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) + val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) + val outerClasses4 = ClosureCleaner.getOuterClasses(closure4) + val outerObjects1 = ClosureCleaner.getOuterObjects(closure1) + val outerObjects2 = ClosureCleaner.getOuterObjects(closure2) + val outerObjects3 = ClosureCleaner.getOuterObjects(closure3) + val outerObjects4 = ClosureCleaner.getOuterObjects(closure4) + + // The classes and objects should have the same size + assert(outerClasses1.size === outerObjects1.size) + assert(outerClasses2.size === outerObjects2.size) + assert(outerClasses3.size === outerObjects3.size) + assert(outerClasses4.size === outerObjects4.size) + + // These do not have $outer pointers because they reference only local variables + assert(outerClasses1.isEmpty) + assert(outerClasses2.isEmpty) + + // These closures do have $outer pointers because they ultimately reference `this` + // The first $outer pointer refers to the closure defines this test (see FunSuite#test) + // The second $outer pointer refers to ClosureCleanerSuite2 + assert(outerClasses3.size === 2) + assert(outerClasses4.size === 2) + assert(ClosureCleaner.isClosure(outerClasses3(0))) + assert(ClosureCleaner.isClosure(outerClasses4(0))) + assert(outerClasses3(0) === outerClasses4(0)) // part of the same "FunSuite#test" scope + assert(outerClasses3(1) === this.getClass) + assert(outerClasses4(1) === this.getClass) + assert(outerObjects3(1) === this) + assert(outerObjects4(1) === this) + } + + test("get outer classes and objects with nesting") { + val localValue = someSerializableValue + + val test1 = () => { + val x = 1 + val closure1 = () => 1 + val closure2 = () => x + val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) + val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) + val outerObjects1 = ClosureCleaner.getOuterObjects(closure1) + val outerObjects2 = ClosureCleaner.getOuterObjects(closure2) + assert(outerClasses1.size === outerObjects1.size) + assert(outerClasses2.size === outerObjects2.size) + // These inner closures only reference local variables, and so do not have $outer pointer + assert(outerClasses1.isEmpty) + assert(outerClasses2.isEmpty) + } + + val test2 = () => { + def y = 1 + val closure1 = () => 1 + val closure2 = () => y + val closure3 = () => localValue + val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) + val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) + val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) + val outerObjects1 = ClosureCleaner.getOuterObjects(closure1) + val outerObjects2 = ClosureCleaner.getOuterObjects(closure2) + val outerObjects3 = ClosureCleaner.getOuterObjects(closure3) + assert(outerClasses1.size === outerObjects1.size) + assert(outerClasses2.size === outerObjects2.size) + assert(outerClasses3.size === outerObjects3.size) + // Same as above, this closure only references local variables + assert(outerClasses1.isEmpty) + // This closure references the "test2" scope because it needs to find the method `y` + // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2 + assert(outerClasses2.size === 3) + // This closure references the "test2" scope because it needs to find the + // `localValue` defined outside of this scope + assert(outerClasses3.size === 3) + assert(ClosureCleaner.isClosure(outerClasses2(0))) + assert(ClosureCleaner.isClosure(outerClasses3(0))) + assert(ClosureCleaner.isClosure(outerClasses2(1))) + assert(ClosureCleaner.isClosure(outerClasses3(1))) + assert(outerClasses2(0) === outerClasses3(0)) // part of the same "test2" scope + assert(outerClasses2(1) === outerClasses3(1)) // part of the same "FunSuite#test" scope + assert(outerClasses2(2) === this.getClass) + assert(outerClasses3(2) === this.getClass) + assert(outerObjects2(2) === this) + assert(outerObjects3(2) === this) + } + + test1() + test2() + } + + test("find accessed fields") { + val localValue = someSerializableValue + val closure1 = () => 1 + val closure2 = () => localValue + val closure3 = () => someSerializableValue + val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) + val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) + val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) + + val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) + val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false) + val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false) + assert(fields1.isEmpty) + assert(fields2.isEmpty) + assert(fields3.size === 2) + // This corresponds to the "FunSuite#test" closure. This is empty because the + // field `closure3` references belongs to its parent (i.e. ClosureCleanerSuite2) + assert(fields3(outerClasses3(0)).isEmpty) + // This corresponds to the ClosureCleanerSuite2. This is also empty, however, + // because we did not find fields transitively (i.e. beyond 1 enclosing scope) + assert(fields3(outerClasses3(1)).isEmpty) + + val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true) + val fields2t = findAccessedFields(closure2, outerClasses2, findTransitively = true) + val fields3t = findAccessedFields(closure3, outerClasses3, findTransitively = true) + assert(fields1t.isEmpty) + assert(fields2t.isEmpty) + assert(fields3t.size === 2) + // Because we find fields transitively now, we are able to detect that we need the + // $outer pointer to get the field from the ClosureCleanerSuite2. + assert(fields3t(outerClasses3(0)).size === 1) + assert(fields3t(outerClasses3(0)).head === "$outer") + assert(fields3t(outerClasses3(1)).size === 1) + assert(fields3t(outerClasses3(1)).head.contains("someSerializableValue")) + } + + test("find accessed fields with nesting") { + val localValue = someSerializableValue + + val test1 = () => { + def a = localValue + 1 + val closure1 = () => 1 + val closure2 = () => a + val closure3 = () => localValue + val closure4 = () => someSerializableValue + val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) + val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) + val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) + val outerClasses4 = ClosureCleaner.getOuterClasses(closure4) + + // First, find only fields the closures directly access + val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) + val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false) + val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false) + val fields4 = findAccessedFields(closure4, outerClasses4, findTransitively = false) + assert(fields1.isEmpty) + // "test1" < "FunSuite#test" < ClosureCleanerSuite2 + assert(fields2.size === 3) + assert(fields2(outerClasses2(0)).isEmpty) // `def a` is not a field + assert(fields2(outerClasses2(1)).isEmpty) + assert(fields2(outerClasses2(2)).isEmpty) + assert(fields3.size === 3) + // Note that `localValue` is a field of the "test1" closure because `def a` needs it + // Further note that it is NOT a field of the "FunSuite#test" closure but a local variable + assert(fields3(outerClasses3(0)).size === 1) + assert(fields3(outerClasses3(0)).head.contains("localValue")) + assert(fields3(outerClasses3(1)).isEmpty) + assert(fields3(outerClasses3(2)).isEmpty) + assert(fields4.size === 3) + assert(fields4(outerClasses4(0)).isEmpty) + assert(fields4(outerClasses4(1)).isEmpty) + // Because `someSerializableValue` is a val, even an explicit reference here actually + // involves a method call to access the underlying value of the variable. Because we are + // not finding fields transitively here, we do not consider the fields accessed by this + // "method" (i.e. the val's accessor). + assert(fields4(outerClasses4(2)).isEmpty) + + // Now do the same, but find fields that the closures transitively reference + val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true) + val fields2t = findAccessedFields(closure2, outerClasses2, findTransitively = true) + val fields3t = findAccessedFields(closure3, outerClasses3, findTransitively = true) + val fields4t = findAccessedFields(closure4, outerClasses4, findTransitively = true) + assert(fields1t.isEmpty) + assert(fields2t.size === 3) + // This closure transitively references `localValue` because `def a` uses it + assert(fields2t(outerClasses2(0)).size === 1) + assert(fields2t(outerClasses2(0)).head.contains("localValue")) + assert(fields2t(outerClasses2(1)).isEmpty) + assert(fields2t(outerClasses2(2)).isEmpty) + assert(fields3t.size === 3) + assert(fields3t(outerClasses3(0)).size === 1) // as before + assert(fields3t(outerClasses3(0)).head.contains("localValue")) + assert(fields3t(outerClasses3(1)).isEmpty) + assert(fields3t(outerClasses3(2)).isEmpty) + assert(fields4t.size === 3) + // Through a series of method calls, we are able to detect that we ultimately access + // ClosureCleanerSuite2's field `someSerializableValue`. Along the way, we also accessed + // a few $outer parent pointers to get to the outermost object. + assert(fields4t(outerClasses4(0)) === Set("$outer")) + assert(fields4t(outerClasses4(1)) === Set("$outer")) + assert(fields4t(outerClasses4(2)).size === 1) + assert(fields4t(outerClasses4(2)).head.contains("someSerializableValue")) + } + + test1() + } + test("clean basic serializable closures") { val localSerializableVal = someSerializableValue val closure1 = () => 1 From eb127e54fa04ef17523806067d074b8560ea783e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 25 Apr 2015 02:20:47 -0700 Subject: [PATCH 10/23] Use private method tester for a few things --- .../apache/spark/util/ClosureCleaner.scala | 8 +- .../spark/util/ClosureCleanerSuite2.scala | 102 +++++++++++------- 2 files changed, 66 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index decf25d4be7fb..946f17a1c5cc8 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -45,7 +45,7 @@ private[spark] object ClosureCleaner extends Logging { } // Check whether a class represents a Scala closure - private[util] def isClosure(cls: Class[_]): Boolean = { + private def isClosure(cls: Class[_]): Boolean = { cls.getName.contains("$anonfun$") } @@ -55,7 +55,7 @@ private[spark] object ClosureCleaner extends Logging { // for outer objects beyond that because cloning the user's object is probably // not a good idea (whereas we can clone closure objects just fine since we // understand how all their fields are used). - private[util] def getOuterClasses(obj: AnyRef): List[Class[_]] = { + private def getOuterClasses(obj: AnyRef): List[Class[_]] = { for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { f.setAccessible(true) val outer = f.get(obj) @@ -72,7 +72,7 @@ private[spark] object ClosureCleaner extends Logging { } // Get a list of the outer objects for a given closure object. - private[util] def getOuterObjects(obj: AnyRef): List[AnyRef] = { + private def getOuterObjects(obj: AnyRef): List[AnyRef] = { for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { f.setAccessible(true) val outer = f.get(obj) @@ -91,7 +91,7 @@ private[spark] object ClosureCleaner extends Logging { /** * Return a list of classes that represent closures enclosed in the given closure object. */ - private[util] def getInnerClasses(obj: AnyRef): List[Class[_]] = { + private def getInnerClasses(obj: AnyRef): List[Class[_]] = { val seen = Set[Class[_]](obj.getClass) var stack = List[Class[_]](obj.getClass) while (!stack.isEmpty) { diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index 13995db5e2064..7408bfed4a6e6 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -21,7 +21,7 @@ import java.io.NotSerializableException import scala.collection.mutable -import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.scalatest.{BeforeAndAfterAll, FunSuite, PrivateMethodTester} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.serializer.SerializerInstance @@ -30,7 +30,7 @@ import org.apache.spark.serializer.SerializerInstance * Another test suite for the closure cleaner that is finer-grained. * For tests involving end-to-end Spark jobs, see {{ClosureCleanerSuite}}. */ -class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { +class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { // Start a SparkContext so that SparkEnv.get.closureSerializer is accessible // We do not actually use this explicitly except to stop it later @@ -111,6 +111,28 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { fields.mapValues(_.toSet).toMap } + // Accessors for private methods + private val _isClosure = PrivateMethod[Boolean]('isClosure) + private val _getInnerClasses = PrivateMethod[List[Class[_]]]('getInnerClasses) + private val _getOuterClasses = PrivateMethod[List[Class[_]]]('getOuterClasses) + private val _getOuterObjects = PrivateMethod[List[AnyRef]]('getOuterObjects) + + private def isClosure(obj: AnyRef): Boolean = { + ClosureCleaner invokePrivate _isClosure(obj) + } + + private def getInnerClasses(closure: AnyRef): List[Class[_]] = { + ClosureCleaner invokePrivate _getInnerClasses(closure) + } + + private def getOuterClasses(closure: AnyRef): List[Class[_]] = { + ClosureCleaner invokePrivate _getOuterClasses(closure) + } + + private def getOuterObjects(closure: AnyRef): List[AnyRef] = { + ClosureCleaner invokePrivate _getOuterObjects(closure) + } + test("get inner classes") { val closure1 = () => 1 val closure2 = () => { () => 1 } @@ -124,17 +146,17 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { } } } - val inner1 = ClosureCleaner.getInnerClasses(closure1) - val inner2 = ClosureCleaner.getInnerClasses(closure2) - val inner3 = ClosureCleaner.getInnerClasses(closure3) - val inner4 = ClosureCleaner.getInnerClasses(closure4) + val inner1 = getInnerClasses(closure1) + val inner2 = getInnerClasses(closure2) + val inner3 = getInnerClasses(closure3) + val inner4 = getInnerClasses(closure4) assert(inner1.isEmpty) assert(inner2.size === 1) assert(inner3.size === 2) assert(inner4.size === 3) - assert(inner2.forall(ClosureCleaner.isClosure)) - assert(inner3.forall(ClosureCleaner.isClosure)) - assert(inner4.forall(ClosureCleaner.isClosure)) + assert(inner2.forall(isClosure)) + assert(inner3.forall(isClosure)) + assert(inner4.forall(isClosure)) } test("get outer classes and objects") { @@ -143,14 +165,14 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { val closure2 = () => localValue val closure3 = () => someSerializableValue val closure4 = () => someSerializableMethod() - val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) - val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) - val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) - val outerClasses4 = ClosureCleaner.getOuterClasses(closure4) - val outerObjects1 = ClosureCleaner.getOuterObjects(closure1) - val outerObjects2 = ClosureCleaner.getOuterObjects(closure2) - val outerObjects3 = ClosureCleaner.getOuterObjects(closure3) - val outerObjects4 = ClosureCleaner.getOuterObjects(closure4) + val outerClasses1 = getOuterClasses(closure1) + val outerClasses2 = getOuterClasses(closure2) + val outerClasses3 = getOuterClasses(closure3) + val outerClasses4 = getOuterClasses(closure4) + val outerObjects1 = getOuterObjects(closure1) + val outerObjects2 = getOuterObjects(closure2) + val outerObjects3 = getOuterObjects(closure3) + val outerObjects4 = getOuterObjects(closure4) // The classes and objects should have the same size assert(outerClasses1.size === outerObjects1.size) @@ -167,8 +189,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { // The second $outer pointer refers to ClosureCleanerSuite2 assert(outerClasses3.size === 2) assert(outerClasses4.size === 2) - assert(ClosureCleaner.isClosure(outerClasses3(0))) - assert(ClosureCleaner.isClosure(outerClasses4(0))) + assert(isClosure(outerClasses3(0))) + assert(isClosure(outerClasses4(0))) assert(outerClasses3(0) === outerClasses4(0)) // part of the same "FunSuite#test" scope assert(outerClasses3(1) === this.getClass) assert(outerClasses4(1) === this.getClass) @@ -183,10 +205,10 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { val x = 1 val closure1 = () => 1 val closure2 = () => x - val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) - val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) - val outerObjects1 = ClosureCleaner.getOuterObjects(closure1) - val outerObjects2 = ClosureCleaner.getOuterObjects(closure2) + val outerClasses1 = getOuterClasses(closure1) + val outerClasses2 = getOuterClasses(closure2) + val outerObjects1 = getOuterObjects(closure1) + val outerObjects2 = getOuterObjects(closure2) assert(outerClasses1.size === outerObjects1.size) assert(outerClasses2.size === outerObjects2.size) // These inner closures only reference local variables, and so do not have $outer pointer @@ -199,12 +221,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { val closure1 = () => 1 val closure2 = () => y val closure3 = () => localValue - val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) - val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) - val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) - val outerObjects1 = ClosureCleaner.getOuterObjects(closure1) - val outerObjects2 = ClosureCleaner.getOuterObjects(closure2) - val outerObjects3 = ClosureCleaner.getOuterObjects(closure3) + val outerClasses1 = getOuterClasses(closure1) + val outerClasses2 = getOuterClasses(closure2) + val outerClasses3 = getOuterClasses(closure3) + val outerObjects1 = getOuterObjects(closure1) + val outerObjects2 = getOuterObjects(closure2) + val outerObjects3 = getOuterObjects(closure3) assert(outerClasses1.size === outerObjects1.size) assert(outerClasses2.size === outerObjects2.size) assert(outerClasses3.size === outerObjects3.size) @@ -216,10 +238,10 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { // This closure references the "test2" scope because it needs to find the // `localValue` defined outside of this scope assert(outerClasses3.size === 3) - assert(ClosureCleaner.isClosure(outerClasses2(0))) - assert(ClosureCleaner.isClosure(outerClasses3(0))) - assert(ClosureCleaner.isClosure(outerClasses2(1))) - assert(ClosureCleaner.isClosure(outerClasses3(1))) + assert(isClosure(outerClasses2(0))) + assert(isClosure(outerClasses3(0))) + assert(isClosure(outerClasses2(1))) + assert(isClosure(outerClasses3(1))) assert(outerClasses2(0) === outerClasses3(0)) // part of the same "test2" scope assert(outerClasses2(1) === outerClasses3(1)) // part of the same "FunSuite#test" scope assert(outerClasses2(2) === this.getClass) @@ -237,9 +259,9 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { val closure1 = () => 1 val closure2 = () => localValue val closure3 = () => someSerializableValue - val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) - val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) - val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) + val outerClasses1 = getOuterClasses(closure1) + val outerClasses2 = getOuterClasses(closure2) + val outerClasses3 = getOuterClasses(closure3) val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false) @@ -277,10 +299,10 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll { val closure2 = () => a val closure3 = () => localValue val closure4 = () => someSerializableValue - val outerClasses1 = ClosureCleaner.getOuterClasses(closure1) - val outerClasses2 = ClosureCleaner.getOuterClasses(closure2) - val outerClasses3 = ClosureCleaner.getOuterClasses(closure3) - val outerClasses4 = ClosureCleaner.getOuterClasses(closure4) + val outerClasses1 = getOuterClasses(closure1) + val outerClasses2 = getOuterClasses(closure2) + val outerClasses3 = getOuterClasses(closure3) + val outerClasses4 = getOuterClasses(closure4) // First, find only fields the closures directly access val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) From 8b71cdb7953ce622fd94fda7e0c5daafeb145cca Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 25 Apr 2015 14:46:47 -0700 Subject: [PATCH 11/23] Update a few comments --- .../org/apache/spark/util/ClosureCleaner.scala | 16 +++++++++------- .../apache/spark/util/ClosureCleanerSuite2.scala | 4 ++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 946f17a1c5cc8..0adfb3423214d 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -138,14 +138,16 @@ private[spark] object ClosureCleaner extends Logging { * The mechanism is to traverse the hierarchy of enclosing closures and null out any * references along the way that are not actually used by the starting closure, but are * nevertheless included in the compiled anonymous classes. Note that it is unsafe to - * simply mutate the enclosing closures, as other code paths may depend on them. Instead, - * we clone each enclosing closure and set the parent pointers accordingly. + * simply mutate the enclosing closures in place, as other code paths may depend on them. + * Instead, we clone each enclosing closure and set the parent pointers accordingly. * * By default, closures are cleaned transitively. This means we detect whether enclosing * objects are actually referenced by the starting one, either directly or transitively, * and, if not, sever these closures from the hierarchy. In other words, in addition to * nulling out unused field references, we also null out any parent pointers that refer - * to enclosing objects not actually needed by the starting closure. + * to enclosing objects not actually needed by the starting closure. We determine + * transitivity by tracing through the tree of all methods ultimately invoked by the + * inner closure and record all the fields referenced in the process. * * For instance, transitive cleaning is necessary in the following scenario: * @@ -160,10 +162,10 @@ private[spark] object ClosureCleaner extends Logging { * } * * In this example, scope "two" is not serializable because it references scope "one", which - * references SomethingNotSerializable. Note that, however, scope "two" does not actually - * depend on SomethingNotSerializable. This means we can null out the parent pointer of - * a cloned scope "one" and set it the parent of scope "two", such that scope "two" no longer - * references SomethingNotSerializable transitively. + * references SomethingNotSerializable. Note that, however, the body of scope "two" does not + * actually depend on SomethingNotSerializable. This means we can safely null out the parent + * pointer of a cloned scope "one" and set it the parent of scope "two", such that scope "two" + * no longer references SomethingNotSerializable transitively. * * @param func the starting closure to clean * @param checkSerializable whether to verify that the closure is serializable after cleaning diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index 7408bfed4a6e6..3ef1946d7bb5b 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -526,8 +526,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM testClean(inner1, serializableBefore = false, serializableAfter = false) // This closure is no longer serializable because it now has a pointer to the outer closure, - // which is itself not serializable because it has a pointer to the ClosureCleanerSuite. - // If we do not clean transitively, we will not null out this parent pointer. + // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2. + // If we do not clean transitively, we will not null out this indirect reference. testClean(inner2, serializableBefore = false, serializableAfter = false, transitive = false) // If we clean transitively, we will find that method `a` does not actually reference the From e45e9049296e6f15a0e60febf3a5581db43c0ffb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 25 Apr 2015 16:37:37 -0700 Subject: [PATCH 12/23] More minor updates (wording, renaming etc.) --- .../apache/spark/util/ClosureCleaner.scala | 23 +++-- .../spark/util/ClosureCleanerSuite2.scala | 95 ++++++++++--------- 2 files changed, 66 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 0adfb3423214d..9ea0da15f7947 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -363,24 +363,24 @@ private class ReturnStatementFinder extends ClassVisitor(ASM4) { } /** Helper class to identify a method. */ -private case class MethodIdentifier(cls: Class[_], name: String, desc: String) +private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String) /** * Find the fields accessed by a given class. * - * The fields are stored in the mutable map passed in by the class that contains them. + * The resulting fields are stored in the mutable map passed in through the constructor. * This map is assumed to have its keys already populated with the classes of interest. * * @param fields the mutable map that stores the fields to return - * @param findTransitively if true, find fields indirectly referenced in other classes - * @param specificMethod if not empty, visit only this method - * @param visitedMethods a list of visited methods to avoid cycles + * @param findTransitively if true, find fields indirectly referenced through method calls + * @param specificMethod if not empty, visit only this specific method + * @param visitedMethods a set of visited methods to avoid cycles */ private[util] class FieldAccessFinder( fields: Map[Class[_], Set[String]], findTransitively: Boolean, - specificMethod: Option[MethodIdentifier] = None, - visitedMethods: Set[MethodIdentifier] = Set.empty) + specificMethod: Option[MethodIdentifier[_]] = None, + visitedMethods: Set[MethodIdentifier[_]] = Set.empty) extends ClassVisitor(ASM4) { override def visitMethod( @@ -390,7 +390,7 @@ private[util] class FieldAccessFinder( sig: String, exceptions: Array[String]): MethodVisitor = { - // Ignore this method unless we are told to visit it + // If we are told to visit only a certain method and this is not the one, ignore it if (specificMethod.isDefined && (specificMethod.get.name != name || specificMethod.get.desc != desc)) { return null @@ -412,7 +412,7 @@ private[util] class FieldAccessFinder( if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { fields(cl) += name } - // Visit other methods to find fields that are transitively referenced + // Optionally visit other methods to find fields that are transitively referenced if (findTransitively) { val m = MethodIdentifier(cl, name, desc) if (!visitedMethods.contains(m)) { @@ -431,6 +431,11 @@ private[util] class FieldAccessFinder( private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) { var myName: String = null + // TODO: Recursively find inner closures that we indirectly reference, e.g. + // val closure1 = () = { () => 1 } + // val closure2 = () => { (1 to 5).map(closure1) } + // The second closure technically has two inner closures, but this finder only finds one + override def visit(version: Int, access: Int, name: String, sig: String, superName: String, interfaces: Array[String]) { myName = name diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index 3ef1946d7bb5b..c5575471d18c2 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -32,8 +32,8 @@ import org.apache.spark.serializer.SerializerInstance */ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { - // Start a SparkContext so that SparkEnv.get.closureSerializer is accessible - // We do not actually use this explicitly except to stop it later + // Start a SparkContext so that the closure serializer is accessible + // We do not actually use this explicitly otherwise private var sc: SparkContext = null private var closureSerializer: SerializerInstance = null @@ -48,7 +48,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM closureSerializer = null } - // Some fields and methods that belong to this class, which is itself not serializable + // Some fields and methods to reference in inner closures later private val someSerializableValue = 1 private val someNonSerializableValue = new NonSerializable private def someSerializableMethod() = 1 @@ -86,19 +86,19 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assertSerializable(closure, serializableBefore) // If the resulting closure is not serializable even after // cleaning, we expect ClosureCleaner to throw a SparkException - intercept[SparkException] { + if (serializableAfter) { ClosureCleaner.clean(closure, checkSerializable = true, transitive) - // Otherwise, if we do expect the closure to be serializable after the - // clean, throw the SparkException ourselves so scalatest is happy - if (serializableAfter) { throw new SparkException("no-op") } + } else { + intercept[SparkException] { + ClosureCleaner.clean(closure, checkSerializable = true, transitive) + } } assertSerializable(closure, serializableAfter) } /** * Return the fields accessed by the given closure by class. - * This also optionally finds the fields transitively referenced through methods - * that belong to other classes. + * This also optionally finds the fields transitively referenced through methods invocations. */ private def findAccessedFields( closure: AnyRef, @@ -211,7 +211,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val outerObjects2 = getOuterObjects(closure2) assert(outerClasses1.size === outerObjects1.size) assert(outerClasses2.size === outerObjects2.size) - // These inner closures only reference local variables, and so do not have $outer pointer + // These inner closures only reference local variables, and so do not have $outer pointers assert(outerClasses1.isEmpty) assert(outerClasses2.isEmpty) } @@ -235,8 +235,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM // This closure references the "test2" scope because it needs to find the method `y` // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2 assert(outerClasses2.size === 3) - // This closure references the "test2" scope because it needs to find the - // `localValue` defined outside of this scope + // This closure references the "test2" scope because it needs to find the `localValue` + // defined outside of this scope assert(outerClasses3.size === 3) assert(isClosure(outerClasses2(0))) assert(isClosure(outerClasses3(0))) @@ -270,10 +270,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assert(fields2.isEmpty) assert(fields3.size === 2) // This corresponds to the "FunSuite#test" closure. This is empty because the - // field `closure3` references belongs to its parent (i.e. ClosureCleanerSuite2) + // `someSerializableValue` belongs to its parent (i.e. ClosureCleanerSuite2). assert(fields3(outerClasses3(0)).isEmpty) // This corresponds to the ClosureCleanerSuite2. This is also empty, however, - // because we did not find fields transitively (i.e. beyond 1 enclosing scope) + // because accessing a `ClosureCleanerSuite2#someSerializableValue` actually involves a + // method call. Since we do not find fields transitively, we will not recursively trace + // through the fields referenced by this method. assert(fields3(outerClasses3(1)).isEmpty) val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true) @@ -283,7 +285,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assert(fields2t.isEmpty) assert(fields3t.size === 2) // Because we find fields transitively now, we are able to detect that we need the - // $outer pointer to get the field from the ClosureCleanerSuite2. + // $outer pointer to get the field from the ClosureCleanerSuite2 assert(fields3t(outerClasses3(0)).size === 1) assert(fields3t(outerClasses3(0)).head === "$outer") assert(fields3t(outerClasses3(1)).size === 1) @@ -304,7 +306,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val outerClasses3 = getOuterClasses(closure3) val outerClasses4 = getOuterClasses(closure4) - // First, find only fields the closures directly access + // First, find only fields accessed directly, not transitively, by these closures val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false) val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false) @@ -312,23 +314,24 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assert(fields1.isEmpty) // "test1" < "FunSuite#test" < ClosureCleanerSuite2 assert(fields2.size === 3) - assert(fields2(outerClasses2(0)).isEmpty) // `def a` is not a field - assert(fields2(outerClasses2(1)).isEmpty) - assert(fields2(outerClasses2(2)).isEmpty) + // Since we do not find fields transitively here, we do not look into what `def a` references + assert(fields2(outerClasses2(0)).isEmpty) // This corresponds to the "test1" scope + assert(fields2(outerClasses2(1)).isEmpty) // This corresponds to the "FunSuite#test" scope + assert(fields2(outerClasses2(2)).isEmpty) // This corresponds to the ClosureCleanerSuite2 assert(fields3.size === 3) - // Note that `localValue` is a field of the "test1" closure because `def a` needs it - // Further note that it is NOT a field of the "FunSuite#test" closure but a local variable + // Note that `localValue` is a field of the "test1" scope because `def a` references it, + // but NOT a field of the "FunSuite#test" scope because it is only a local variable there assert(fields3(outerClasses3(0)).size === 1) assert(fields3(outerClasses3(0)).head.contains("localValue")) assert(fields3(outerClasses3(1)).isEmpty) assert(fields3(outerClasses3(2)).isEmpty) assert(fields4.size === 3) + // Because `val someSerializableValue` is an instance variable, even an explicit reference + // here actually involves a method call to access the underlying value of the variable. + // Because we are not finding fields transitively here, we do not consider the fields + // accessed by this "method" (i.e. the val's accessor). assert(fields4(outerClasses4(0)).isEmpty) assert(fields4(outerClasses4(1)).isEmpty) - // Because `someSerializableValue` is a val, even an explicit reference here actually - // involves a method call to access the underlying value of the variable. Because we are - // not finding fields transitively here, we do not consider the fields accessed by this - // "method" (i.e. the val's accessor). assert(fields4(outerClasses4(2)).isEmpty) // Now do the same, but find fields that the closures transitively reference @@ -338,8 +341,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val fields4t = findAccessedFields(closure4, outerClasses4, findTransitively = true) assert(fields1t.isEmpty) assert(fields2t.size === 3) - // This closure transitively references `localValue` because `def a` uses it - assert(fields2t(outerClasses2(0)).size === 1) + assert(fields2t(outerClasses2(0)).size === 1) // `def a` references `localValue` assert(fields2t(outerClasses2(0)).head.contains("localValue")) assert(fields2t(outerClasses2(1)).isEmpty) assert(fields2t(outerClasses2(2)).isEmpty) @@ -362,11 +364,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } test("clean basic serializable closures") { - val localSerializableVal = someSerializableValue + val localValue = someSerializableValue val closure1 = () => 1 val closure2 = () => Array[String]("a", "b", "c") val closure3 = (s: String, arr: Array[Long]) => s + arr.mkString(", ") - val closure4 = () => localSerializableVal + val closure4 = () => localValue val closure5 = () => new NonSerializable(5) // we're just serializing the class information val closure1r = closure1() val closure2r = closure2() @@ -395,7 +397,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val closure4 = () => someNonSerializableValue val closure2 = () => someNonSerializableMethod() - // These are not cleanable because they ultimately reference the `this` pointer + // These are not cleanable because they ultimately reference the ClosureCleanerSuite2 testClean(closure1, serializableBefore = false, serializableAfter = false) testClean(closure2, serializableBefore = false, serializableAfter = false) testClean(closure3, serializableBefore = false, serializableAfter = false) @@ -404,13 +406,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } test("clean basic nested serializable closures") { - val localSerializableValue = someSerializableValue + val localValue = someSerializableValue val closure1 = (i: Int) => { - (1 to i).map { x => x + localSerializableValue } // 1 level of nesting + (1 to i).map { x => x + localValue } // 1 level of nesting } val closure2 = (j: Int) => { (1 to j).flatMap { x => - (1 to x).map { y => y + localSerializableValue } // 2 levels + (1 to x).map { y => y + localValue } // 2 levels } } val closure3 = (k: Int, l: Int, m: Int) => { @@ -426,6 +428,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM testClean(closure2, serializableBefore = true, serializableAfter = true) testClean(closure3, serializableBefore = true, serializableAfter = true) + // Verify that closures can still be invoked and the result still the same assert(closure1(1) === closure1r) assert(closure2(2) === closure2r) assert(closure3(3, 4, 5) === closure3r) @@ -434,9 +437,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM test("clean basic nested non-serializable closures") { def localSerializableMethod() = someSerializableValue val localNonSerializableValue = someNonSerializableValue + // These closures ultimately reference the ClosureCleanerSuite2 + // Note that even accessing `val` that is an instance variable involves a method call val closure1 = (i: Int) => { (1 to i).map { x => x + someSerializableValue } } val closure2 = (j: Int) => { (1 to j).map { x => x + someSerializableMethod() } } val closure4 = (k: Int) => { (1 to k).map { x => x + localSerializableMethod() } } + // This closure references a local non-serializable value val closure3 = (l: Int) => { (1 to l).map { x => localNonSerializableValue } } // This is non-serializable no matter how many levels we nest it val closure5 = (m: Int) => { @@ -457,7 +463,10 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } test("clean complicated nested serializable closures") { - val localSerializableValue = someSerializableValue + val localValue = someSerializableValue + + // Here we assume that if the outer closure is serializable, + // then all inner closures must also be serializable // Reference local fields from all levels val closure1 = (i: Int) => { @@ -465,7 +474,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM (1 to i).flatMap { x => val b = a + 1 (1 to x).map { y => - y + a + b + localSerializableValue + y + a + b + localValue } } } @@ -479,8 +488,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM def b2 = a2 + 1 (1 to x).map { y => // If this references a method outside the outermost closure, then it will try to pull - // in the ClosureCleanerSuite2. This is why `localSerializableValue` here must be a val. - y + a1 + a2 + b1 + b2 + localSerializableValue + // in the ClosureCleanerSuite2. This is why `localValue` here must be a local `val`. + y + a1 + a2 + b1 + b2 + localValue } } } @@ -494,13 +503,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } test("clean complicated nested non-serializable closures") { - val localSerializableValue = someSerializableValue + val localValue = someSerializableValue - // Note that we are not interested in cleaning the outer closures here + // Note that we are not interested in cleaning the outer closures here (they are not cleanable) // The only reason why they exist is to nest the inner closures val test1 = () => { - val a = localSerializableValue + val a = localValue val b = sc val inner1 = (x: Int) => x + a + b.hashCode() val inner2 = (x: Int) => x + a @@ -509,15 +518,15 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM // There is no way to clean it testClean(inner1, serializableBefore = false, serializableAfter = false) - // This closure is serializable to begin with since - // it does not have a pointer to the outer closure + // This closure is serializable to begin with since it does not need a pointer to + // the outer closure (it only references local variables) testClean(inner2, serializableBefore = true, serializableAfter = true) } // Same as above, but the `val a` becomes `def a` // The difference here is that all inner closures now have pointers to the outer closure val test2 = () => { - def a = localSerializableValue + def a = localValue val b = sc val inner1 = (x: Int) => x + a + b.hashCode() val inner2 = (x: Int) => x + a From 6d4d3f1ac8da883fb814613afec35900b078b751 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 25 Apr 2015 20:07:35 -0700 Subject: [PATCH 13/23] Fix scala style? --- .../test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index c5575471d18c2..e072b4128b804 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -435,7 +435,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } test("clean basic nested non-serializable closures") { - def localSerializableMethod() = someSerializableValue + def localSerializableMethod(): Int = someSerializableValue val localNonSerializableValue = someNonSerializableValue // These closures ultimately reference the ClosureCleanerSuite2 // Note that even accessing `val` that is an instance variable involves a method call From 9419efea98e190766887a9ab5fbac57f45bf007c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 27 Apr 2015 21:14:28 -0700 Subject: [PATCH 14/23] Bypass SerializationDebugger for now (SPARK-7180) --- .../org/apache/spark/util/ClosureCleanerSuite2.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index e072b4128b804..934fe4f3a0a97 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.scalatest.{BeforeAndAfterAll, FunSuite, PrivateMethodTester} -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.serializer.SerializerInstance /** @@ -38,7 +38,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM private var closureSerializer: SerializerInstance = null override def beforeAll(): Unit = { - sc = new SparkContext("local", "test") + val conf = new SparkConf() + .set("spark.master", "local") + .set("spark.app.name", "test") + // SerializationDebugger currently cannot serialize ClosureCleanerSuite2 because + // it has non-serializable fields but inherits a serializable class (SPARK-7180) + .set("spark.serializer.extraDebugInfo", "false") + sc = new SparkContext(conf) closureSerializer = sc.env.closureSerializer.newInstance() } From d889950353a4e1ea23bcdd3f608a8930ff44f7ca Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 28 Apr 2015 13:46:25 -0700 Subject: [PATCH 15/23] Revert "Bypass SerializationDebugger for now (SPARK-7180)" This reverts commit 9419efea98e190766887a9ab5fbac57f45bf007c. --- .../org/apache/spark/util/ClosureCleanerSuite2.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index 934fe4f3a0a97..e072b4128b804 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.scalatest.{BeforeAndAfterAll, FunSuite, PrivateMethodTester} -import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.serializer.SerializerInstance /** @@ -38,13 +38,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM private var closureSerializer: SerializerInstance = null override def beforeAll(): Unit = { - val conf = new SparkConf() - .set("spark.master", "local") - .set("spark.app.name", "test") - // SerializationDebugger currently cannot serialize ClosureCleanerSuite2 because - // it has non-serializable fields but inherits a serializable class (SPARK-7180) - .set("spark.serializer.extraDebugInfo", "false") - sc = new SparkContext(conf) + sc = new SparkContext("local", "test") closureSerializer = sc.env.closureSerializer.newInstance() } From 399816819e0f0bc9fde9fac1a3d6b6f6ef641a74 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 28 Apr 2015 17:50:29 -0700 Subject: [PATCH 16/23] In sc.runJob, actually clean the inner closure If a closure is passed into another closure as a reference, the inner closure will be a field of the outer closure. In sc.runJob, we used to only clean the outer closure, leaving the inner one uncleaned. Simple reproduction: Wrap RDD#take in a closure. For instance: // The body "..." here contains a return statement def take(num: Int): Array[T] = (1 to 1).foreach { _ => ... } Now if you call `sc.parallelize(1 to 10).take(5)`, the closure cleaner will not be able to find the return statement in the `foreach` closure. This is because it's not even cleaning the `foreach` closure. Instead, it will fail with a not serializable exception complaining the internal java.lang.Object $nonLocalReturnKey is not serializable. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 978e5965d5ede..3a2a4d95fd5d0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1627,7 +1627,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { - runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) + // We must clean `func` here before using it in another closure below + // Otherwise, the closure cleaner will only clean the outer closure but not `func` + val cleanedFunc = clean(func) + runJob(rdd, (ctx: TaskContext, iter: Iterator[T]) => cleanedFunc(iter), partitions, allowLocal) } /** From e909a4222a132b1fc4fff87a111ece7d32b82765 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 28 Apr 2015 19:35:45 -0700 Subject: [PATCH 17/23] Guard against NPE if CC is used outside of an application --- .../src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 9ea0da15f7947..cb15c624140e5 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -310,7 +310,9 @@ private[spark] object ClosureCleaner extends Logging { private def ensureSerializable(func: AnyRef) { try { - SparkEnv.get.closureSerializer.newInstance().serialize(func) + if (SparkEnv.get != null) { + SparkEnv.get.closureSerializer.newInstance().serialize(func) + } } catch { case ex: Exception => throw new SparkException("Task not serializable", ex) } From 6f75784c657d64acf4d9de37f15687bf5541f165 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 29 Apr 2015 11:05:47 -0700 Subject: [PATCH 18/23] Revert "Guard against NPE if CC is used outside of an application" This reverts commit e909a4222a132b1fc4fff87a111ece7d32b82765. --- .../src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index cb15c624140e5..9ea0da15f7947 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -310,9 +310,7 @@ private[spark] object ClosureCleaner extends Logging { private def ensureSerializable(func: AnyRef) { try { - if (SparkEnv.get != null) { - SparkEnv.get.closureSerializer.newInstance().serialize(func) - } + SparkEnv.get.closureSerializer.newInstance().serialize(func) } catch { case ex: Exception => throw new SparkException("Task not serializable", ex) } From 26c7abaca5ea6a76f6839c29d9c209e9aaee4948 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 29 Apr 2015 11:05:53 -0700 Subject: [PATCH 19/23] Revert "In sc.runJob, actually clean the inner closure" This reverts commit 399816819e0f0bc9fde9fac1a3d6b6f6ef641a74. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3a2a4d95fd5d0..978e5965d5ede 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1627,10 +1627,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { - // We must clean `func` here before using it in another closure below - // Otherwise, the closure cleaner will only clean the outer closure but not `func` - val cleanedFunc = clean(func) - runJob(rdd, (ctx: TaskContext, iter: Iterator[T]) => cleanedFunc(iter), partitions, allowLocal) + runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) } /** From 26c507257a80273d5941e3c974c15d4721cc3b4b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 1 May 2015 17:57:02 -0700 Subject: [PATCH 20/23] Address comments --- .../scala/org/apache/spark/SparkContext.scala | 3 +- .../apache/spark/util/ClosureCleaner.scala | 21 ++++-- .../spark/util/ClosureCleanerSuite2.scala | 73 ++++++++++--------- 3 files changed, 54 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 04cc8ec6adfee..3f7cba6dbcdb5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1769,8 +1769,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * serializable */ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - val cleanTransitively = conf.getBoolean("spark.closureCleaner.transitive", true) - ClosureCleaner.clean(f, checkSerializable, cleanTransitively) + ClosureCleaner.clean(f, checkSerializable) f } diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 9ea0da15f7947..f9e6ca6a64599 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -62,7 +62,7 @@ private[spark] object ClosureCleaner extends Logging { // The outer pointer may be null if we have cleaned this closure before if (outer != null) { if (isClosure(f.getType)) { - return f.getType :: getOuterClasses(f.get(obj)) + return f.getType :: getOuterClasses(outer) } else { return f.getType :: Nil // Stop at the first $outer that is not a closure } @@ -79,9 +79,9 @@ private[spark] object ClosureCleaner extends Logging { // The outer pointer may be null if we have cleaned this closure before if (outer != null) { if (isClosure(f.getType)) { - return f.get(obj) :: getOuterObjects(f.get(obj)) + return outer :: getOuterObjects(outer) } else { - return f.get(obj) :: Nil // Stop at the first $outer that is not a closure + return outer :: Nil // Stop at the first $outer that is not a closure } } } @@ -91,7 +91,10 @@ private[spark] object ClosureCleaner extends Logging { /** * Return a list of classes that represent closures enclosed in the given closure object. */ - private def getInnerClasses(obj: AnyRef): List[Class[_]] = { + private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = { + if (!isClosure(obj.getClass)) { + throw new IllegalArgumentException(s"Expected a closure object; got ${obj.getClass.getName}") + } val seen = Set[Class[_]](obj.getClass) var stack = List[Class[_]](obj.getClass) while (!stack.isEmpty) { @@ -104,7 +107,7 @@ private[spark] object ClosureCleaner extends Logging { stack = cls :: stack } } - return (seen - obj.getClass).toList + (seen - obj.getClass).toList } private def createNullValue(cls: Class[_]): AnyRef = { @@ -153,12 +156,12 @@ private[spark] object ClosureCleaner extends Logging { * * class SomethingNotSerializable { * def someValue = 1 + * def scope(name: String)(body: => Unit) = body * def someMethod(): Unit = scope("one") { * def x = someValue * def y = 2 * scope("two") { println(y + 1) } * } - * def scope(name: String)(body: => Unit) = body * } * * In this example, scope "two" is not serializable because it references scope "one", which @@ -189,7 +192,7 @@ private[spark] object ClosureCleaner extends Logging { logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}}) +++") // A list of classes that represents closures enclosed in the given one - val innerClasses = getInnerClasses(func) + val innerClasses = getInnerClosureClasses(func) // A list of enclosing objects and their respective classes, from innermost to outermost // An outer object at a given index is of type outer class at the same index @@ -280,7 +283,9 @@ private[spark] object ClosureCleaner extends Logging { // the already populated accessed fields map of the starting closure if (cleanTransitively && isClosure(clone.getClass)) { logDebug(s" + cleaning cloned closure $clone recursively (${cls.getName})") - clean(clone, checkSerializable, cleanTransitively, accessedFields) + // No need to check serializable here for the outer closures because we're + // only interested in the serializability of the starting closure + clean(clone, checkSerializable = false, cleanTransitively, accessedFields) } parent = clone } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index e072b4128b804..ed4c432f452e8 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -68,17 +68,23 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM /** * Helper method for testing whether closure cleaning works as expected. * This cleans the given closure twice, with and without transitive cleaning. + * + * @param closure closure to test cleaning with + * @param serializableBefore if true, verify that the closure is serializable + * before cleaning, otherwise assert that it is not + * @param serializableAfter if true, assert that the closure is serializable + * after cleaning otherwise assert that it is not */ - private def testClean( + private def verifyCleaning( closure: AnyRef, serializableBefore: Boolean, serializableAfter: Boolean): Unit = { - testClean(closure, serializableBefore, serializableAfter, transitive = true) - testClean(closure, serializableBefore, serializableAfter, transitive = false) + verifyCleaning(closure, serializableBefore, serializableAfter, transitive = true) + verifyCleaning(closure, serializableBefore, serializableAfter, transitive = false) } /** Helper method for testing whether closure cleaning works as expected. */ - private def testClean( + private def verifyCleaning( closure: AnyRef, serializableBefore: Boolean, serializableAfter: Boolean, @@ -113,7 +119,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM // Accessors for private methods private val _isClosure = PrivateMethod[Boolean]('isClosure) - private val _getInnerClasses = PrivateMethod[List[Class[_]]]('getInnerClasses) + private val _getInnerClosureClasses = PrivateMethod[List[Class[_]]]('getInnerClosureClasses) private val _getOuterClasses = PrivateMethod[List[Class[_]]]('getOuterClasses) private val _getOuterObjects = PrivateMethod[List[AnyRef]]('getOuterObjects) @@ -121,7 +127,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM ClosureCleaner invokePrivate _isClosure(obj) } - private def getInnerClasses(closure: AnyRef): List[Class[_]] = { + private def getInnerClosureClasses(closure: AnyRef): List[Class[_]] = { ClosureCleaner invokePrivate _getInnerClasses(closure) } @@ -133,7 +139,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM ClosureCleaner invokePrivate _getOuterObjects(closure) } - test("get inner classes") { + test("get inner closure classes") { val closure1 = () => 1 val closure2 = () => { () => 1 } val closure3 = (i: Int) => { @@ -312,7 +318,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false) val fields4 = findAccessedFields(closure4, outerClasses4, findTransitively = false) assert(fields1.isEmpty) - // "test1" < "FunSuite#test" < ClosureCleanerSuite2 + // Note that the size here represents the number of outer classes, not the number of fields + // "test1" < parameter of "FunSuite#test" < ClosureCleanerSuite2 assert(fields2.size === 3) // Since we do not find fields transitively here, we do not look into what `def a` references assert(fields2(outerClasses2(0)).isEmpty) // This corresponds to the "test1" scope @@ -376,11 +383,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val closure4r = closure4() val closure5r = closure5() - testClean(closure1, serializableBefore = true, serializableAfter = true) - testClean(closure2, serializableBefore = true, serializableAfter = true) - testClean(closure3, serializableBefore = true, serializableAfter = true) - testClean(closure4, serializableBefore = true, serializableAfter = true) - testClean(closure5, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure1, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure2, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure3, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure4, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure5, serializableBefore = true, serializableAfter = true) // Verify that closures can still be invoked and the result still the same assert(closure1() === closure1r) @@ -398,11 +405,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val closure2 = () => someNonSerializableMethod() // These are not cleanable because they ultimately reference the ClosureCleanerSuite2 - testClean(closure1, serializableBefore = false, serializableAfter = false) - testClean(closure2, serializableBefore = false, serializableAfter = false) - testClean(closure3, serializableBefore = false, serializableAfter = false) - testClean(closure4, serializableBefore = false, serializableAfter = false) - testClean(closure5, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure1, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure2, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure3, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure4, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure5, serializableBefore = false, serializableAfter = false) } test("clean basic nested serializable closures") { @@ -424,9 +431,9 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val closure2r = closure2(2) val closure3r = closure3(3, 4, 5) - testClean(closure1, serializableBefore = true, serializableAfter = true) - testClean(closure2, serializableBefore = true, serializableAfter = true) - testClean(closure3, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure1, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure2, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure3, serializableBefore = true, serializableAfter = true) // Verify that closures can still be invoked and the result still the same assert(closure1(1) === closure1r) @@ -455,11 +462,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } } - testClean(closure1, serializableBefore = false, serializableAfter = false) - testClean(closure2, serializableBefore = false, serializableAfter = false) - testClean(closure3, serializableBefore = false, serializableAfter = false) - testClean(closure4, serializableBefore = false, serializableAfter = false) - testClean(closure5, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure1, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure2, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure3, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure4, serializableBefore = false, serializableAfter = false) + verifyCleaning(closure5, serializableBefore = false, serializableAfter = false) } test("clean complicated nested serializable closures") { @@ -496,8 +503,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val closure1r = closure1(1) val closure2r = closure2(2) - testClean(closure1, serializableBefore = true, serializableAfter = true) - testClean(closure2, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure1, serializableBefore = true, serializableAfter = true) + verifyCleaning(closure2, serializableBefore = true, serializableAfter = true) assert(closure1(1) == closure1r) assert(closure2(2) == closure2r) } @@ -516,11 +523,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM // This closure explicitly references a non-serializable field // There is no way to clean it - testClean(inner1, serializableBefore = false, serializableAfter = false) + verifyCleaning(inner1, serializableBefore = false, serializableAfter = false) // This closure is serializable to begin with since it does not need a pointer to // the outer closure (it only references local variables) - testClean(inner2, serializableBefore = true, serializableAfter = true) + verifyCleaning(inner2, serializableBefore = true, serializableAfter = true) } // Same as above, but the `val a` becomes `def a` @@ -532,17 +539,17 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val inner2 = (x: Int) => x + a // As before, this closure is neither serializable nor cleanable - testClean(inner1, serializableBefore = false, serializableAfter = false) + verifyCleaning(inner1, serializableBefore = false, serializableAfter = false) // This closure is no longer serializable because it now has a pointer to the outer closure, // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2. // If we do not clean transitively, we will not null out this indirect reference. - testClean(inner2, serializableBefore = false, serializableAfter = false, transitive = false) + verifyCleaning(inner2, serializableBefore = false, serializableAfter = false, transitive = false) // If we clean transitively, we will find that method `a` does not actually reference the // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out // the outer closure's parent pointer. This will make `inner2` serializable. - testClean(inner2, serializableBefore = false, serializableAfter = true, transitive = true) + verifyCleaning(inner2, serializableBefore = false, serializableAfter = true, transitive = true) } // Same as above, but with more levels of nesting From ea874bc6c9cdede04cf4bd68b0053811cdef11a9 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 1 May 2015 18:04:38 -0700 Subject: [PATCH 21/23] Fix tests --- .../org/apache/spark/util/ClosureCleanerSuite2.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index ed4c432f452e8..ace1b8ac863c4 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -128,7 +128,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } private def getInnerClosureClasses(closure: AnyRef): List[Class[_]] = { - ClosureCleaner invokePrivate _getInnerClasses(closure) + ClosureCleaner invokePrivate _getInnerClosureClasses(closure) } private def getOuterClasses(closure: AnyRef): List[Class[_]] = { @@ -152,10 +152,10 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } } } - val inner1 = getInnerClasses(closure1) - val inner2 = getInnerClasses(closure2) - val inner3 = getInnerClasses(closure3) - val inner4 = getInnerClasses(closure4) + val inner1 = getInnerClosureClasses(closure1) + val inner2 = getInnerClosureClasses(closure2) + val inner3 = getInnerClosureClasses(closure3) + val inner4 = getInnerClosureClasses(closure4) assert(inner1.isEmpty) assert(inner2.size === 1) assert(inner3.size === 2) From 0bbe77f10a23e8fe96d587b69266c9cbc85b9ed7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 1 May 2015 18:14:48 -0700 Subject: [PATCH 22/23] Fix style --- .../scala/org/apache/spark/util/ClosureCleanerSuite2.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index ace1b8ac863c4..59456790e89f0 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -544,12 +544,14 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM // This closure is no longer serializable because it now has a pointer to the outer closure, // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2. // If we do not clean transitively, we will not null out this indirect reference. - verifyCleaning(inner2, serializableBefore = false, serializableAfter = false, transitive = false) + verifyCleaning( + inner2, serializableBefore = false, serializableAfter = false, transitive = false) // If we clean transitively, we will find that method `a` does not actually reference the // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out // the outer closure's parent pointer. This will make `inner2` serializable. - verifyCleaning(inner2, serializableBefore = false, serializableAfter = true, transitive = true) + verifyCleaning( + inner2, serializableBefore = false, serializableAfter = true, transitive = true) } // Same as above, but with more levels of nesting From cd4623006d0d30c1fcd66eb7c947eab4d201e43b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 1 May 2015 22:10:53 -0700 Subject: [PATCH 23/23] Revert a small change that affected streaming DStream#transform isn't cleaning closures correctly. It is passing an RDD to ClosureCleaner#clean. We should fix this separately outside of this patch. --- core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index f9e6ca6a64599..4ac0382d80815 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -92,9 +92,6 @@ private[spark] object ClosureCleaner extends Logging { * Return a list of classes that represent closures enclosed in the given closure object. */ private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = { - if (!isClosure(obj.getClass)) { - throw new IllegalArgumentException(s"Expected a closure object; got ${obj.getClass.getName}") - } val seen = Set[Class[_]](obj.getClass) var stack = List[Class[_]](obj.getClass) while (!stack.isEmpty) {