
Conversation

@chenghao-intel
Contributor

This re-enables the unit test hadoopFsRelationSuite."Partition column type casting". It previously threw an exception like the one below, because we gave the auto-inferred partition schema higher priority than the user-specified one.

java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
07:44:01.344 ERROR org.apache.spark.executor.Executor: Exception in task 14.0 in stage 3.0 (TID 206)
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
    (stack trace identical to the one above)

@chenghao-intel
Contributor Author

cc @liancheng

@liancheng
Contributor

A summary of my offline discussion with @chenghao-intel:

The real problem here is that the partition column types of the newly refreshed partition spec don't match those in the user-specified spec. The current fix simply disables refreshing the partition spec, which is not preferable. My suggestion is to factor out the partition-value casting logic in the partitionSpec method and reuse it in refresh() to cast the data types of partition values, while simply reusing partitionColumns from the user-specified partition spec.
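
A minimal sketch of that suggestion in Scala, with a hypothetical helper name (Catalyst's Cast and Literal are real APIs; the rest is illustrative): cast each discovered partition value to the type declared in the user-specified partition schema, so refresh() can reuse the user's partitionColumns as-is.

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper, factored out so both partitionSpec and refresh()
    // can call it: align discovered partition values with the user's types.
    def castPartitionValuesToUserSchema(
        values: Seq[Literal],
        userPartitionSchema: StructType): Seq[Literal] = {
      values.zip(userPartitionSchema.fields).map { case (value, field) =>
        // A Cast over a foldable Literal can be evaluated without an input row.
        Literal.create(Cast(value, field.dataType).eval(), field.dataType)
      }
    }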

@SparkQA

SparkQA commented Aug 19, 2015

Test build #41211 has finished for PR 8026 at commit cda059f.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@chenghao-intel
Contributor Author

retest this please


@SparkQA

SparkQA commented Aug 19, 2015

Test build #41202 has finished for PR 8026 at commit f68d827.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 19, 2015

Test build #41214 has finished for PR 8026 at commit cda059f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Remove the comment?

Contributor

+1.

Contributor Author

Yes, true, this is no longer valid.

@JoshRosen
Contributor

@chenghao-intel, just to clarify: I noticed that your final approach involved pushing an expected data type down into the method named inferPartitionColumnValue. I'm curious why you chose this approach as opposed to re-using the code that @liancheng pointed to upthread.

Contributor

Per my other comment upthread, this is a bit confusing: the method is named infer, but it has a mode where it won't perform inference (controlled by a boolean flag), and now it also has a new field that bypasses inference and performs a cast instead.

Contributor Author

In the master branch, if typeInference == false, the data type of the partition key defaults to StringType; otherwise it will probably be IntegerType, LongType, etc., depending on the actual value of the partition key.

Contributor Author

We need to pass the expected data type down and then get the associated literal-based partition column value. @liancheng's suggestion is more like getting the literal first (probably string-based) and then doing the casting outside; however, that can lose data precision during the re-casting.

For example:
The path looks like .../part1=1.000. With auto inference, we will get a Double for the partition column value, and it will be cast to the string "1.0" if what the user expects is StringType.

However, this is totally different from reading it as StringType directly, which should yield "1.000".
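
A minimal, self-contained illustration of the precision loss in plain Scala (not the actual Spark code path):

    // The raw partition value as it appears in the path: part1=1.000
    val raw = "1.000"

    // Infer first, then render the inferred Double back as a string:
    // the trailing zeros are gone.
    val inferredThenCast = raw.toDouble.toString  // "1.0"

    // Read the value directly as a string, as a user-specified StringType asks:
    val directAsString = raw                      // "1.000"

    assert(inferredThenCast != directAsString)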

Contributor

I agree that casting to a non string type and then converting back to a string may lose precision, but what about disabling inference when calling inferPartitionColumnValue if the user has provided a schema? In that case, it should end up just returning the string literals, which you can then cast without a loss of precision.
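
A minimal sketch of that approach, with a hypothetical function name (Cast and Literal are real Catalyst APIs): with inference disabled, the partition value arrives as a string literal, so a single cast to the user-specified type loses no precision.

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.DataType

    // Hypothetical helper: inference is off, so "1.000" stays intact as a
    // string, and one cast takes it to the user's declared type.
    def resolvePartitionValue(rawValue: String, userType: DataType): Literal = {
      val asString = Literal(rawValue)
      Literal.create(Cast(asString, userType).eval(), userType)
    }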

Contributor Author

Sounds good to me, I will update the code.

@JoshRosen
Contributor

I just tried testing a build where I only re-enabled the ignored test and changed nothing else. The test still passed, which makes me wonder whether "Partition column type casting" is an adequate regression test for this issue.

Can you write a new test for this which fails without this patch?

@chenghao-intel
Contributor Author

Thank you @JoshRosen, I will pick this PR back up, as I had almost forgotten some of the details. But the ignored test cases definitely failed without this PR previously; I'm not sure whether someone else fixed that somewhere else.

@chenghao-intel
Contributor Author

@JoshRosen I've also updated the unit test by adding an Append operation; without this PR, it throws the exception I described in the JIRA (https://issues.apache.org/jira/browse/SPARK-9735).

The root reason the previous unit test could pass at all should be #8035: it always gets the latest user-specified schema without calling relation.refresh(); however, relation.refresh() is called indirectly in Append mode.
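
A rough sketch of that test shape, assuming a SQLContext named sqlContext is in scope (hypothetical path and schema, not the exact test from the PR): the second write uses Append, which indirectly triggers relation.refresh(); before this patch, refresh() re-inferred the partition column as IntegerType even though the user-specified schema declares StringType.

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val path = "/tmp/spark-9735-test"  // hypothetical location
    val userSpecifiedSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("part", StringType)))  // paths like part=1 would infer IntegerType

    val df = sqlContext.range(10).selectExpr("id", "cast(id as string) as part")
    df.write.format("parquet").partitionBy("part").save(path)
    df.write.format("parquet").partitionBy("part").mode("append").save(path)

    sqlContext.read.schema(userSpecifiedSchema).parquet(path).collect()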

@SparkQA

SparkQA commented Oct 20, 2015

Test build #43984 has finished for PR 8026 at commit 7f2da8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Is AssertionError the right exception to be throwing here? I'd think that IllegalArgumentException might be more appropriate.
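
For reference, Scala's two built-in precondition helpers throw exactly the two exception types in question:

    val partitionPath = "part=1"
    // Predef.assert throws java.lang.AssertionError on failure:
    assert(partitionPath.contains("="), "internal invariant violated")
    // Predef.require throws java.lang.IllegalArgumentException on failure:
    require(partitionPath.nonEmpty, "invalid argument from the caller")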

Contributor Author

I agree we should move the partition path validation into a separate PR, since we can definitely do more checking and produce prettier error messages.

@SparkQA

SparkQA commented Oct 21, 2015

Test build #44038 has finished for PR 8026 at commit 2cc93da.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@chenghao-intel
Contributor Author

@JoshRosen I've updated the code; it should be more straightforward and clean now.

Contributor

Super-minor nit: could you explicitly name the boolean parameter here at the call-site, e.g. inferSchema = false? This is one of IntelliJ's automatic style recommendations and I'm a fan of it because it makes the code a bit easier to read. I might also just change this myself on merge.
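
For illustration, with a hypothetical function (not from the patch):

    // Hypothetical signature, for illustration only.
    def listLeafFiles(dir: String, recursive: Boolean): Seq[String] = Seq.empty

    listLeafFiles("/data", false)              // what does `false` control?
    listLeafFiles("/data", recursive = false)  // self-documenting at the call site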

@SparkQA

SparkQA commented Oct 21, 2015

Test build #44040 has finished for PR 8026 at commit 9f08f76.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Nit: .map instead of using infix notation.
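
That is, dot notation instead of infix for method calls, on a toy example:

    val names = Seq("a", "b")
    val viaInfix = names map (_.toUpperCase)  // infix notation, as flagged
    val viaDot = names.map(_.toUpperCase)     // preferred dot notation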

Contributor

The wording of this error message might be slightly confusing to users since this branch is explicitly disabling inference. I think that it might be slightly clearer to say something like "Actual partitioning column names did not match user-specified partitioning schema; expected ... but got ...", since as far as I know the inference is really only done for the types of the columns, not their names.

@JoshRosen
Contributor

@chenghao-intel, thanks a bunch for updating this; the current version of this patch is a lot easier to understand and I'm happy with how clean the code turned out. I left only minor style / clarity comments, which I don't mind addressing myself on merge if you're too busy. If you don't mind, though, one more round of quick updates to address my comments would be appreciated.

Anyhow, the technical changes here LGTM.

@chenghao-intel
Contributor Author

Thank you so much @JoshRosen for the detailed review, but it seems a bug still exists; I'd like to fix it myself soon.

@SparkQA

SparkQA commented Oct 21, 2015

Test build #44066 has finished for PR 8026 at commit bdee89e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

@chenghao-intel, it looks like this most recent test failure is legitimate:

assertion failed: Actual partitioning column names did not match user-specified partitioning schema; expect StructType(StructField(part,IntegerType,true)), but got StructType()}

@chenghao-intel
Contributor Author

Yes, true. SPARK-7749 actually gives an example of an empty partitioned table backed by the Hive metastore, in which case we will not detect any partition column values.

I simply removed the assertion in the code, as it's not valid in this case.

@SparkQA

SparkQA commented Oct 22, 2015

Test build #44113 has finished for PR 8026 at commit 3383473.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

LGTM, so I'm going to merge this into master. Should this be backported to 1.5.x or any earlier releases?

asfgit closed this in d4950e6 on Oct 22, 2015
@adamjk

adamjk commented Dec 15, 2015

Is this being backported to 1.5.x?
