Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
579b5a2
Add equivalentConditions()
codingjaguar Dec 5, 2015
0a09698
set comparison for projection
windscope Dec 5, 2015
85f1ebc
Fix set conversion bug
windscope Dec 5, 2015
1a2b534
Remove set comparison of projection
windscope Dec 5, 2015
5fcb85c
Add test case for filter condition order
windscope Dec 5, 2015
8f93c6a
Fix style error
windscope Dec 5, 2015
6eb6fdd
add testcase for SameResultSuite
windscope Dec 6, 2015
02fc878
add testcase for OR split filter condition
windscope Dec 6, 2015
bcb6df0
Supported expressions with disjunctive predicates;
codingjaguar Dec 6, 2015
360bb2b
Merge branch 'jiang.filter-set' of github.com:codingjaguar/spark into…
windscope Dec 6, 2015
94837d6
Merge branch 'jiang.filter-set'
codingjaguar Dec 6, 2015
0de3d7e
Merge branch 'jiang.filter-set' of github.com:codingjaguar/spark into…
windscope Dec 6, 2015
13ce03f
Removed dead code
codingjaguar Dec 6, 2015
9f6df41
Merge branch 'jiang.filter-set' of github.com:codingjaguar/spark into…
codingjaguar Dec 6, 2015
e63df88
Merge branch 'jiang.filter-set'
codingjaguar Dec 6, 2015
9043afd
Refactor change. Move equivalentConditions to PredicateHelper
codingjaguar Dec 7, 2015
da46b1c
Merge branch 'jiang.filter-set'
codingjaguar Dec 7, 2015
ba29f0b
Fix style issue
windscope Dec 8, 2015
c81aa46
Merge branch 'jiang.filter-set'
windscope Dec 8, 2015
cc8fdfe
Merge branch 'master' of https://github.com/apache/spark
codingjaguar Dec 8, 2015
b8ba919
Integrate the functionality of equivalentPredicate with semanticEquals
codingjaguar Dec 9, 2015
a0e8c4a
Refactor sameResult to utilize semanticEquals
codingjaguar Dec 9, 2015
07128b3
Merge branch 'jiang.filter-set'
codingjaguar Dec 9, 2015
4af3622
Rewrite semanticEquals in And and Or to ignore their ordering.
codingjaguar Dec 9, 2015
99626a4
Rewrite the code block that compares the equivalency of
codingjaguar Dec 9, 2015
2efca2f
Fix a bug in semanticEqual. Add some comment.
codingjaguar Dec 9, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
* the same output data type.
*
*/
abstract class Expression extends TreeNode[Expression] {
abstract class Expression extends TreeNode[Expression]{

/**
* Returns true when an expression is a candidate for static evaluation before the query is
Expand Down Expand Up @@ -139,26 +139,39 @@ abstract class Expression extends TreeNode[Expression] {
*/
def childrenResolved: Boolean = children.forall(_.resolved)

def checkSemantic(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
elements1.length == elements2.length && elements1.zip(elements2).forall {
case (e1: Expression, e2: Expression) => e1 semanticEquals e2
case (Some(e1: Expression), Some(e2: Expression)) => e1 semanticEquals e2
case (t1: Traversable[_], t2: Traversable[_]) => checkSemantic(t1.toSeq, t2.toSeq)
case (i1, i2) => i1 == i2
}
}

/**
* Returns true when two expressions will always compute the same result, even if they differ
* cosmetically (i.e. capitalization of names in attributes may be different).
*/
def semanticEquals(other: Expression): Boolean = this.getClass == other.getClass && {
def checkSemantic(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
elements1.length == elements2.length && elements1.zip(elements2).forall {
case (e1: Expression, e2: Expression) => e1 semanticEquals e2
case (Some(e1: Expression), Some(e2: Expression)) => e1 semanticEquals e2
case (t1: Traversable[_], t2: Traversable[_]) => checkSemantic(t1.toSeq, t2.toSeq)
case (i1, i2) => i1 == i2
}
}
// Non-deterministic expressions cannot be semantic equal
if (!deterministic || !other.deterministic) return false
val elements1 = this.productIterator.toSeq
val elements2 = other.asInstanceOf[Product].productIterator.toSeq
checkSemantic(elements1, elements2)
}

/**
* Returns a sequence of expressions by removing from q the first expression that is semantically
* equivalent to e. If such an expression was not found, return seq.
*/
def removeFirstSemanticEquivalent(seq: Seq[Expression], e: Expression): Seq[Expression] = {
seq match {
case Seq() => Seq()
case x +: rest if x semanticEquals e => rest
case x +: rest => x +: removeFirstSemanticEquivalent(rest, e)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't clarify it clearly. I mean we can override semanticEquals in concrete expressions like Or, And, etc. And we don't need to support all commutative operators at once, you can only finish the predicates parts in this PR and open follow-up PRs for other parts(like Add, Multiply). Let's do it step-by-step :)

}

/**
* Returns the hash for this expression. Expressions that compute the same result, even if
* they differ cosmetically should return the same hash.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
}
}

case class And(left: Expression, right: Expression) extends BinaryOperator with Predicate {
case class And(left: Expression, right: Expression) extends BinaryOperator
with Predicate with PredicateHelper{

override def inputType: AbstractDataType = BooleanType

Expand All @@ -252,6 +253,27 @@ case class And(left: Expression, right: Expression) extends BinaryOperator with
}
}

override def semanticEquals(other: Expression): Boolean = this.getClass == other.getClass && {
// Non-deterministic expressions cannot be semantic equal
if (!deterministic || !other.deterministic) return false

// We already know both expressions are And, so we can tolerate ordering different
// Recursively call semanticEquals on subexpressions to check the equivalency of two seqs.
var elements1 = splitConjunctivePredicates(this)
val elements2 = splitConjunctivePredicates(other)
// We can recursively call semanticEquals to check the equivalency for subexpressions, but
// there is no simple solution to compare the equivalency of sequence of expressions.
// Expression class doesn't have order, so we couldn't sort them. We can neither use
// set comparison as Set doesn't support custom compare function, which is semanticEquals.
// To check the equivalency of elements1 and elements2, we first compare their size. Then
// for each element in elements2, we remove its first semantically equivalent expression from
// elements1. If they are semantically equivalent, elements1 should be empty at the end.
elements1.size == elements2.size && {
for (e <- elements2) elements1 = removeFirstSemanticEquivalent(elements1, e)
elements1.isEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I may missed something here, can we just write:

override def semanticEquals(other: Expression): Boolean = other match {
  case And(otherLeft, otherRight) =>
    (left.semanticEquals(otherLeft) && right.semanticEquals(otherRight)) ||
    (left.semanticEquals(otherRight) && right.semanticEquals(otherLeft))
  case _ => false
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider this example
e1 = And(a, And(b, c))
e2 = And(And(a,b), c))
They are semantically equivalent, but will return false in your code.
splitConjunctivePredicates will crunch the expression tree into a sequence of (a, b, c).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see, this makes sense.
But I think a better way is to add an optimization rule to turn all predicates into CNF, before we begin to check the semantic, or it will be hard to cover all cases like a || (b && c) == (a || b) && (a || c)

cc @liancheng

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems there is an open PR that implements CNF normalization. Is there any reason why it hasn't been merged?
#8200

cc @yjshen

}
}

override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
val eval1 = left.gen(ctx)
val eval2 = right.gen(ctx)
Expand All @@ -277,7 +299,8 @@ case class And(left: Expression, right: Expression) extends BinaryOperator with
}


case class Or(left: Expression, right: Expression) extends BinaryOperator with Predicate {
case class Or(left: Expression, right: Expression) extends BinaryOperator
with Predicate with PredicateHelper {

override def inputType: AbstractDataType = BooleanType

Expand All @@ -301,6 +324,26 @@ case class Or(left: Expression, right: Expression) extends BinaryOperator with P
}
}

override def semanticEquals(other: Expression): Boolean = this.getClass == other.getClass && {
// Non-deterministic expressions cannot be semantic equal
if (!deterministic || !other.deterministic) return false

// We know both expressions are Or, so we can tolerate ordering different
var elements1 = splitDisjunctivePredicates(this)
val elements2 = splitDisjunctivePredicates(other)
// We can recursively call semanticEquals to check the equivalency for subexpressions, but
// there is no simple solution to compare the equivalency of sequence of expressions.
// Expression class doesn't have order, so we couldn't sort them. We can neither use
// set comparison as Set doesn't support custom compare function, which is semanticEquals.
// To check the equivalency of elements1 and elements2, we first compare their size. Then
// for each element in elements2, we remove its first semantically equivalent expression from
// elements1. If they are semantically equivalent, elements1 should be empty at the end.
elements1.size == elements2.size && {
for (e <- elements2) elements1 = removeFirstSemanticEquivalent(elements1, e)
elements1.isEmpty
}
}

override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
val eval1 = left.gen(ctx)
val eval2 = right.gen(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,33 +127,38 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
cleanLeft.children.size == cleanRight.children.size && {
logDebug(
s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
cleanRight.cleanArgs == cleanLeft.cleanArgs
} &&
(cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
cleanLeft.cleanArgs.zip(cleanRight.cleanArgs).forall {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we just change this to:

cleanRight.zip(cleanArgs).forall {
  case (e1: Expression, e2: Expression) => e1 semanticEquals e2 
  caes (a1, a2) => a1 == a2
}

then we can just improve Expression.sentaicEquals

case (e1: Expression, e2: Expression) => e1 semanticEquals e2
case (a1, a2) => a1 == a2
}
} && (cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
}

/** Clean an expression so that differences in expression id should not affect equality */
def cleanExpression(e: Expression, input: Seq[Attribute]): Expression = e match {
case a: Alias =>
// As the root of the expression, Alias will always take an arbitrary exprId, we need
// to erase that for equality testing.
val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
case other => BindReferences.bindReference(other, input, allowFailures = true)
}


/** Args that have cleaned such that differences in expression id should not affect equality */
protected lazy val cleanArgs: Seq[Any] = {
val input = children.flatMap(_.output)
def cleanExpression(e: Expression) = e match {
case a: Alias =>
// As the root of the expression, Alias will always take an arbitrary exprId, we need
// to erase that for equality testing.
val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
case other => BindReferences.bindReference(other, input, allowFailures = true)
}

productIterator.map {
// Children are checked using sameResult above.
case tn: TreeNode[_] if containsChild(tn) => null
case e: Expression => cleanExpression(e)
case e: Expression => cleanExpression(e, input)
case s: Option[_] => s.map {
case e: Expression => cleanExpression(e)
case e: Expression => cleanExpression(e, input)
case other => other
}
case s: Seq[_] => s.map {
case e: Expression => cleanExpression(e)
case e: Expression => cleanExpression(e, input)
case other => other
}
case other => other
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.apache.spark.sql.catalyst.util._
* Tests for the sameResult function of [[LogicalPlan]].
*/
class SameResultSuite extends SparkFunSuite {
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation2 = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.int)
val testRelation2 = LocalRelation('a.int, 'b.int, 'c.int, 'd.int)

def assertSameResult(a: LogicalPlan, b: LogicalPlan, result: Boolean = true): Unit = {
val aAnalyzed = a.analyze
Expand Down Expand Up @@ -57,6 +57,24 @@ class SameResultSuite extends SparkFunSuite {

test("filters") {
assertSameResult(testRelation.where('a === 'b), testRelation2.where('a === 'b))
assertSameResult(testRelation.where('a === 'b && 'c === 'd),
testRelation2.where('c === 'd && 'a === 'b )
)
assertSameResult(testRelation.where('a === 'b || 'c === 'd),
testRelation2.where('c === 'd || 'a === 'b )
)
assertSameResult(testRelation.where(('a === 'b || 'c === 'd) && ('e === 'f || 'g === 'h)),
testRelation2.where(('g === 'h || 'e === 'f) && ('c === 'd || 'a === 'b ))
)

assertSameResult(testRelation.where('a === 'b && 'c === 'd),
testRelation2.where('a === 'c && 'b === 'd),
result = false
)
assertSameResult(testRelation.where('a === 'b || 'c === 'd),
testRelation2.where('a === 'c || 'b === 'd),
result = false
)
}

test("sorts") {
Expand Down