Skip to content

Commit 77106df

Browse files
Yanbo Liangmarmbrus
authored andcommitted
SPARK-4963 [SQL] Add copy to SQL's Sample operator
https://issues.apache.org/jira/browse/SPARK-4963 SchemaRDD.sample() return wrong results due to GapSamplingIterator operating on mutable row. HiveTableScan make RDD with SpecificMutableRow and SchemaRDD.sample() will return GapSamplingIterator for iterating. override def next(): T = { val r = data.next() advance r } GapSamplingIterator.next() return the current underlying element and assigned it to r. However if the underlying iterator is mutable row just like what HiveTableScan returned, underlying iterator and r will point to the same object. After advance operation, we drop some underlying elments and it also changed r which is not expected. Then we return the wrong value different from initial r. To fix this issue, the most direct way is to make HiveTableScan return mutable row with copy just like the initial commit that I have made. This solution will make HiveTableScan can not get the full advantage of reusable MutableRow, but it can make sample operation return correct result. Further more, we need to investigate GapSamplingIterator.next() and make it can implement copy operation inside it. To achieve this, we should define every elements that RDD can store implement the function like cloneable and it will make huge change. Author: Yanbo Liang <[email protected]> Closes #3827 from yanbohappy/spark-4963 and squashes the following commits: 0912ca0 [Yanbo Liang] code format keep 65c4e7c [Yanbo Liang] import file and clear annotation 55c7c56 [Yanbo Liang] better output of test case cea7e2e [Yanbo Liang] SchemaRDD add copy operation before Sample operator e840829 [Yanbo Liang] HiveTableScan return mutable row with copy
1 parent b3e86dc commit 77106df

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child:
7070
override def output = child.output
7171

7272
// TODO: How to pick seed?
73-
override def execute() = child.execute().sample(withReplacement, fraction, seed)
73+
override def execute() = child.execute().map(_.copy()).sample(withReplacement, fraction, seed)
7474
}
7575

7676
/**

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.apache.spark.sql.QueryTest
2121

2222
import org.apache.spark.sql.Row
2323
import org.apache.spark.sql.hive.test.TestHive._
24+
import org.apache.spark.util.Utils
2425

2526
case class Nested1(f1: Nested2)
2627
case class Nested2(f2: Nested3)
@@ -202,4 +203,15 @@ class SQLQuerySuite extends QueryTest {
202203
checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
203204
sql("SELECT distinct key FROM src order by key").collect().toSeq)
204205
}
206+
207+
test("SPARK-4963 SchemaRDD sample on mutable row return wrong result") {
208+
sql("SELECT * FROM src WHERE key % 2 = 0")
209+
.sample(withReplacement = false, fraction = 0.3)
210+
.registerTempTable("sampled")
211+
(1 to 10).foreach { i =>
212+
checkAnswer(
213+
sql("SELECT * FROM sampled WHERE key % 2 = 1"),
214+
Seq.empty[Row])
215+
}
216+
}
205217
}

0 commit comments

Comments
 (0)