From 9595237a3178de7fb4c8e280a55072ed050fffc3 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 15 May 2018 14:13:39 +0200 Subject: [PATCH 1/6] [SPARK-24276][SQL] Order of literals in IN should not affect semantic equality --- .../sql/catalyst/expressions/Canonicalize.scala | 10 ++++++++++ .../org/apache/spark/sql/DataFrameSuite.scala | 16 ++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index d848ba18356d3..03645747400e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.util.TypeUtils + /** * Rewrites an expression using rules that are guaranteed preserve the result while attempting * to remove cosmetic variations. 
Deterministic expressions that are `equal` after canonicalization @@ -85,6 +87,14 @@ object Canonicalize { case Not(GreaterThanOrEqual(l, r)) => LessThan(l, r) case Not(LessThanOrEqual(l, r)) => GreaterThan(l, r) + // order the list in the In operator + // we can do this only if all the elements in the list are literals with the same datatype + case i @ In(value, list) + if i.inSetConvertible && list.map(_.dataType.asNullable).distinct.size == 1 => + val literals = list.map(_.asInstanceOf[Literal]) + val ordering = TypeUtils.getInterpretedOrdering(literals.head.dataType) + In(value, literals.sortBy(_.value)(ordering)) + case _ => e } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 60e84e6ee7504..e35ba7d8115be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2265,4 +2265,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df = spark.range(1).select($"id", new Column(Uuid())) checkAnswer(df, df.collect()) } + + test("SPARK-24276: IN returns sameResult if the order of literals is different") { + val df = spark.range(1) + val p1 = df.where($"id".isin(1, 2)) + val p2 = df.where($"id".isin(2, 1)) + val p3 = df.where($"id".isin(1, 2, 3)) + + assert(p1.queryExecution.executedPlan.sameResult(p2.queryExecution.executedPlan)) + assert(!p1.queryExecution.executedPlan.sameResult(p3.queryExecution.executedPlan)) + + val h1 = p1.queryExecution.logical.canonicalized.semanticHash() + val h2 = p2.queryExecution.logical.canonicalized.semanticHash() + val h3 = p3.queryExecution.logical.canonicalized.semanticHash() + assert(h1 == h2) + assert(h1 != h3) + } } From 6cd34d9322dc8025df3cb0fe5cf596de09ad10eb Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 20 May 2018 14:45:17 +0200 Subject: [PATCH 2/6] add tests for arrays --- 
.../scala/org/apache/spark/sql/DataFrameSuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e35ba7d8115be..b3427a203358d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2280,5 +2280,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val h3 = p3.queryExecution.logical.canonicalized.semanticHash() assert(h1 == h2) assert(h1 != h3) + + val df2 = Seq(Array(1, 2)).toDF("id") + val arrays1 = df2.where($"id".isin(lit(Array(1, 2)), lit(Array(2, 1)))) + val arrays2 = df2.where($"id".isin(lit(Array(2, 1)), lit(Array(1, 2)))) + val arrays3 = df2.where($"id".isin(lit(Array(3, 2)), lit(Array(2, 1)))) + assert(arrays1.queryExecution.executedPlan.sameResult(arrays2.queryExecution.executedPlan)) + assert(!arrays1.queryExecution.executedPlan.sameResult(arrays3.queryExecution.executedPlan)) + val arraysHash1 = arrays1.queryExecution.logical.canonicalized.semanticHash() + val arraysHash2 = arrays2.queryExecution.logical.canonicalized.semanticHash() + val arraysHash3 = arrays3.queryExecution.logical.canonicalized.semanticHash() + assert(arraysHash1 == arraysHash2) + assert(arraysHash1 != arraysHash3) } } From ccbdd11a1f2ff6f08db47694f315109b61c8726e Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 21 May 2018 10:33:18 +0200 Subject: [PATCH 3/6] order by semanticHash --- .../spark/sql/catalyst/expressions/Canonicalize.scala | 8 ++------ .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index 03645747400e6..7549d0de407b3 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -88,12 +88,8 @@ object Canonicalize { case Not(LessThanOrEqual(l, r)) => GreaterThan(l, r) // order the list in the In operator - // we can do this only if all the elements in the list are literals with the same datatype - case i @ In(value, list) - if i.inSetConvertible && list.map(_.dataType.asNullable).distinct.size == 1 => - val literals = list.map(_.asInstanceOf[Literal]) - val ordering = TypeUtils.getInterpretedOrdering(literals.head.dataType) - In(value, literals.sortBy(_.value)(ordering)) + case In(value, list) => + In(value, list.sortBy(_.semanticHash())) case _ => e } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b3427a203358d..f28e44a2493d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2281,10 +2281,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(h1 == h2) assert(h1 != h3) - val df2 = Seq(Array(1, 2)).toDF("id") - val arrays1 = df2.where($"id".isin(lit(Array(1, 2)), lit(Array(2, 1)))) - val arrays2 = df2.where($"id".isin(lit(Array(2, 1)), lit(Array(1, 2)))) - val arrays3 = df2.where($"id".isin(lit(Array(3, 2)), lit(Array(2, 1)))) + Seq(Array(1, 2)).toDF("id").createOrReplaceTempView("t") + val arrays1 = sql("select * from t where id in (array(1, 2), array(2, 1))") + val arrays2 = sql("select * from t where id in (array(2, 1), array(1, 2))") + val arrays3 = sql("select * from t where id in (array(1, 2), array(3, 1))") assert(arrays1.queryExecution.executedPlan.sameResult(arrays2.queryExecution.executedPlan)) assert(!arrays1.queryExecution.executedPlan.sameResult(arrays3.queryExecution.executedPlan)) val arraysHash1 = 
arrays1.queryExecution.logical.canonicalized.semanticHash() From 0c484f16a7bcb0f4b476decdf6820c4d89d4cfb0 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 22 May 2018 22:22:14 +0200 Subject: [PATCH 4/6] remove useless import --- .../apache/spark/sql/catalyst/expressions/Canonicalize.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index 7549d0de407b3..1b205f7dd31f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.util.TypeUtils - /** * Rewrites an expression using rules that are guaranteed preserve the result while attempting * to remove cosmetic variations. 
Deterministic expressions that are `equal` after canonicalization From a0af52524e30a9ace9d9a6239de79a7251a2499c Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 24 May 2018 11:17:51 +0200 Subject: [PATCH 5/6] address comments --- .../catalyst/expressions/Canonicalize.scala | 4 +- .../expressions/CanonicalizeSuite.scala | 53 +++++++++++++++++++ .../org/apache/spark/sql/DataFrameSuite.scala | 28 ---------- 3 files changed, 55 insertions(+), 30 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index 1b205f7dd31f2..966090069de4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -30,6 +30,7 @@ package org.apache.spark.sql.catalyst.expressions * by `hashCode`. * - [[EqualTo]] and [[EqualNullSafe]] are reordered by `hashCode`. * - Other comparisons ([[GreaterThan]], [[LessThan]]) are reversed by `hashCode`. + * - Elements in [[In]] are reordered by `hashCode`. 
*/ object Canonicalize { def execute(e: Expression): Expression = { @@ -86,8 +87,7 @@ object Canonicalize { case Not(LessThanOrEqual(l, r)) => GreaterThan(l, r) // order the list in the In operator - case In(value, list) => - In(value, list.sortBy(_.semanticHash())) + case In(value, list) => In(value, list.sortBy(_.hashCode())) case _ => e } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala new file mode 100644 index 0000000000000..28e6940f3cca3 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.logical.Range + +class CanonicalizeSuite extends SparkFunSuite { + + test("SPARK-24276: IN expression with different order are semantically equal") { + val range = Range(1, 1, 1, 1) + val idAttr = range.output.head + + val in1 = In(idAttr, Seq(Literal(1), Literal(2))) + val in2 = In(idAttr, Seq(Literal(2), Literal(1))) + val in3 = In(idAttr, Seq(Literal(1), Literal(2), Literal(3))) + + assert(in1.canonicalized.semanticHash() == in2.canonicalized.semanticHash()) + assert(in1.canonicalized.semanticHash() != in3.canonicalized.semanticHash()) + + assert(range.where(in1).sameResult(range.where(in2))) + assert(!range.where(in1).sameResult(range.where(in3))) + + val arrays1 = In(idAttr, Seq(CreateArray(Seq(Literal(1), Literal(2))), + CreateArray(Seq(Literal(2), Literal(1))))) + val arrays2 = In(idAttr, Seq(CreateArray(Seq(Literal(2), Literal(1))), + CreateArray(Seq(Literal(1), Literal(2))))) + val arrays3 = In(idAttr, Seq(CreateArray(Seq(Literal(1), Literal(2))), + CreateArray(Seq(Literal(3), Literal(1))))) + + assert(arrays1.canonicalized.semanticHash() == arrays2.canonicalized.semanticHash()) + assert(arrays1.canonicalized.semanticHash() != arrays3.canonicalized.semanticHash()) + + assert(range.where(arrays1).sameResult(range.where(arrays2))) + assert(!range.where(arrays1).sameResult(range.where(arrays3))) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 8d075389f3cb3..1cc8cb3874c9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2266,34 +2266,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(df, df.collect()) } - test("SPARK-24276: IN 
returns sameResult if the order of literals is different") { - val df = spark.range(1) - val p1 = df.where($"id".isin(1, 2)) - val p2 = df.where($"id".isin(2, 1)) - val p3 = df.where($"id".isin(1, 2, 3)) - - assert(p1.queryExecution.executedPlan.sameResult(p2.queryExecution.executedPlan)) - assert(!p1.queryExecution.executedPlan.sameResult(p3.queryExecution.executedPlan)) - - val h1 = p1.queryExecution.logical.canonicalized.semanticHash() - val h2 = p2.queryExecution.logical.canonicalized.semanticHash() - val h3 = p3.queryExecution.logical.canonicalized.semanticHash() - assert(h1 == h2) - assert(h1 != h3) - - Seq(Array(1, 2)).toDF("id").createOrReplaceTempView("t") - val arrays1 = sql("select * from t where id in (array(1, 2), array(2, 1))") - val arrays2 = sql("select * from t where id in (array(2, 1), array(1, 2))") - val arrays3 = sql("select * from t where id in (array(1, 2), array(3, 1))") - assert(arrays1.queryExecution.executedPlan.sameResult(arrays2.queryExecution.executedPlan)) - assert(!arrays1.queryExecution.executedPlan.sameResult(arrays3.queryExecution.executedPlan)) - val arraysHash1 = arrays1.queryExecution.logical.canonicalized.semanticHash() - val arraysHash2 = arrays2.queryExecution.logical.canonicalized.semanticHash() - val arraysHash3 = arrays3.queryExecution.logical.canonicalized.semanticHash() - assert(arraysHash1 == arraysHash2) - assert(arraysHash1 != arraysHash3) - } - test("SPARK-24313: access map with binary keys") { val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1)) checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1)) From 5c88d869cd4750d9c5e02789a075a29d96357eaa Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 25 May 2018 10:34:27 +0200 Subject: [PATCH 6/6] address comment --- .../apache/spark/sql/catalyst/expressions/Canonicalize.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index 966090069de4e..7541f527a52a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -87,7 +87,9 @@ object Canonicalize { case Not(LessThanOrEqual(l, r)) => GreaterThan(l, r) // order the list in the In operator - case In(value, list) => In(value, list.sortBy(_.hashCode())) + // In subqueries contain only one element, of type ListQuery. So, by checking that the length + // is greater than 1, we make sure we are not reordering In subqueries. + case In(value, list) if list.length > 1 => In(value, list.sortBy(_.hashCode())) case _ => e }