Skip to content

Commit fdc0097

Browse files
committed
Add extra unsafe projection for the case projectList is None.
1 parent c343447 commit fdc0097

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,19 @@ case class TakeOrderedAndProject(
216216

217217
// TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
218218
@transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
219+
@transient private lazy val unsafeProjection =
220+
UnsafeProjection.create(child.output.map(_.dataType).toArray)
219221

220222
private def collectData(): Array[InternalRow] = {
221223
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
222224
if (projection.isDefined) {
223225
projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
224226
} else {
225-
data
227+
if (child.outputsUnsafeRows) {
228+
data
229+
} else {
230+
data.map(unsafeProjection(_).copy().asInstanceOf[InternalRow])
231+
}
226232
}
227233
}
228234

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
571571
mapData.collect().take(1).map(Row.fromTuple).toSeq)
572572
}
573573

574+
test("sort and limit") {
575+
checkAnswer(
576+
sql("SELECT * FROM arrayData ORDER BY data[0] ASC LIMIT 1"),
577+
arrayData.collect().sortBy(_.data(0)).map(Row.fromTuple).take(1).toSeq)
578+
}
579+
574580
test("CTE feature") {
575581
checkAnswer(
576582
sql("with q1 as (select * from testData limit 10) select * from q1"),

0 commit comments

Comments
 (0)