Skip to content

Commit ade6f5d

Browse files
author
Davies Liu
committed
bug fix
1 parent a38d623 commit ade6f5d

File tree

2 files changed

+26
-18
lines changed

2 files changed

+26
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ trait HashJoin {
5353
protected def streamSideKeyGenerator: Projection =
5454
UnsafeProjection.create(streamedKeys, streamedPlan.output)
5555

56-
@transient private[this] lazy val boundCondition =
56+
@transient private[this] lazy val boundCondition = if (condition.isDefined) {
5757
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
58+
} else {
59+
(r: InternalRow) => true
60+
}
5861

5962
protected def hashJoin(
6063
streamIter: Iterator[InternalRow],
@@ -74,28 +77,28 @@ trait HashJoin {
7477

7578
private[this] val joinKeys = streamSideKeyGenerator
7679

77-
hasNext // find the initial match
78-
7980
override final def hasNext: Boolean = {
80-
while (currentMatchPosition >= 0) {
81-
81+
while (true) {
8282
// check if it's end of current matches
83-
if (currentMatchPosition == currentHashMatches.length) {
83+
if (currentHashMatches != null && currentMatchPosition == currentHashMatches.length) {
8484
currentHashMatches = null
8585
currentMatchPosition = -1
86+
}
8687

87-
while (currentHashMatches == null && streamIter.hasNext) {
88-
currentStreamedRow = streamIter.next()
89-
numStreamRows += 1
90-
val key = joinKeys(currentStreamedRow)
91-
if (!key.anyNull) {
92-
currentHashMatches = hashedRelation.get(key)
88+
// find the next match
89+
while (currentHashMatches == null && streamIter.hasNext) {
90+
currentStreamedRow = streamIter.next()
91+
numStreamRows += 1
92+
val key = joinKeys(currentStreamedRow)
93+
if (!key.anyNull) {
94+
currentHashMatches = hashedRelation.get(key)
95+
if (currentHashMatches != null) {
96+
currentMatchPosition = 0
9397
}
9498
}
95-
if (currentHashMatches == null) {
96-
return false
97-
}
98-
currentMatchPosition = 0
99+
}
100+
if (currentHashMatches == null) {
101+
return false
99102
}
100103

101104
// found some matches
@@ -105,9 +108,11 @@ trait HashJoin {
105108
}
106109
if (boundCondition(joinRow)) {
107110
return true
111+
} else {
112+
currentMatchPosition += 1
108113
}
109114
}
110-
false
115+
false // unreachable
111116
}
112117

113118
override final def next(): InternalRow = {

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,11 @@ trait HashOuterJoin {
7878

7979
@transient private[this] lazy val leftNullRow = new GenericInternalRow(left.output.length)
8080
@transient private[this] lazy val rightNullRow = new GenericInternalRow(right.output.length)
81-
@transient private[this] lazy val boundCondition =
81+
@transient private[this] lazy val boundCondition = if (condition.isDefined) {
8282
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
83+
} else {
84+
(row: InternalRow) => true
85+
}
8386

8487
// TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
8588
// iterator for performance purpose.

0 commit comments

Comments (0)