Conversation

@mn-mikke
Contributor

What changes were proposed in this pull request?

This PR proposes a fix for the output data type of the If and CaseWhen expressions. Up until now, the implementation of these expressions has ignored the nullability of nested types coming from different execution branches and simply returned the type of the first branch.

This could lead to an unwanted NullPointerException in other expressions that depend on an If/CaseWhen expression.

Example:

import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lit, struct, when}
import org.apache.spark.sql.types._
import spark.implicits._

val rows = new util.ArrayList[Row]()
rows.add(Row(true, ("a", 1)))
rows.add(Row(false, (null, 2)))
val schema = StructType(Seq(
  StructField("cond", BooleanType, false),
  StructField("s", StructType(Seq(
    StructField("val1", StringType, true),
    StructField("val2", IntegerType, false)
  )), false)
))

val df = spark.createDataFrame(rows, schema)

df
  .select(when('cond, struct(lit("x").as("val1"), lit(10).as("val2"))).otherwise('s) as "res")
  .select('res.getField("val1"))
  .show()

Exception:

Exception in thread "main" java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
...

Output schema:

root
 |-- res.val1: string (nullable = false)
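The intended fix — making the merged type's nullability the union of the branches' nullability, rather than taking the first branch's type verbatim — can be sketched with a toy model. This is a minimal, self-contained sketch; the names loosely mirror Spark's StructType/StructField but are not the actual implementation:

```scala
// Toy stand-ins for Spark's data types. A struct field is
// (name, type, nullable).
sealed trait DType
case object TString extends DType
case object TInt extends DType
case class TStruct(fields: Seq[(String, DType, Boolean)]) extends DType

// Merge the types of two branches with the same shape: a field in the
// result is nullable if it is nullable in EITHER branch. The bug was
// returning the first branch's nullability only.
def mergeNullability(a: DType, b: DType): DType = (a, b) match {
  case (TStruct(fa), TStruct(fb)) =>
    TStruct(fa.zip(fb).map { case ((n, ta, na), (_, tb, nb)) =>
      (n, mergeNullability(ta, tb), na || nb)
    })
  case _ => a
}
```

In the example above, the `when` branch produces `val1` as a non-nullable literal while the `otherwise` branch's `'s` has `val1` nullable, so the merged `res.val1` must come out nullable.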

How was this patch tested?

New test cases were added to:

  • DataFrameSuite.scala
  • conditionalExpressions.scala

@mn-mikke
Contributor Author

cc @cloud-fan @ueshin @viirya @maropu

@SparkQA

SparkQA commented Jul 11, 2018

Test build #92856 has finished for PR 21747 at commit a2fe63e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ComplexTypeMergingExpression extends Expression

@mn-mikke
Contributor Author

retest this please

* data types of all child expressions. The collection must not be empty.
*/
@transient
lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType)
@maropu maropu (Member) commented Jul 11, 2018

super nit: @transient lazy val

StructType(newFields)
}

override def dataType: DataType = {
Member

Can't we change def into lazy val? I feel a little weird that the two requirement checks are invoked every dataType called.
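For illustration, the difference the reviewer is pointing at can be shown with a generic Scala sketch (unrelated to Spark's actual classes):

```scala
object LazyValDemo {
  var evaluations = 0

  // A def re-runs its body (including any require-style checks)
  // on every call.
  def dataTypeDef: String = { evaluations += 1; "struct" }

  // A lazy val runs the body once on first access and caches the result.
  lazy val dataTypeLazy: String = { evaluations += 1; "struct" }
}
```

Calling `dataTypeDef` twice evaluates the body twice, while accessing `dataTypeLazy` twice evaluates it only once.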

extends ComplexTypeMergingExpression {

@transient
override lazy val inputTypesForMerging: Seq[DataType] = {
Member

super nit: @transient lazy val

@SparkQA

SparkQA commented Jul 11, 2018

Test build #92859 has finished for PR 21747 at commit a2fe63e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ComplexTypeMergingExpression extends Expression

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 11, 2018

Test build #92862 has finished for PR 21747 at commit a2fe63e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ComplexTypeMergingExpression extends Expression

@cloud-fan
Contributor

retest this please

1 similar comment
@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92912 has finished for PR 21747 at commit a2fe63e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ComplexTypeMergingExpression extends Expression

@maropu
Member

maropu commented Jul 12, 2018

retest this please

@mn-mikke
Contributor Author

Thanks guys for triggering the builds, but I need to implement a fix first. It's failing because the code is accessing SQLConf from executors (doCodeGen -> dataType -> sameType -> SQLConf). For master, this shouldn't be a problem thanks to #21376, but for this branch we need to come up with some workaround. Maybe via enforcing serialization of dataType?

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92914 has finished for PR 21747 at commit a2fe63e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ComplexTypeMergingExpression extends Expression

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92925 has finished for PR 21747 at commit 163b880.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

ah, that's hard to fix, maybe just leave it, since Spark 2.4 is coming.

@maropu
Member

maropu commented Jul 12, 2018

How about making a temporary helper function for sameType in ComplexTypeMergingExpression? You read the config for case sensitivity on the driver side and hold it as a field value in the closure. Then the helper function references the (serialized and deserialized) value on the executor side?
NVM, just me talking to myself...
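The idea above could look roughly like this. This is a hedged sketch under stated assumptions: the class name, constructor parameter, and `sameName` method are illustrative stand-ins, not Spark's actual API, and `driverCaseSensitive` stands in for a driver-side `spark.sql.caseSensitive` lookup:

```scala
// Sketch: capture a driver-side config value in a field at construction
// time, so serialized copies shipped to executors reuse the captured
// value instead of reading SQLConf there.
class TypeMergingHelper(driverCaseSensitive: Boolean) extends Serializable {
  // The constructor runs on the driver; this field travels with the closure.
  private val caseSensitive: Boolean = driverCaseSensitive

  // Executor-side comparison uses the captured value, never the session conf.
  def sameName(a: String, b: String): Boolean =
    if (caseSensitive) a == b else a.equalsIgnoreCase(b)
}
```

The design point is simply that a plain field is part of the serialized object graph, whereas a live SQLConf lookup is not available on executors in this branch.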

@mn-mikke
Contributor Author

IMHO, we could directly use DataType.equalsIgnoreCaseAndNullability instead of sameType to make sure that mergeTwoDataTypes won't blow up if the trait is misused. The proper check is performed by checkInputDataTypes anyway.

If you don't like this approach for any reason, I'm happy to close the PR. :-)
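For illustration, a config-free comparison of that kind might look like the following toy sketch. `SimpleStruct` and this `equalsIgnoreCaseAndNullability` are stand-ins invented for the example, not Spark's actual `DataType.equalsIgnoreCaseAndNullability`:

```scala
// Toy struct type: a sequence of (fieldName, nullable) pairs.
case class SimpleStruct(fields: Seq[(String, Boolean)])

// Compare field names case-insensitively and ignore nullability entirely.
// No session config is consulted, so a check like this is safe to run
// on executors.
def equalsIgnoreCaseAndNullability(a: SimpleStruct, b: SimpleStruct): Boolean =
  a.fields.length == b.fields.length &&
    a.fields.zip(b.fields).forall { case ((nameA, _), (nameB, _)) =>
      nameA.equalsIgnoreCase(nameB)
    }
```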

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92933 has finished for PR 21747 at commit 4b42c78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Can one of the admins verify this patch?

@dongjoon-hyun
Member

Hi, @mn-mikke and @cloud-fan and @maropu .
The 2.3.2 vote passed today, and 2.4.0-rc1 doesn't have this issue. Given that Spark 2.4.0 will arrive sooner than Spark 2.3.3, are we heading for (1) or (2)?

  1. Mark this as resolved in 2.4.0 and close this PR?
  2. Proceed with this PR for 2.3.3?

@mn-mikke
Contributor Author

If nobody has any objections, I'm happy to close this PR.

@cloud-fan
Contributor

To backport this fix, we need to backport another improvement PR that allows accessing SQLConf at executor side, which violates the backport rule. I think it's ok to have this fix in 2.4 only.

@mn-mikke mn-mikke closed this Sep 24, 2018
@dongjoon-hyun
Member

Thank you for the decision, @mn-mikke and @cloud-fan !
