@@ -1415,7 +1415,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def collect(): Array[Row] = withNewExecutionId {
-    queryExecution.executedPlan.executeCollect()
+    queryExecution.executedPlan.executeCollectPublic()
   }

   /**
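A note for context, not part of the diff: after this change, DataFrame.collect() still returns external Rows (now via the new executeCollectPublic), while calling into the physical plan directly yields Catalyst InternalRows. The sketch below is hypothetical and assumes queryExecution and executedPlan are visible to the caller.

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.catalyst.InternalRow

    // Illustrative helper only: contrasts the two result formats after this patch.
    object CollectFormatsSketch {
      def describe(df: DataFrame): String = {
        // Public API: external Rows, converted by executeCollectPublic().
        val publicRows: Array[Row] = df.collect()
        // Physical-plan API: unconverted Catalyst InternalRows (assumes queryExecution is accessible).
        val internalRows: Array[InternalRow] = df.queryExecution.executedPlan.executeCollect()
        s"${publicRows.length} external rows, ${internalRows.length} internal rows"
      }
    }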
@@ -34,13 +34,11 @@ private[sql] case class LocalTableScan(

   protected override def doExecute(): RDD[InternalRow] = rdd

-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).toArray
+  override def executeCollect(): Array[InternalRow] = {
+    rows.toArray
   }

-  override def executeTake(limit: Int): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = {
+    rows.take(limit).toArray
   }
 }
@@ -170,21 +170,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = {
-    execute().mapPartitions { iter =>
-      val converter = CatalystTypeConverters.createToScalaConverter(schema)
-      iter.map(converter(_).asInstanceOf[Row])
-    }.collect()
+  def executeCollect(): Array[InternalRow] = {
+    execute().map(_.copy()).collect()
   }

+  /**
+   * Runs this query returning the result as an array, using external Row format.
+   */
+  def executeCollectPublic(): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    executeCollect().map(converter(_).asInstanceOf[Row])
+  }
+
   /**
    * Runs this query returning the first `n` rows as an array.
    *
    * This is modeled after RDD.take but never runs any job locally on the driver.
    */
-  def executeTake(n: Int): Array[Row] = {
+  def executeTake(n: Int): Array[InternalRow] = {
     if (n == 0) {
-      return new Array[Row](0)
+      return new Array[InternalRow](0)
     }

     val childRDD = execute().map(_.copy())
@@ -218,8 +223,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       partsScanned += numPartsToTry
     }

-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    buf.toArray.map(converter(_).asInstanceOf[Row])
+    buf.toArray
   }

   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
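Not part of the patch, for illustration only: a minimal sketch of the conversion boundary that executeCollectPublic establishes, using the same CatalystTypeConverters pattern as the diff. The object and method names are invented, and the code assumes it lives somewhere the package-private Catalyst helpers are visible (e.g. under org.apache.spark.sql).

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.types.StructType

    // Hypothetical standalone helper mirroring SparkPlan.executeCollectPublic:
    // Catalyst internal rows in, external Rows out, one converter built per schema.
    object RowConversionSketch {
      def toExternal(internalRows: Array[InternalRow], schema: StructType): Array[Row] = {
        val converter = CatalystTypeConverters.createToScalaConverter(schema)
        internalRows.map(converter(_).asInstanceOf[Row])
      }
    }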
@@ -204,7 +204,7 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition

-  override def executeCollect(): Array[Row] = child.executeTake(limit)
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
@@ -258,9 +258,8 @@ case class TakeOrderedAndProject(
     projection.map(data.map(_)).getOrElse(data)
   }

-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    collectData().map(converter(_).asInstanceOf[Row])
+  override def executeCollect(): Array[InternalRow] = {
+    collectData()
   }

   // TODO: Terminal split should be implemented differently from non-terminal split.
@@ -54,20 +54,21 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    * The `execute()` method of all the physical command classes should reference `sideEffectResult`
    * so that the command can be executed eagerly right after the command query is created.
    */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
+  }

   override def output: Seq[Attribute] = cmd.output

   override def children: Seq[SparkPlan] = Nil

-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

-  override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

   protected override def doExecute(): RDD[InternalRow] = {
-    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
-    val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
-    sqlContext.sparkContext.parallelize(converted, 1)
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
   }

   override def argString: String = cmd.toString
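For illustration, not from the patch: sideEffectResult now does the reverse conversion up front. RunnableCommand.run still produces external Rows, so they are mapped to InternalRow once when the lazy val is forced, rather than on every doExecute(). A rough sketch of that pattern with an invented helper name (same visibility assumption as above):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper showing the external-to-internal direction used by
    // ExecutedCommand.sideEffectResult above.
    object CommandResultSketch {
      def toInternal(externalRows: Seq[Row], schema: StructType): Seq[InternalRow] = {
        val converter = CatalystTypeConverters.createToCatalystConverter(schema)
        externalRows.map(converter(_).asInstanceOf[InternalRow])
      }
    }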
@@ -121,12 +121,10 @@ object EvaluatePython {

   def takeAndServe(df: DataFrame, n: Int): Int = {
     registerPicklers()
-    // This is an annoying hack - we should refactor the code so executeCollect and executeTake
-    // returns InternalRow rather than Row.
-    val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
-    val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
-      EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
-    })
+    val iter = new SerDeUtil.AutoBatchedPickler(
+      df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+        EvaluatePython.toJava(row, df.schema)
+      })
     PythonRDD.serveIterator(iter, s"serve-DataFrame")
   }

@@ -245,7 +245,7 @@ object SparkPlanTest {
         }
       }
     )
-    resolvedPlan.executeCollect().toSeq
+    resolvedPlan.executeCollectPublic().toSeq
   }
 }

@@ -563,7 +563,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {

   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
-    extends super.QueryExecution(logicalPlan) {
+    extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {

     /**
      * Returns the result as a hive compatible sequence of strings. For native commands, the
@@ -581,10 +581,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
             .mkString("\t")
         }
       case command: ExecutedCommand =>
-        command.executeCollect().map(_(0).toString)
+        command.executeCollect().map(_.getString(0))

       case other =>
-        val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq
+        val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
         // We need the types so we can output struct field names
         val types = analyzed.output.map(_.dataType)
         // Reformat to match hive tab delimited output.
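One more illustrative note, not from the diff: since executeCollect() now returns InternalRow, the command branch cannot rely on Row's generic apply plus toString; InternalRow stores strings as UTF8String, so the typed getString(ordinal) accessor is used instead. A tiny hedged sketch of that access pattern:

    import org.apache.spark.sql.catalyst.InternalRow

    // Hypothetical helper: read the first column of each InternalRow as a JVM String,
    // matching the `_.getString(0)` call in the command branch above.
    object FirstColumnSketch {
      def firstStrings(rows: Array[InternalRow]): Array[String] = rows.map(_.getString(0))
    }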
@@ -124,7 +124,7 @@ case class InsertIntoHiveTable(
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = {
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
@@ -267,10 +267,10 @@ case class InsertIntoHiveTable(
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
     // TODO: implement hive compatibility as rules.
-    Seq.empty[Row]
+    Seq.empty[InternalRow]
   }

-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)