
Commit 219f200

follow comment
1 parent 270324e commit 219f200

3 files changed: +85 -106 lines

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala

Lines changed: 5 additions & 50 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
 
 import org.scalatest.Matchers._
 
-import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -30,12 +29,12 @@ import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRel
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions.broadcast
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 
-class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase {
+
+  convert = "true"
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
@@ -110,53 +109,9 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
     }
   }
 
-  test("SPARK-28169: Convert scan predicate condition to CNF") {
-    withTable("t", "temp") {
-      sql(
-        s"""
-           |CREATE TABLE t(i int, p string)
-           |USING PARQUET
-           |PARTITIONED BY (p)
-           |""".stripMargin)
-      spark.range(0, 1000, 1).selectExpr("id as col")
-        .createOrReplaceTempView("temp")
-
-      for (part <- Seq(1, 2, 3, 4)) {
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE t PARTITION (p='$part')
-             |select col from temp""".stripMargin)
-      }
-
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)", 2)
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE (p = '1' and i = 2) or (i = 1 or p = '2')", 4)
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '3' and i = 3 )", 2)
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '2' or p = '3')", 3)
-      assertPrunedPartitions(
-        "SELECT * FROM t", 4)
-      assertPrunedPartitions(
-        "SELECT * FROM t where p = '1' and i = 2", 1)
-      assertPrunedPartitions(
-        """
-          |SELECT i, COUNT(1) FROM (
-          |SELECT * FROM t where p = '1' OR (p = '2' AND i = 1)
-          |) TMP GROUP BY i
-        """.stripMargin, 2)
-    }
-  }
-
-  private def assertPrunedPartitions(query: String, expected: Long): Unit = {
-    val prunedPartitions = getFileScanExec(query).relation.location.inputFiles.length
-    assert(prunedPartitions == expected)
-  }
-
-  private def getFileScanExec(query: String): FileSourceScanExec = {
+  override def getScanExecPartitionSize(query: String): Long = {
     sql(query).queryExecution.sparkPlan.collectFirst {
       case p: FileSourceScanExec => p
-    }.get
+    }.get.relation.location.inputFiles.length
   }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala

Lines changed: 5 additions & 56 deletions
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
 
-class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
+
+  convert = "false"
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -55,59 +54,9 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("SPARK-28169: Convert scan predicate condition to CNF") {
-    withTable("t", "temp") {
-      sql(
-        s"""
-           |CREATE TABLE t(i int)
-           |PARTITIONED BY (p int)
-           |STORED AS textfile""".stripMargin)
-      spark.range(0, 1000, 1).selectExpr("id as col")
-        .createOrReplaceTempView("temp")
-
-      for (part <- Seq(1, 2, 3, 4)) {
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE t PARTITION (p='$part')
-             |select col from temp""".stripMargin)
-      }
-
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)",
-        Array("t(p=1)", "t(p=2)"))
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE (p = '1' and i = 2) or (i = 1 or p = '2')",
-        Array("t(p=1)", "t(p=2)", "t(p=3)", "t(p=4)"))
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '3' and i = 3 )",
-        Array("t(p=1)", "t(p=3)"))
-      assertPrunedPartitions(
-        "SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '2' or p = '3')",
-        Array("t(p=1)", "t(p=2)", "t(p=3)"))
-      assertPrunedPartitions(
-        "SELECT * FROM t",
-        Array("t(p=1)", "t(p=2)", "t(p=3)", "t(p=4)"))
-      assertPrunedPartitions(
-        "SELECT * FROM t where p = '1' and i = 2",
-        Array("t(p=1)"))
-      assertPrunedPartitions(
-        """
-          |SELECT i, COUNT(1) FROM (
-          |SELECT * FROM t where p = '1' OR (p = '2' AND i = 1)
-          |) TMP GROUP BY i
-        """.stripMargin,
-        Array("t(p=1)", "t(p=2)"))
-    }
-  }
-
-  private def assertPrunedPartitions(query: String, expected: Array[String]): Unit = {
-    val prunedPartitions = getHiveTableScanExec(query).prunedPartitions.map(_.toString).toArray
-    assert(prunedPartitions.sameElements(expected))
-  }
-
-  private def getHiveTableScanExec(query: String): HiveTableScanExec = {
+  override def getScanExecPartitionSize(query: String): Long = {
     sql(query).queryExecution.sparkPlan.collectFirst {
       case p: HiveTableScanExec => p
-    }.get
+    }.get.prunedPartitions.size
   }
 }
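Aside: both suites exercise SPARK-28169, which converts scan predicates to conjunctive normal form (CNF) so that partition-only conjuncts can be pushed into partition pruning. As a worked example of the first assertion (standard boolean distribution, shown here for context rather than taken from the commit), the predicate rewrites as:

    p = '1' OR (p = '2' AND i = 1)
      ==>  (p = '1' OR p = '2') AND (p = '1' OR i = 1)

The first conjunct references only the partition column p, so pruning keeps exactly the partitions p=1 and p=2, matching the expected count of 2 (Array("t(p=1)", "t(p=2)") in the Hive-scan variant).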
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala

Lines changed: 75 additions & 0 deletions

@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+abstract class PrunePartitionSuiteBase extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  var convert: String = _
+
+  test("SPARK-28169: Convert scan predicate condition to CNF") {
+    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convert,
+      HiveUtils.CONVERT_METASTORE_ORC.key -> convert) {
+      withTable("t", "temp") {
+        sql(
+          s"""
+             |CREATE TABLE t(i int)
+             |PARTITIONED BY (p int)
+             |STORED AS PARQUET""".stripMargin)
+        spark.range(0, 1000, 1).selectExpr("id as col")
+          .createOrReplaceTempView("temp")
+
+        for (part <- Seq(1, 2, 3, 4)) {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE t PARTITION (p='$part')
+               |select col from temp""".stripMargin)
+        }
+
+        assertPrunedPartitions(
+          "SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)", 2)
+        assertPrunedPartitions(
+          "SELECT * FROM t WHERE (p = '1' and i = 2) or (i = 1 or p = '2')", 4)
+        assertPrunedPartitions(
+          "SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '3' and i = 3 )", 2)
+        assertPrunedPartitions(
+          "SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '2' or p = '3')", 3)
+        assertPrunedPartitions(
+          "SELECT * FROM t", 4)
+        assertPrunedPartitions(
+          "SELECT * FROM t where p = '1' and i = 2", 1)
+        assertPrunedPartitions(
+          """
+            |SELECT i, COUNT(1) FROM (
+            |SELECT * FROM t where p = '1' OR (p = '2' AND i = 1)
+            |) TMP GROUP BY i
+          """.stripMargin, 2)
+      }
+    }
+  }
+
+  protected def assertPrunedPartitions(query: String, expected: Long): Unit = {
+    assert(getScanExecPartitionSize(query) == expected)
+  }
+
+  protected def getScanExecPartitionSize(query: String): Long
+}
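The base class is a small template: convert toggles the metastore conversion configs, and each concrete suite supplies only getScanExecPartitionSize for its scan node. A minimal sketch of how a further suite would plug in (the suite name is hypothetical; the body mirrors PruneFileSourcePartitionsSuite above):

    import org.apache.spark.sql.execution.FileSourceScanExec

    // Hypothetical example suite: runs the shared CNF test through the
    // converted (data source) read path.
    class ExamplePrunePartitionsSuite extends PrunePartitionSuiteBase {

      convert = "true"

      // These tests write one file per partition, so the input files that
      // survive pruning correspond to the remaining partitions.
      override def getScanExecPartitionSize(query: String): Long = {
        sql(query).queryExecution.sparkPlan.collectFirst {
          case p: FileSourceScanExec => p
        }.get.relation.location.inputFiles.length
      }
    }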
