Skip to content

Commit b32b212

Browse files
ALeksander Eskilson authored and cloud-fan committed
[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - Class Splitting
## What changes were proposed in this pull request?

This pull request exclusively includes the class splitting feature described in #16648. When code for a given class would grow beyond 1600k bytes, a private, nested sub-class is generated into which subsequent functions are inlined. Additional sub-classes are generated each time the code threshold is met again.

This code includes 3 changes:

1. Includes helper maps, lists, and functions for keeping track of sub-classes during code generation (included in the `CodeGenerator` class). These helper functions allow nested classes and split functions to be initialized/declared/inlined to the appropriate locations in the various projection classes.
2. Changes `addNewFunction` to return a string to support instances where a split function is inlined to a nested class and not the outer class (and so must be invoked using the class-qualified name). Uses of `addNewFunction` throughout the codebase are modified so that the returned name is properly used.
3. Removes instances of the `this` keyword when used on data inside generated classes. All state declared in the outer class is by default global and accessible to the nested classes. However, if a reference to global state in a nested class is prefixed with the `this` keyword, it would attempt to reference state belonging to the nested class (which would not exist), rather than the correct variable belonging to the outer class.

## How was this patch tested?

Added a test case to the `GeneratedProjectionSuite` that increases the number of columns tested in various projections to a threshold that would previously have triggered a `JaninoRuntimeException` for the Constant Pool.

Note: This PR does not address the second Constant Pool issue with code generation (also mentioned in #16648): excess global mutable state. A second PR may be opened to resolve that issue.

Author: ALeksander Eskilson <[email protected]>

Closes #18075 from bdrillard/class_splitting_only.
1 parent 2051428 commit b32b212

File tree

22 files changed

+259
-81
lines changed

22 files changed

+259
-81
lines changed

sql/catalyst/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@
131131
</execution>
132132
</executions>
133133
</plugin>
134+
<plugin>
135+
<groupId>org.scalatest</groupId>
136+
<artifactId>scalatest-maven-plugin</artifactId>
137+
<configuration>
138+
<argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
139+
</configuration>
140+
</plugin>
134141
<plugin>
135142
<groupId>org.antlr</groupId>
136143
<artifactId>antlr4-maven-plugin</artifactId>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ case class ScalaUDF(
988988
val converterTerm = ctx.freshName("converter")
989989
val expressionIdx = ctx.references.size - 1
990990
ctx.addMutableState(converterClassName, converterTerm,
991-
s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
991+
s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
992992
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
993993
s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
994994
converterTerm
@@ -1005,7 +1005,7 @@ case class ScalaUDF(
10051005
// Generate codes used to convert the returned value of user-defined functions to Catalyst type
10061006
val catalystConverterTerm = ctx.freshName("catalystConverter")
10071007
ctx.addMutableState(converterClassName, catalystConverterTerm,
1008-
s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
1008+
s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
10091009
s".createToCatalystConverter($scalaUDF.dataType());")
10101010

10111011
val resultTerm = ctx.freshName("result")
@@ -1019,7 +1019,7 @@ case class ScalaUDF(
10191019

10201020
val funcTerm = ctx.freshName("udf")
10211021
ctx.addMutableState(funcClassName, funcTerm,
1022-
s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
1022+
s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
10231023

10241024
// codegen for children expressions
10251025
val evals = children.map(_.genCode(ctx))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 117 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
2828

2929
import com.google.common.cache.{CacheBuilder, CacheLoader}
3030
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
31-
import org.apache.commons.lang3.exception.ExceptionUtils
3231
import org.codehaus.commons.compiler.CompileException
3332
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
3433
import org.codehaus.janino.util.ClassFile
@@ -113,7 +112,7 @@ class CodegenContext {
113112
val idx = references.length
114113
references += obj
115114
val clsName = Option(className).getOrElse(obj.getClass.getName)
116-
addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
115+
addMutableState(clsName, term, s"$term = ($clsName) references[$idx];")
117116
term
118117
}
119118

@@ -202,16 +201,6 @@ class CodegenContext {
202201
partitionInitializationStatements.mkString("\n")
203202
}
204203

205-
/**
206-
* Holding all the functions those will be added into generated class.
207-
*/
208-
val addedFunctions: mutable.Map[String, String] =
209-
mutable.Map.empty[String, String]
210-
211-
def addNewFunction(funcName: String, funcCode: String): Unit = {
212-
addedFunctions += ((funcName, funcCode))
213-
}
214-
215204
/**
216205
* Holds expressions that are equivalent. Used to perform subexpression elimination
217206
* during codegen.
@@ -233,10 +222,118 @@ class CodegenContext {
233222
// The collection of sub-expression result resetting methods that need to be called on each row.
234223
val subexprFunctions = mutable.ArrayBuffer.empty[String]
235224

236-
def declareAddedFunctions(): String = {
237-
addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
225+
val outerClassName = "OuterClass"
226+
227+
/**
228+
* Holds the class and instance names to be generated, where `OuterClass` is a placeholder
229+
* standing for whichever class is generated as the outermost class and which will contain any
230+
* nested sub-classes. All other classes and instance names in this list will represent private,
231+
* nested sub-classes.
232+
*/
233+
private val classes: mutable.ListBuffer[(String, String)] =
234+
mutable.ListBuffer[(String, String)](outerClassName -> null)
235+
236+
// A map holding the current size in bytes of each class to be generated.
237+
private val classSize: mutable.Map[String, Int] =
238+
mutable.Map[String, Int](outerClassName -> 0)
239+
240+
// Nested maps holding function names and their code belonging to each class.
241+
private val classFunctions: mutable.Map[String, mutable.Map[String, String]] =
242+
mutable.Map(outerClassName -> mutable.Map.empty[String, String])
243+
244+
// Returns the size of the most recently added class.
245+
private def currClassSize(): Int = classSize(classes.head._1)
246+
247+
// Returns the class name and instance name for the most recently added class.
248+
private def currClass(): (String, String) = classes.head
249+
250+
// Adds a new class. Requires the class' name, and its instance name.
251+
private def addClass(className: String, classInstance: String): Unit = {
252+
classes.prepend(className -> classInstance)
253+
classSize += className -> 0
254+
classFunctions += className -> mutable.Map.empty[String, String]
255+
}
256+
257+
/**
258+
* Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
259+
* function will be inlined into a new private, nested class, and a class-qualified name for the
260+
* function will be returned. Otherwise, the function will be inlined to the `OuterClass` and the
261+
* simple `funcName` will be returned.
262+
*
263+
* @param funcName the class-unqualified name of the function
264+
* @param funcCode the body of the function
265+
* @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This
266+
* can be necessary when a function is declared outside of the context
267+
* it is eventually referenced and a returned qualified function name
268+
* cannot otherwise be accessed.
269+
* @return the name of the function, qualified by class if it will be inlined to a private,
270+
* nested sub-class
271+
*/
272+
def addNewFunction(
273+
funcName: String,
274+
funcCode: String,
275+
inlineToOuterClass: Boolean = false): String = {
276+
// The number of named constants that can exist in the class is limited by the Constant Pool
277+
// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
278+
// threshold of 1600k bytes to determine when a function should be inlined to a private, nested
279+
// sub-class.
280+
val (className, classInstance) = if (inlineToOuterClass) {
281+
outerClassName -> ""
282+
} else if (currClassSize > 1600000) {
283+
val className = freshName("NestedClass")
284+
val classInstance = freshName("nestedClassInstance")
285+
286+
addClass(className, classInstance)
287+
288+
className -> classInstance
289+
} else {
290+
currClass()
291+
}
292+
293+
classSize(className) += funcCode.length
294+
classFunctions(className) += funcName -> funcCode
295+
296+
if (className == outerClassName) {
297+
funcName
298+
} else {
299+
300+
s"$classInstance.$funcName"
301+
}
302+
}
303+
304+
/**
305+
* Instantiates all nested, private sub-classes as private fields of the `OuterClass`.
306+
*/
307+
private[sql] def initNestedClasses(): String = {
308+
// Nested, private sub-classes have no mutable state (though they do reference the outer class'
309+
// mutable state), so we declare and initialize them inline to the OuterClass.
310+
classes.filter(_._1 != outerClassName).map {
311+
case (className, classInstance) =>
312+
s"private $className $classInstance = new $className();"
313+
}.mkString("\n")
314+
}
315+
316+
/**
317+
* Declares all function code that should be inlined to the `OuterClass`.
318+
*/
319+
private[sql] def declareAddedFunctions(): String = {
320+
classFunctions(outerClassName).values.mkString("\n")
238321
}
239322

323+
/**
324+
* Declares all nested, private sub-classes and the function code that should be inlined to them.
325+
*/
326+
private[sql] def declareNestedClasses(): String = {
327+
classFunctions.filterKeys(_ != outerClassName).map {
328+
case (className, functions) =>
329+
s"""
330+
|private class $className {
331+
| ${functions.values.mkString("\n")}
332+
|}
333+
""".stripMargin
334+
}
335+
}.mkString("\n")
336+
240337
final val JAVA_BOOLEAN = "boolean"
241338
final val JAVA_BYTE = "byte"
242339
final val JAVA_SHORT = "short"
@@ -556,8 +653,7 @@ class CodegenContext {
556653
return 0;
557654
}
558655
"""
559-
addNewFunction(compareFunc, funcCode)
560-
s"this.$compareFunc($c1, $c2)"
656+
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
561657
case schema: StructType =>
562658
val comparisons = GenerateOrdering.genComparisons(this, schema)
563659
val compareFunc = freshName("compareStruct")
@@ -573,8 +669,7 @@ class CodegenContext {
573669
return 0;
574670
}
575671
"""
576-
addNewFunction(compareFunc, funcCode)
577-
s"this.$compareFunc($c1, $c2)"
672+
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
578673
case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
579674
case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
580675
case _ =>
@@ -629,7 +724,9 @@ class CodegenContext {
629724

630725
/**
631726
* Splits the generated code of expressions into multiple functions, because a function has a
632-
* 64kb code size limit in JVM
727+
* 64kb code size limit in JVM. If the class to which the function would be inlined would grow
728+
* beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
729+
* instead, because classes have a constant pool limit of 65,536 named values.
633730
*
634731
* @param row the variable name of row that is used by expressions
635732
* @param expressions the codes to evaluate expressions.
@@ -689,7 +786,6 @@ class CodegenContext {
689786
|}
690787
""".stripMargin
691788
addNewFunction(name, code)
692-
name
693789
}
694790

695791
foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
@@ -773,8 +869,6 @@ class CodegenContext {
773869
|}
774870
""".stripMargin
775871

776-
addNewFunction(fnName, fn)
777-
778872
// Add a state and a mapping of the common subexpressions that are associated with this
779873
// state. Adding this expression to subExprEliminationExprMap means it will call `fn`
780874
// when it is code generated. This decision should be a cost based one.
@@ -792,7 +886,7 @@ class CodegenContext {
792886
addMutableState(javaType(expr.dataType), value,
793887
s"$value = ${defaultValue(expr.dataType)};")
794888

795-
subexprFunctions += s"$fnName($INPUT_ROW);"
889+
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
796890
val state = SubExprEliminationState(isNull, value)
797891
e.foreach(subExprEliminationExprs.put(_, state))
798892
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
6363
if (e.nullable) {
6464
val isNull = s"isNull_$i"
6565
val value = s"value_$i"
66-
ctx.addMutableState("boolean", isNull, s"this.$isNull = true;")
66+
ctx.addMutableState("boolean", isNull, s"$isNull = true;")
6767
ctx.addMutableState(ctx.javaType(e.dataType), value,
68-
s"this.$value = ${ctx.defaultValue(e.dataType)};")
68+
s"$value = ${ctx.defaultValue(e.dataType)};")
6969
s"""
7070
${ev.code}
71-
this.$isNull = ${ev.isNull};
72-
this.$value = ${ev.value};
71+
$isNull = ${ev.isNull};
72+
$value = ${ev.value};
7373
"""
7474
} else {
7575
val value = s"value_$i"
7676
ctx.addMutableState(ctx.javaType(e.dataType), value,
77-
s"this.$value = ${ctx.defaultValue(e.dataType)};")
77+
s"$value = ${ctx.defaultValue(e.dataType)};")
7878
s"""
7979
${ev.code}
80-
this.$value = ${ev.value};
80+
$value = ${ev.value};
8181
"""
8282
}
8383
}
@@ -87,7 +87,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
8787

8888
val updates = validExpr.zip(index).map {
8989
case (e, i) =>
90-
val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
90+
val ev = ExprCode("", s"isNull_$i", s"value_$i")
9191
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
9292
}
9393

@@ -135,6 +135,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
135135
$allUpdates
136136
return mutableRow;
137137
}
138+
139+
${ctx.initNestedClasses()}
140+
${ctx.declareNestedClasses()}
138141
}
139142
"""
140143

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
179179
$comparisons
180180
return 0;
181181
}
182+
183+
${ctx.initNestedClasses()}
184+
${ctx.declareNestedClasses()}
182185
}"""
183186

184187
val code = CodeFormatter.stripOverlappingComments(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
7272
${eval.code}
7373
return !${eval.isNull} && ${eval.value};
7474
}
75+
76+
${ctx.initNestedClasses()}
77+
${ctx.declareNestedClasses()}
7578
}"""
7679

7780
val code = CodeFormatter.stripOverlappingComments(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
4949
val output = ctx.freshName("safeRow")
5050
val values = ctx.freshName("values")
5151
// These expressions could be split into multiple functions
52-
ctx.addMutableState("Object[]", values, s"this.$values = null;")
52+
ctx.addMutableState("Object[]", values, s"$values = null;")
5353

5454
val rowClass = classOf[GenericInternalRow].getName
5555

@@ -65,10 +65,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
6565
val allFields = ctx.splitExpressions(tmp, fieldWriters)
6666
val code = s"""
6767
final InternalRow $tmp = $input;
68-
this.$values = new Object[${schema.length}];
68+
$values = new Object[${schema.length}];
6969
$allFields
7070
final InternalRow $output = new $rowClass($values);
71-
this.$values = null;
71+
$values = null;
7272
"""
7373

7474
ExprCode(code, "false", output)
@@ -184,6 +184,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
184184
$allExpressions
185185
return mutableRow;
186186
}
187+
188+
${ctx.initNestedClasses()}
189+
${ctx.declareNestedClasses()}
187190
}
188191
"""
189192

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
8282
val rowWriterClass = classOf[UnsafeRowWriter].getName
8383
val rowWriter = ctx.freshName("rowWriter")
8484
ctx.addMutableState(rowWriterClass, rowWriter,
85-
s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
85+
s"$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
8686

8787
val resetWriter = if (isTopLevel) {
8888
// For top level row writer, it always writes to the beginning of the global buffer holder,
@@ -182,7 +182,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
182182
val arrayWriterClass = classOf[UnsafeArrayWriter].getName
183183
val arrayWriter = ctx.freshName("arrayWriter")
184184
ctx.addMutableState(arrayWriterClass, arrayWriter,
185-
s"this.$arrayWriter = new $arrayWriterClass();")
185+
s"$arrayWriter = new $arrayWriterClass();")
186186
val numElements = ctx.freshName("numElements")
187187
val index = ctx.freshName("index")
188188
val element = ctx.freshName("element")
@@ -321,7 +321,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
321321
val holder = ctx.freshName("holder")
322322
val holderClass = classOf[BufferHolder].getName
323323
ctx.addMutableState(holderClass, holder,
324-
s"this.$holder = new $holderClass($result, ${numVarLenFields * 32});")
324+
s"$holder = new $holderClass($result, ${numVarLenFields * 32});")
325325

326326
val resetBufferHolder = if (numVarLenFields == 0) {
327327
""
@@ -402,6 +402,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
402402
${eval.code.trim}
403403
return ${eval.value};
404404
}
405+
406+
${ctx.initNestedClasses()}
407+
${ctx.declareNestedClasses()}
405408
}
406409
"""
407410

0 commit comments

Comments
 (0)