From 957bf13a177f2b4ed94a0804c64aace4ecb5a779 Mon Sep 17 00:00:00 2001 From: hustfeiwang Date: Thu, 23 May 2019 17:10:09 +0800 Subject: [PATCH 1/2] [SPARK-27814] The cast operation may push down uncorrect filter, which is fatal. --- .../apache/spark/sql/hive/client/HiveShim.scala | 17 ++++++++++++++--- .../sql/hive/execution/SQLQuerySuite.scala | 9 +++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 18f8c53609812..467c0e0e57980 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -675,12 +675,23 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled object ExtractAttribute { + val partitionKeys = table.getPartitionKeys.asScala.map(_.getName).toSet + var castToStr = false + def unapply(expr: Expression): Option[Attribute] = { expr match { - case attr: Attribute => Some(attr) + case attr: Attribute + if (!castToStr || !partitionKeys.contains(attr.name) || + attr.dataType == StringType) => + castToStr = false + Some(attr) case Cast(child @ AtomicType(), dt: AtomicType, _) - if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => unapply(child) - case _ => None + if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => + castToStr = (castToStr || dt == StringType) + unapply(child) + case _ => + castToStr = false + None } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 29de55f7040f1..d7abc3f0e9e0c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2384,4 +2384,13 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + test("SPARK-27814: test cast operation for partition key") { + withTable("t1") { + sql("CREATE TABLE t1(c1 INT, c2 STRING) PARTITIONED BY (p1 INT)") + sql("INSERT INTO TABLE t1 PARTITION (p1 = 5) values(1, 'str')") + checkAnswer(sql("SELECT c1 FROM t1 WHERE CAST(p1 as STRING) = '5'"), Row(1)) + checkAnswer(sql("SELECT c1 FROM t1 WHERE CAST( CAST(p1 AS BIGINT) AS STRING) = '5'"), Row(1)) + } + } } From 19b9f6b449e306a9432c7a46812103e89cf4ba7e Mon Sep 17 00:00:00 2001 From: hustfeiwang Date: Mon, 27 May 2019 16:58:55 +0800 Subject: [PATCH 2/2] fix code --- .../scala/org/apache/spark/sql/hive/client/HiveShim.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 467c0e0e57980..2362a34c09c44 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -675,14 +675,12 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled object ExtractAttribute { - val partitionKeys = table.getPartitionKeys.asScala.map(_.getName).toSet var castToStr = false def unapply(expr: Expression): Option[Attribute] = { expr match { case attr: Attribute - if (!castToStr || !partitionKeys.contains(attr.name) || - attr.dataType == StringType) => + if (!castToStr || attr.dataType == StringType) => castToStr = false Some(attr) case Cast(child @ AtomicType(), dt: AtomicType, _)