@@ -162,6 +162,10 @@ case class Limit(limit: Int, child: SparkPlan)
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition

override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true

override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

protected override def doExecute(): RDD[InternalRow] = {
@@ -200,18 +204,31 @@ case class TakeOrderedAndProject(
projectOutput.getOrElse(child.output)
}

override def outputsUnsafeRows: Boolean = if (projectList.isDefined) {
Contributor: Would it be easy to just process UnsafeRow and output UnsafeRow?

Member Author: So I need to close this again...

true
} else {
child.outputsUnsafeRows
}

override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
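
To make the thread above concrete: these three flags are the contract the planner consults when deciding whether to insert a row-format converter between a child and its parent. A minimal REPL-style sketch of that decision, with illustrative names (FormatNode, needsConversion) that are not Spark API:

// Illustrative model of the row-format contract; not Spark code.
case class FormatNode(
    outputsUnsafeRows: Boolean,
    canProcessUnsafeRows: Boolean,
    canProcessSafeRows: Boolean)

// A converter is needed when the child's output format is one the
// parent cannot consume.
def needsConversion(child: FormatNode, parent: FormatNode): Boolean =
  if (child.outputsUnsafeRows) !parent.canProcessUnsafeRows
  else !parent.canProcessSafeRows

// After this patch TakeOrderedAndProject accepts both formats, so it
// never forces a conversion below itself, whatever the child emits.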

override def outputPartitioning: Partitioning = SinglePartition

// We need to use an interpreted ordering here because generated orderings cannot be serialized
// and this ordering needs to be created on the driver in order to be passed into Spark core code.
private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)

// TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
@transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
@transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
Member: Can InterpretedProjection be replaced by UnsafeProjection?

Member Author: I think it is OK.

Member: Sorry, I still have a dumb question. When calling eval on each of the specified expressions, how do we know they can process unsafe rows? Why did the planner insert an unsafe->safe conversion in the original design of TakeOrderedAndProject?

Member Author: I think it is just because not all expressions supported unsafe rows before.

Member: So now all expressions support unsafe rows?

Member Author: I think so. If there are expressions that still don't support unsafe rows, we should make them support it.

Member: Got it. Thank you!
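
To ground the exchange above: the planner pass the reviewers refer to is the preparation rule (EnsureRowFormats in Spark 1.5's RowFormatConverters.scala) that wraps any child whose output format its parent cannot consume. A rough sketch of that shape, not the exact rule body:

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ConvertToSafe, ConvertToUnsafe, SparkPlan}

// Sketch only: wrap each child whose row format the operator cannot consume.
object EnsureRowFormatsSketch extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case operator: SparkPlan =>
      operator.withNewChildren(operator.children.map { child =>
        if (child.outputsUnsafeRows && !operator.canProcessUnsafeRows) {
          ConvertToSafe(child)        // unsafe -> safe conversion
        } else if (!child.outputsUnsafeRows && !operator.canProcessSafeRows) {
          ConvertToUnsafe(child)      // safe -> unsafe conversion
        } else {
          child                       // formats already compatible
        }
      })
  }
}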


private def collectData(): Array[InternalRow] = {
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
projection.map(data.map(_)).getOrElse(data)
if (projection.isDefined) {
projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
Contributor: Do we need the copy here? We have already copied the rows when getting the data.

Member Author: OK. I was thinking the returned row needs to be copied because it is the same object, but after checking GenerateUnsafeProjection, it looks like it creates a new row every time. I've updated it.

Member Author: It seems there is a problem without this copy(): HiveCompatibilitySuite fails.

[info]   key    value
[info]   !== HIVE - 5 row(s) ==   == CATALYST - 5 row(s) ==
[info]   !0 val_0                 4 val_4
[info]   !0 val_0                 4 val_4
[info]   !0 val_0                 4 val_4
[info]   !2 val_2                 4 val_4
[info]    4 val_4                 4 val_4

Member Author: I re-checked GenerateUnsafeProjection: it returns the same unsafe row every time, so we should use another copy() here.

Contributor: Oh sorry, I misread the code; the copy is needed even though we already copied before takeOrdered.

} else {
data
}
}
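
Both the failure and the final fix come down to one property: a generated unsafe projection mutates and returns a single shared result row on every call, so mapping the sorted array through it without copy() leaves every element aliasing the last row's values, which is exactly the all-"4 val_4" output above. A self-contained REPL-style illustration, with a plain array standing in for the shared UnsafeRow:

// Models a projection that reuses one mutable result object per call,
// the way generated unsafe projections do.
class ReusingProjection {
  private val shared = new Array[Int](1)
  def apply(value: Int): Array[Int] = { shared(0) = value; shared }
}

val p = new ReusingProjection
val data = Seq(0, 0, 0, 2, 4)

val aliased = data.map(p(_))          // every element is the same buffer
val copied  = data.map(p(_).clone())  // one copy per row, like p(row).copy()

aliased.map(_(0))  // Seq(4, 4, 4, 4, 4) -- the corrupted result
copied.map(_(0))   // Seq(0, 0, 0, 2, 4) -- the expected result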

override def executeCollect(): Array[InternalRow] = {

@@ -26,6 +26,15 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{ArrayType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class DummySafeNode(limit: Int, child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override def canProcessUnsafeRows: Boolean = false
override def canProcessSafeRows: Boolean = true

override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
protected override def doExecute(): RDD[InternalRow] = child.execute()
}

class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {

private def getConverters(plan: SparkPlan): Seq[SparkPlan] = plan.collect {
@@ -39,7 +48,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
assert(outputsUnsafe.outputsUnsafeRows)

test("planner should insert unsafe->safe conversions when required") {
val plan = Limit(10, outputsUnsafe)
val plan = DummySafeNode(10, outputsUnsafe)
val preparedPlan = sqlContext.prepareForExecution.execute(plan)
assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
}
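
As a complement to the conversion test, since Limit now declares that it can process both row formats, a hypothetical companion test (not part of this diff, reusing the suite's getConverters helper) could assert that no converter is inserted above an unsafe child:

test("planner should not insert conversions when the operator accepts both formats") {
  val plan = Limit(10, outputsUnsafe)
  val preparedPlan = sqlContext.prepareForExecution.execute(plan)
  assert(getConverters(preparedPlan).isEmpty)
}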