Skip to content

Commit f21cf43

Browse files
committed
follow comment
1 parent 219f200 commit f21cf43

File tree

4 files changed

+36
-16
lines changed

4 files changed

+36
-16
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,32 @@ trait PredicateHelper extends Logging {
251251
resultStack.top
252252
}
253253

254+
/**
255+
* Convert an expression to conjunctive normal form when pushing predicates through Join,
256+
* when expand predicates, we can group by the qualifier avoiding generate unnecessary
257+
* expression to control the length of final result since there are multiple tables.
258+
* @param condition condition need to be convert
259+
* @return expression seq in conjunctive normal form of input expression, if length exceeds
260+
* the threshold [[SQLConf.MAX_CNF_NODE_COUNT]] or length != 1, return empty Seq
261+
*/
254262
def conjunctiveNormalFormAndGroupExpsByQualifier(condition: Expression): Seq[Expression] = {
255263
conjunctiveNormalForm(condition,
256264
(expressions: Seq[Expression]) =>
257265
expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq)
258266
}
259267

268+
/**
269+
* Convert an expression to conjunctive normal form when pushing predicates for partition pruning,
270+
* when expand predicates, we can group by the reference avoiding generate unnecessary expression
271+
* to control the length of final result since here we just have one table. In partition pruning
272+
* strategies, we split filters by [[splitConjunctivePredicates]] and partition filters by judging
273+
* if it's references is subset of partCols, if we combine expressions group by reference when
274+
* expand predicate of [[Or]], it won't impact final predicate pruning result since
275+
* [[splitConjunctivePredicates]] won't split [[Or]] expression.
276+
* @param condition condition need to be convert
277+
* @return expression seq in conjunctive normal form of input expression, if length exceeds
278+
* the threshold [[SQLConf.MAX_CNF_NODE_COUNT]] or length != 1, return empty Seq
279+
*/
260280
def conjunctiveNormalFormAndGroupExpsByReference(condition: Expression): Seq[Expression] = {
261281
conjunctiveNormalForm(condition,
262282
(expressions: Seq[Expression]) =>

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StructType
3434

3535
class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase {
3636

37-
convert = "true"
37+
override def format: String = "parquet"
3838

3939
object Optimize extends RuleExecutor[LogicalPlan] {
4040
val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
2323

2424
class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
2525

26-
convert = "false"
26+
override def format(): String = "hive"
2727

2828
object Optimize extends RuleExecutor[LogicalPlan] {
2929
val batches =

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,44 +24,44 @@ import org.apache.spark.sql.test.SQLTestUtils
2424

2525
abstract class PrunePartitionSuiteBase extends QueryTest with SQLTestUtils with TestHiveSingleton {
2626

27-
var convert: String = _
27+
protected def format: String
2828

2929
test("SPARK-28169: Convert scan predicate condition to CNF") {
30-
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convert,
31-
HiveUtils.CONVERT_METASTORE_ORC.key -> convert) {
32-
withTable("t", "temp") {
30+
withTempView("temp") {
31+
withTable("t") {
3332
sql(
3433
s"""
35-
|CREATE TABLE t(i int)
36-
|PARTITIONED BY (p int)
37-
|STORED AS PARQUET""".stripMargin)
34+
|CREATE TABLE t(i INT, p STRING)
35+
|USING $format
36+
|PARTITIONED BY (p)""".stripMargin)
37+
3838
spark.range(0, 1000, 1).selectExpr("id as col")
3939
.createOrReplaceTempView("temp")
4040

4141
for (part <- Seq(1, 2, 3, 4)) {
4242
sql(
4343
s"""
4444
|INSERT OVERWRITE TABLE t PARTITION (p='$part')
45-
|select col from temp""".stripMargin)
45+
|SELECT col FROM temp""".stripMargin)
4646
}
4747

4848
assertPrunedPartitions(
4949
"SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)", 2)
5050
assertPrunedPartitions(
51-
"SELECT * FROM t WHERE (p = '1' and i = 2) or (i = 1 or p = '2')", 4)
51+
"SELECT * FROM t WHERE (p = '1' AND i = 2) OR (i = 1 OR p = '2')", 4)
5252
assertPrunedPartitions(
53-
"SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '3' and i = 3 )", 2)
53+
"SELECT * FROM t WHERE (p = '1' AND i = 2) OR (p = '3' AND i = 3 )", 2)
5454
assertPrunedPartitions(
55-
"SELECT * FROM t WHERE (p = '1' and i = 2) or (p = '2' or p = '3')", 3)
55+
"SELECT * FROM t WHERE (p = '1' AND i = 2) OR (p = '2' OR p = '3')", 3)
5656
assertPrunedPartitions(
5757
"SELECT * FROM t", 4)
5858
assertPrunedPartitions(
59-
"SELECT * FROM t where p = '1' and i = 2", 1)
59+
"SELECT * FROM t WHERE p = '1' AND i = 2", 1)
6060
assertPrunedPartitions(
6161
"""
6262
|SELECT i, COUNT(1) FROM (
63-
|SELECT * FROM t where p = '1' OR (p = '2' AND i = 1)
64-
|) TMP GROUP BY i
63+
|SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)
64+
|) tmp GROUP BY i
6565
""".stripMargin, 2)
6666
}
6767
}

0 commit comments

Comments
 (0)