Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ case class ScalaUDF(
val converterTerm = ctx.freshName("converter")
val expressionIdx = ctx.references.size - 1
ctx.addMutableState(converterClassName, converterTerm,
s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
converterTerm
Expand All @@ -1005,7 +1005,7 @@ case class ScalaUDF(
// Generate codes used to convert the returned value of user-defined functions to Catalyst type
val catalystConverterTerm = ctx.freshName("catalystConverter")
ctx.addMutableState(converterClassName, catalystConverterTerm,
s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToCatalystConverter($scalaUDF.dataType());")

val resultTerm = ctx.freshName("result")
Expand All @@ -1019,7 +1019,7 @@ case class ScalaUDF(

val funcTerm = ctx.freshName("udf")
ctx.addMutableState(funcClassName, funcTerm,
s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")

// codegen for children expressions
val evals = children.map(_.genCode(ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class CodegenContext {
val idx = references.length
references += obj
val clsName = Option(className).getOrElse(obj.getClass.getName)
addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
addMutableState(clsName, term, s"$term = ($clsName) references[$idx];")
term
}

Expand Down Expand Up @@ -198,41 +198,139 @@ class CodegenContext {
partitionInitializationStatements.mkString("\n")
}

/**
* Holds expressions that are equivalent. Used to perform subexpression elimination
* during codegen.
*
* For expressions that appear more than once, generate additional code to prevent
* recomputing the value.
*
* For example, consider two expression generated from this SQL statement:
* SELECT (col1 + col2), (col1 + col2) / col3.
*
* equivalentExpressions will match the tree containing `col1 + col2` and it will only
* be evaluated once.
*/
val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions

// Foreach expression that is participating in subexpression elimination, the state to use.
val subExprEliminationExprs = mutable.HashMap.empty[Expression, SubExprEliminationState]

// The collection of sub-expression result resetting methods that need to be called on each row.
val subexprFunctions = mutable.ArrayBuffer.empty[String]

private val outerClassName = "OuterClass"

/**
* Holding all the functions those will be added into generated class.
* Holds the class and instance names to be generated, where `OuterClass` is a placeholder
* standing for whichever class is generated as the outermost class and which will contain any
* nested sub-classes. All other classes and instance names in this list will represent private,
* nested sub-classes.
*/
val addedFunctions: mutable.Map[String, String] =
mutable.Map.empty[String, String]
private val classes: mutable.ListBuffer[(String, String)] =
mutable.ListBuffer[(String, String)](outerClassName -> null)

// A map holding the current size in bytes of each class to be generated.
private val classSize: mutable.Map[String, Int] =
mutable.Map[String, Int](outerClassName -> 0)

// Nested maps holding function names and their code belonging to each class.
private val classFunctions: mutable.Map[String, mutable.Map[String, String]] =
mutable.Map(outerClassName -> mutable.Map.empty[String, String])

def addNewFunction(funcName: String, funcCode: String): Unit = {
addedFunctions += ((funcName, funcCode))
// Returns the size of the most recently added class.
private def currClassSize(): Int = classSize(classes.head._1)

// Returns the class name and instance name for the most recently added class.
private def currClass(): (String, String) = classes.head

// Adds a new class. Requires the class' name, and its instance name.
private def addClass(className: String, classInstance: String): Unit = {
classes.prepend(className -> classInstance)
classSize += className -> 0
classFunctions += className -> mutable.Map.empty[String, String]
}

/**
* Holds expressions that are equivalent. Used to perform subexpression elimination
* during codegen.
*
* For expressions that appear more than once, generate additional code to prevent
* recomputing the value.
* Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
* function will be inlined into a new private, nested class, and a instance-qualified name for
* the function will be returned. Otherwise, the function will be inlined to the `OuterClass` the
* simple `funcName` will be returned.
*
* For example, consider two expression generated from this SQL statement:
* SELECT (col1 + col2), (col1 + col2) / col3.
*
* equivalentExpressions will match the tree containing `col1 + col2` and it will only
* be evaluated once.
* @param funcName the class-unqualified name of the function
* @param funcCode the body of the function
* @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This
* can be necessary when a function is declared outside of the context
* it is eventually referenced and a returned qualified function name
* cannot otherwise be accessed.
* @return the name of the function, qualified by class if it will be inlined to a private,
* nested sub-class
*/
val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions
def addNewFunction(
funcName: String,
funcCode: String,
inlineToOuterClass: Boolean = false): String = {
// The number of named constants that can exist in the class is limited by the Constant Pool
// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
// threshold of 1600k bytes to determine when a function should be inlined to a private, nested
// sub-class.
val (className, classInstance) = if (inlineToOuterClass) {
outerClassName -> ""
} else if (currClassSize > 1600000) {
val className = freshName("NestedClass")
val classInstance = freshName("nestedClassInstance")

addClass(className, classInstance)

className -> classInstance
} else {
currClass()
}

// Foreach expression that is participating in subexpression elimination, the state to use.
val subExprEliminationExprs = mutable.HashMap.empty[Expression, SubExprEliminationState]
classSize(className) += funcCode.length
classFunctions(className) += funcName -> funcCode

// The collection of sub-expression result resetting methods that need to be called on each row.
val subexprFunctions = mutable.ArrayBuffer.empty[String]
if (className == outerClassName) {
funcName
} else {

def declareAddedFunctions(): String = {
addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
s"$classInstance.$funcName"
}
}

/**
* Instantiates all nested, private sub-classes as objects to the `OuterClass`
*/
private[sql] def initNestedClasses(): String = {
// Nested, private sub-classes have no mutable state (though they do reference the outer class'
// mutable state), so we declare and initialize them inline to the OuterClass.
classes.filter(_._1 != outerClassName).map {
case (className, classInstance) =>
s"private $className $classInstance = new $className();"
}.mkString("\n")
}

/**
* Declares all function code that should be inlined to the `OuterClass`.
*/
private[sql] def declareAddedFunctions(): String = {
classFunctions(outerClassName).values.mkString("\n")
}

/**
* Declares all nested, private sub-classes and the function code that should be inlined to them.
*/
private[sql] def declareNestedClasses(): String = {
classFunctions.filterKeys(_ != outerClassName).map {
case (className, functions) =>
s"""
|private class $className {
| ${functions.values.mkString("\n")}
|}
""".stripMargin
}
}.mkString("\n")

final val JAVA_BOOLEAN = "boolean"
final val JAVA_BYTE = "byte"
final val JAVA_SHORT = "short"
Expand Down Expand Up @@ -552,8 +650,7 @@ class CodegenContext {
return 0;
}
"""
addNewFunction(compareFunc, funcCode)
s"this.$compareFunc($c1, $c2)"
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case schema: StructType =>
val comparisons = GenerateOrdering.genComparisons(this, schema)
val compareFunc = freshName("compareStruct")
Expand All @@ -569,8 +666,7 @@ class CodegenContext {
return 0;
}
"""
addNewFunction(compareFunc, funcCode)
s"this.$compareFunc($c1, $c2)"
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
case _ =>
Expand Down Expand Up @@ -640,7 +736,9 @@ class CodegenContext {

/**
* Splits the generated code of expressions into multiple functions, because function has
* 64kb code size limit in JVM
* 64kb code size limit in JVM. If the class to which the function would be inlined would grow
* beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
* instead, because classes have a constant pool limit of 65,536 named values.
*
* @param expressions the codes to evaluate expressions.
* @param funcName the split function name base.
Expand Down Expand Up @@ -685,7 +783,6 @@ class CodegenContext {
|}
""".stripMargin
addNewFunction(name, code)
name
}

foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
Expand Down Expand Up @@ -769,8 +866,6 @@ class CodegenContext {
|}
""".stripMargin

addNewFunction(fnName, fn)

// Add a state and a mapping of the common subexpressions that are associate with this
// state. Adding this expression to subExprEliminationExprMap means it will call `fn`
// when it is code generated. This decision should be a cost based one.
Expand All @@ -791,7 +886,7 @@ class CodegenContext {
addMutableState(javaType(expr.dataType), value,
s"$value = ${defaultValue(expr.dataType)};")

subexprFunctions += s"$fnName($INPUT_ROW);"
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
e.foreach(subExprEliminationExprs.put(_, state))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
if (e.nullable) {
val isNull = s"isNull_$i"
val value = s"value_$i"
ctx.addMutableState("boolean", isNull, s"this.$isNull = true;")
ctx.addMutableState("boolean", isNull, s"$isNull = true;")
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"this.$value = ${ctx.defaultValue(e.dataType)};")
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
this.$isNull = ${ev.isNull};
this.$value = ${ev.value};
$isNull = ${ev.isNull};
$value = ${ev.value};
"""
} else {
val value = s"value_$i"
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"this.$value = ${ctx.defaultValue(e.dataType)};")
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
this.$value = ${ev.value};
$value = ${ev.value};
"""
}
}
Expand All @@ -87,7 +87,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP

val updates = validExpr.zip(index).map {
case (e, i) =>
val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
val ev = ExprCode("", s"isNull_$i", s"value_$i")
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
}

Expand Down Expand Up @@ -135,6 +135,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
$allUpdates
return mutableRow;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
$comparisons
return 0;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}"""

val code = CodeFormatter.stripOverlappingComments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
${eval.code}
return !${eval.isNull} && ${eval.value};
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}"""

val code = CodeFormatter.stripOverlappingComments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val output = ctx.freshName("safeRow")
val values = ctx.freshName("values")
// These expressions could be split into multiple functions
ctx.addMutableState("Object[]", values, s"this.$values = null;")
ctx.addMutableState("Object[]", values, s"$values = null;")

val rowClass = classOf[GenericInternalRow].getName

Expand All @@ -65,10 +65,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val allFields = ctx.splitExpressions(tmp, fieldWriters)
val code = s"""
final InternalRow $tmp = $input;
this.$values = new Object[${schema.length}];
$values = new Object[${schema.length}];
$allFields
final InternalRow $output = new $rowClass($values);
this.$values = null;
$values = null;
"""

ExprCode(code, "false", output)
Expand Down Expand Up @@ -184,6 +184,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
$allExpressions
return mutableRow;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Loading