
Commit 9069de6

Merge branch 'master' into eval
Conflicts:
    sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
    sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala
2 parents 3ccc313 + accd099

8 files changed: 157 additions, 152 deletions


project/SparkBuild.scala

Lines changed: 1 addition & 0 deletions
@@ -178,6 +178,7 @@ object SparkBuild extends Build {
     fork := true,
     javaOptions in Test += "-Dspark.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
+    javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark").map { case (k,v) => s"-D$k=$v" }.toSeq,
     javaOptions += "-Xmx3g",
     // Show full stack trace and duration in test cases.
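
The new -Dsun.io.serialization.extendedDebugInfo=true option makes serialization failures in the test JVMs easier to diagnose: without it a NotSerializableException only names the offending class, with it the JVM also prints the chain of fields it traversed to reach that object. A minimal sketch of the effect, using plain JDK serialization and invented class names, not code from this commit:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDebugSketch {
  class Config                                    // plain class, deliberately not Serializable
  class Job extends Serializable {
    val config = new Config                       // dragged into the serialized object graph
  }

  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(new Job())
    catch {
      // With -Dsun.io.serialization.extendedDebugInfo=true the message also shows
      // the field path (Job.config) that led to the non-serializable object.
      case e: NotSerializableException => println(e)
    }
  }
}

Running the sketch once with the flag and once without shows the extra reference-chain detail in the exception message.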

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala

Lines changed: 6 additions & 0 deletions
@@ -48,11 +48,17 @@ case class BoundReference(ordinal: Int, baseReference: Attribute)
   override def eval(input: Row): Any = input(ordinal)
 }
 
+/**
+ * Used to denote operators that do their own binding of attributes internally.
+ */
+trait NoBind { self: trees.TreeNode[_] => }
+
 class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
   import BindReferences._
 
   def apply(plan: TreeNode): TreeNode = {
     plan.transform {
+      case n: NoBind => n.asInstanceOf[TreeNode]
       case leafNode if leafNode.children.isEmpty => leafNode
       case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e =>
         bindReference(e, unaryNode.children.head.output)
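
The NoBind marker trait lets operators that bind their own attributes opt out of the BindReferences rewrite, which the new `case n: NoBind` arm short-circuits before the leaf and unary cases. A simplified, self-contained sketch of the pattern with toy types (these are not the Catalyst TreeNode/QueryPlan classes):

// Toy tree; names other than NoBind are invented for the sketch.
sealed trait Node { def children: Seq[Node] }
trait NoBind extends Node                         // "I bind my own attributes"

case class Leaf(name: String) extends Node { def children = Nil }
case class Unary(child: Node) extends Node { def children = Seq(child) }
case class SelfBinding(child: Node) extends Node with NoBind { def children = Seq(child) }

object BindSketch {
  def bind(plan: Node): Node = plan match {
    case n: NoBind => n                           // skipped, like the new rule arm
    case Unary(c)  => Unary(bind(c))              // a real rule rewrites expressions here
    case other     => other
  }

  def main(args: Array[String]): Unit =
    println(bind(Unary(SelfBinding(Leaf("a")))))  // the SelfBinding subtree is left untouched
}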

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala

Lines changed: 3 additions & 3 deletions
@@ -29,9 +29,9 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {
   protected val exprArray = expressions.toArray
 
   def apply(input: Row): Row = {
-    val outputArray = new Array[Any](exprArray.size)
+    val outputArray = new Array[Any](exprArray.length)
     var i = 0
-    while (i < exprArray.size) {
+    while (i < exprArray.length) {
       outputArray(i) = exprArray(i).eval(input)
       i += 1
     }
@@ -58,7 +58,7 @@ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row)
 
   def apply(input: Row): Row = {
     var i = 0
-    while (i < exprArray.size) {
+    while (i < exprArray.length) {
       mutableRow(i) = exprArray(i).eval(input)
       i += 1
     }
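
Switching from .size to .length in these loops is a small hot-path optimization: on the Scala version in use at the time (2.10), .size on an Array goes through an implicit conversion to an ArrayOps wrapper, allocating an object per call, while .length reads the JVM array length directly. A minimal sketch of the same loop shape, not taken from the commit:

object ArrayLengthSketch {
  // Same while-loop shape as Projection.apply: index-based iteration where a
  // per-iteration wrapper allocation from `.size` would be pure overhead.
  def doubleAll(xs: Array[Int]): Array[Int] = {
    val out = new Array[Int](xs.length)
    var i = 0
    while (i < xs.length) {              // direct read of the JVM array length
      out(i) = xs(i) * 2
      i += 1
    }
    out
  }

  def main(args: Array[String]): Unit =
    println(doubleAll(Array(1, 2, 3)).mkString(","))   // prints 2,4,6
}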

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala

Lines changed: 8 additions & 8 deletions
@@ -27,7 +27,7 @@ abstract class AggregateExpression extends Expression {
   * Creates a new instance that can be used to compute this aggregate expression for a group
   * of input rows/
   */
-  def newInstance: AggregateFunction
+  def newInstance(): AggregateFunction
 }
 
 /**
@@ -75,7 +75,7 @@ abstract class AggregateFunction
   override def eval(input: Row): Any
 
   // Do we really need this?
-  override def newInstance = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
+  override def newInstance() = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
 }
 
 case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -89,7 +89,7 @@ case class Count(child: Expression) extends PartialAggregate with trees.UnaryNod
     SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil)
   }
 
-  override def newInstance = new CountFunction(child, this)
+  override def newInstance() = new CountFunction(child, this)
 }
 
 case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression {
@@ -98,7 +98,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi
   override def nullable = false
   override def dataType = IntegerType
   override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})"
-  override def newInstance = new CountDistinctFunction(expressions, this)
+  override def newInstance() = new CountDistinctFunction(expressions, this)
 }
 
 case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -118,7 +118,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN
       partialCount :: partialSum :: Nil)
   }
 
-  override def newInstance = new AverageFunction(child, this)
+  override def newInstance() = new AverageFunction(child, this)
 }
 
 case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -134,7 +134,7 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[
       partialSum :: Nil)
   }
 
-  override def newInstance = new SumFunction(child, this)
+  override def newInstance() = new SumFunction(child, this)
 }
 
 case class SumDistinct(child: Expression)
@@ -145,7 +145,7 @@ case class SumDistinct(child: Expression)
   override def dataType = child.dataType
   override def toString = s"SUM(DISTINCT $child)"
 
-  override def newInstance = new SumDistinctFunction(child, this)
+  override def newInstance() = new SumDistinctFunction(child, this)
 }
 
 case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -160,7 +160,7 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod
     First(partialFirst.toAttribute),
     partialFirst :: Nil)
   }
-  override def newInstance = new FirstFunction(child, this)
+  override def newInstance() = new FirstFunction(child, this)
 }
 
 case class AverageFunction(expr: Expression, base: AggregateExpression)
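
Writing newInstance() with an empty parameter list does not change behavior; it follows the Scala convention that a method which allocates fresh mutable state on every call is side-effecting and should be declared (and called) with parentheses, unlike a pure accessor. A small self-contained sketch of the convention using invented types, not the Catalyst classes:

abstract class Aggregate {
  def name: String                       // pure accessor: no parentheses
  def newInstance(): StringBuilder       // returns fresh mutable state: parentheses
}

class Concat extends Aggregate {
  def name = "concat"
  def newInstance() = new StringBuilder  // a new buffer per group of input rows
}

object NewInstanceSketch {
  def main(args: Array[String]): Unit = {
    val agg = new Concat
    // Two calls yield two independent buffers, hence the empty-paren convention.
    println(agg.newInstance() eq agg.newInstance())   // false
  }
}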

sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala

Lines changed: 0 additions & 100 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   */
 object AddExchange extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  val numPartitions = 8
+  val numPartitions = 150
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>
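
Raising the hard-coded numPartitions from 8 to 150 only changes how many buckets the inserted Exchange shuffles into; the TODO about choosing the value properly remains open. As a rough, standalone illustration of what a fixed partition count means (invented names, similar in spirit to a hash partitioner, not the Exchange operator itself):

object FixedPartitionsSketch {
  // Map a key to one of numPartitions buckets using a non-negative modulus.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  def main(args: Array[String]): Unit = {
    // With only 8 buckets, at most 8 tasks can process the shuffled data in
    // parallel regardless of cluster size; 150 leaves more headroom.
    Seq("a", "b", "c", "d").foreach { k =>
      println(s"$k -> bucket ${partitionFor(k, 150)}")
    }
  }
}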
