Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,36 @@ class CodegenContext {
}
}

/**
* Wrap the generated code of expression, which was created from a row object in INPUT_ROW,
* by a function. ev.isNull and ev.value are passed by global variables
*
* @param ev the code to evaluate expressions.
* @param dataType the data type of ev.value.
* @param baseFuncName the split function name base.
*/
def createAndAddFunction(
ev: ExprCode,
dataType: DataType,
baseFuncName: String): (String, String, String) = {
val globalIsNull = freshName("isNull")
addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
val globalValue = freshName("value")
addMutableState(javaType(dataType), globalValue,
s"$globalValue = ${defaultValue(dataType)};")
val funcName = freshName(baseFuncName)
val funcBody =
s"""
|private void $funcName(InternalRow ${INPUT_ROW}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it work with whole stage codegen? the input is not InternalRow but some variable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it works only if ctx.currentVars == null.
We will follow to support the whole stage codegen as follow-up in other PRs.

| ${ev.code.trim}
| $globalIsNull = ${ev.isNull};
| $globalValue = ${ev.value};
|}
""".stripMargin
val fullFuncName = addNewFunction(funcName, funcBody)
(fullFuncName, globalIsNull, globalValue)
}

/**
* Perform a function which generates a sequence of ExprCodes with a given mapping between
* expressions and common expressions, instead of using the mapping in current context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (condFuncName, condGlobalIsNull, condGlobalValue) =
createAndAddFunction(ctx, condEval, predicate.dataType, "evalIfCondExpr")
ctx.createAndAddFunction(condEval, predicate.dataType, "evalIfCondExpr")
val (trueFuncName, trueGlobalIsNull, trueGlobalValue) =
createAndAddFunction(ctx, trueEval, trueValue.dataType, "evalIfTrueExpr")
ctx.createAndAddFunction(trueEval, trueValue.dataType, "evalIfTrueExpr")
val (falseFuncName, falseGlobalIsNull, falseGlobalValue) =
createAndAddFunction(ctx, falseEval, falseValue.dataType, "evalIfFalseExpr")
ctx.createAndAddFunction(falseEval, falseValue.dataType, "evalIfFalseExpr")
s"""
$condFuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
Expand Down Expand Up @@ -112,29 +112,6 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
ev.copy(code = generatedCode)
}

private def createAndAddFunction(
ctx: CodegenContext,
ev: ExprCode,
dataType: DataType,
baseFuncName: String): (String, String, String) = {
val globalIsNull = ctx.freshName("isNull")
ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
val globalValue = ctx.freshName("value")
ctx.addMutableState(ctx.javaType(dataType), globalValue,
s"$globalValue = ${ctx.defaultValue(dataType)};")
val funcName = ctx.freshName(baseFuncName)
val funcBody =
s"""
|private void $funcName(InternalRow ${ctx.INPUT_ROW}) {
| ${ev.code.trim}
| $globalIsNull = ${ev.isNull};
| $globalValue = ${ev.value};
|}
""".stripMargin
val fullFuncName = ctx.addNewFunction(funcName, funcBody)
(fullFuncName, globalIsNull, globalValue)
}

override def toString: String = s"if ($predicate) $trueValue else $falseValue"

override def sql: String = s"(IF(${predicate.sql}, ${trueValue.sql}, ${falseValue.sql}))"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,46 @@ case class And(left: Expression, right: Expression) extends BinaryOperator with
val eval2 = right.genCode(ctx)

// The result should be `false`, if any of them is `false` whenever the other is null or not.
if (!left.nullable && !right.nullable) {

// place generated code of eval1 and eval2 in separate methods if their code combined is large
val combinedLength = eval1.code.length + eval2.code.length
if (combinedLength > 1024 &&
// Split these expressions only if they are created from a row object
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
if (!left.nullable && !right.nullable) {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.value} = false;
if (${eval1GlobalValue}) {
$eval2FuncName(${ctx.INPUT_ROW});
${ev.value} = ${eval2GlobalValue};
}
"""
ev.copy(code = generatedCode, isNull = "false")
} else {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
boolean ${ev.value} = false;
if (!${eval1GlobalIsNull} && !${eval1GlobalValue}) {
} else {
$eval2FuncName(${ctx.INPUT_ROW});
if (!${eval2GlobalIsNull} && !${eval2GlobalValue}) {
} else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
${ev.value} = true;
} else {
${ev.isNull} = true;
}
}
"""
ev.copy(code = generatedCode)
}
} else if (!left.nullable && !right.nullable) {
ev.copy(code = s"""
${eval1.code}
boolean ${ev.value} = false;
Expand Down Expand Up @@ -431,7 +470,46 @@ case class Or(left: Expression, right: Expression) extends BinaryOperator with P
val eval2 = right.genCode(ctx)

// The result should be `true`, if any of them is `true` whenever the other is null or not.
if (!left.nullable && !right.nullable) {

// place generated code of eval1 and eval2 in separate methods if their code combined is large
val combinedLength = eval1.code.length + eval2.code.length
if (combinedLength > 1024 &&
// Split these expressions only if they are created from a row object
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
if (!left.nullable && !right.nullable) {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.value} = true;
if (!${eval1GlobalValue}) {
$eval2FuncName(${ctx.INPUT_ROW});
${ev.value} = ${eval2GlobalValue};
}
"""
ev.copy(code = generatedCode, isNull = "false")
} else {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
boolean ${ev.value} = true;
if (!${eval1GlobalIsNull} && ${eval1GlobalValue}) {
} else {
$eval2FuncName(${ctx.INPUT_ROW});
if (!${eval2GlobalIsNull} && ${eval2GlobalValue}) {
} else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
${ev.value} = false;
} else {
${ev.isNull} = true;
}
}
"""
ev.copy(code = generatedCode)
}
} else if (!left.nullable && !right.nullable) {
ev.isNull = "false"
ev.copy(code = s"""
${eval1.code}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,43 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
// should not throw exception
projection(row)
}

test("SPARK-21720: split large predications into blocks due to JVM code size limit") {
val length = 600

val input = new GenericInternalRow(length)
val utf8Str = UTF8String.fromString(s"abc")
for (i <- 0 until length) {
input.update(i, utf8Str)
}

var exprOr: Expression = Literal(false)
for (i <- 0 until length) {
exprOr = Or(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprOr)
}

val planOr = GenerateMutableProjection.generate(Seq(exprOr))
val actualOr = planOr(input).toSeq(Seq(exprOr.dataType))
assert(actualOr.length == 1)
val expectedOr = false

if (!checkResult(actualOr.head, expectedOr, exprOr.dataType)) {
fail(s"Incorrect Evaluation: expressions: $exprOr, actual: $actualOr, expected: $expectedOr")
}

var exprAnd: Expression = Literal(true)
for (i <- 0 until length) {
exprAnd = And(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprAnd)
}

val planAnd = GenerateMutableProjection.generate(Seq(exprAnd))
val actualAnd = planAnd(input).toSeq(Seq(exprAnd.dataType))
assert(actualAnd.length == 1)
val expectedAnd = false

if (!checkResult(actualAnd.head, expectedAnd, exprAnd.dataType)) {
fail(
s"Incorrect Evaluation: expressions: $exprAnd, actual: $actualAnd, expected: $expectedAnd")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2067,7 +2067,11 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
.count
}

testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
// The fix of SPARK-21720 avoid an exception regarding JVM code size limit
// TODO: When we make a threshold of splitting statements (1024) configurable,
// we will re-enable this with max threshold to cause an exception
// See https://github.com/apache/spark/pull/18972/files#r150223463
ignore("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
val N = 400
val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))
Expand Down