Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ abstract class Collect extends ImperativeAggregate {

override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)

override def supportsPartial: Boolean = false
override def forceSortAggregate: Boolean = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, after changing this name, it will not show that we do not partial agg for this function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea. either way, it seems partial agg. becomes meaningless in future.


override def aggBufferAttributes: Seq[AttributeReference] = Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ sealed abstract class AggregateFunction extends Expression with ImplicitCastInpu
def inputAggBufferAttributes: Seq[AttributeReference]

/**
* Indicates if this function supports partial aggregation.
* Currently Hive UDAF is the only one that doesn't support partial aggregation.
* Indicates if this function needs to aggregate values group-by-group in a single step.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also the inverse.

* If true, we must always use a `SortAggregateExec` operator without partial aggregates.
*/
def supportsPartial: Boolean = true
def forceSortAggregate: Boolean = false

/**
* Result of the aggregate function when the input is empty. This is currently only used for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowF
override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
override def dataType: DataType = IntegerType
override def nullable: Boolean = true
override def supportsPartial: Boolean = false
override def forceSortAggregate: Boolean = true
override lazy val mergeExpressions =
throw new UnsupportedOperationException("Window Functions do not support merging.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

val aggregateOperator =
if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
if (aggregateExpressions.map(_.aggregateFunction).exists(_.forceSortAggregate)) {
if (functionsWithDistinct.nonEmpty) {
sys.error("Distinct columns cannot exist in Aggregate operator containing " +
"aggregate functions which don't support partial aggregation.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private[hive] case class HiveUDAFFunction(

override def nullable: Boolean = true

override def supportsPartial: Boolean = false
override def forceSortAggregate: Boolean = true

override lazy val dataType: DataType = inspectorToDataType(returnInspector)

Expand Down