
Conversation

guowei2
Contributor

@guowei2 guowei2 commented Apr 21, 2015

Adding more information about the implementation...

This PR adds support for window functions to Spark SQL (specifically the OVER and WINDOW clauses). For every expression having an OVER clause, we use a WindowExpression as the container of a WindowFunction and the corresponding WindowSpecDefinition (the definition of a window frame, i.e. the partition specification, order specification, and frame specification appearing in an OVER clause). A minimal sketch of this containment follows.
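
A minimal, hedged sketch of that containment in plain Scala (the class names below match the ones this PR introduces, but the field types are simplified: strings stand in for the Catalyst Expression and SortOrder trees):

```scala
object WindowContainersSketch {
  sealed trait WindowSpec
  case class WindowSpecReference(name: String) extends WindowSpec
  case class WindowSpecDefinition(
      partitionSpec: Seq[String],  // PARTITION BY expressions
      orderSpec: Seq[String],      // ORDER BY expressions
      frameSpec: String            // frame specification
  ) extends WindowSpec

  // One container per expression with an OVER clause: the window function
  // travels together with its window specification.
  case class WindowExpression(
      windowFunction: String,
      windowSpec: WindowSpecDefinition)

  // avg(sales) OVER (PARTITION BY product)
  val avgProduct = WindowExpression(
    "avg(sales)",
    WindowSpecDefinition(Seq("product"), Nil,
      "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING"))
}
```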

Implementation

The high-level workflow of the implementation is as follows.

  • Query parsing: During query parsing, all WindowExpressions are initially placed in the projectList of a Project operator or the aggregateExpressions of an Aggregate operator. This keeps our changes simple and keeps all parsing rules for window functions in a single place (nodesToWindowSpecification). For the WINDOW clause in a query, we use a WithWindowDefinition as the container for the mapping from the name of a window specification to a WindowSpecDefinition. This change is similar to our common table expression support.
  • Analysis: The query analysis process has three steps for window functions.
  • Resolve all WindowSpecReferences by replacing them with the corresponding WindowSpecDefinitions according to the mapping table stored in the WithWindowDefinition node.
  • Resolve WindowFunctions in the projectList of a Project operator or the aggregateExpressions of an Aggregate operator. For this PR, we use Hive's functions for window functions, because we will have a major refactoring of our internal UDAFs and it is better to switch to our own UDAFs after that refactoring work.
  • Once we have resolved all WindowFunctions, we use ResolveWindowFunction to extract WindowExpressions from projectList and aggregateExpressions, and then create a Window operator for every distinct WindowSpecDefinition. With this choice, at execution time we can rely on the Exchange operator to do all of the work of reorganizing the table, and we do not need to worry about it in the physical Window operator. An example analyzed plan is shown below, followed by a sketch of the extraction step.
sql("""
SELECT
  year, country, product, sales,
  avg(sales) over(partition by product) avg_product,
  sum(sales) over(partition by country) sum_country
FROM sales
ORDER BY year, country, product
""").explain(true)

== Analyzed Logical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
 Project [year#34,country#35,product#36,sales#37,avg_product#27,sum_country#28]
  Window [year#34,country#35,product#36,sales#37,avg_product#27], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37) WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
   Window [year#34,country#35,product#36,sales#37], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37) WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    Project [year#34,country#35,product#36,sales#37]
     MetastoreRelation default, sales, None
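
A hedged sketch of the extraction step referenced above (illustrative only, not the actual ResolveWindowFunction rule; it reuses the simplified WindowExpression container sketched earlier and renders plans as strings):

```scala
// Group the extracted window expressions by their window specification and
// stack one Window operator per distinct spec, which yields the two nested
// Window nodes seen in the analyzed plan above.
def planWindows(
    windowExprs: Seq[WindowContainersSketch.WindowExpression],
    child: String): String =
  windowExprs.groupBy(_.windowSpec).foldLeft(child) {
    case (plan, (spec, exprs)) =>
      s"Window [${exprs.map(_.windowFunction).mkString(", ")}] over $spec\n $plan"
  }
```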
  • Query planning: During query planning, we simply generate the physical Window operator from the logical Window operator. Then, to prepare the executedPlan, the EnsureRequirements rule adds Exchange and Sort operators where necessary; it analyzes the data properties and avoids adding unnecessary shuffles and sorts. The physical plan for the example query is shown below, followed by a sketch of the rule's core idea.
== Physical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
 Exchange (RangePartitioning [year#34 ASC,country#35 ASC,product#36 ASC], 200), []
  Window [year#34,country#35,product#36,sales#37,avg_product#27], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37) WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
   Exchange (HashPartitioning [country#35], 200), [country#35 ASC]
    Window [year#34,country#35,product#36,sales#37], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37) WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
     Exchange (HashPartitioning [product#36], 200), [product#36 ASC]
      HiveTableScan [year#34,country#35,product#36,sales#37], (MetastoreRelation default, sales, None), None
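
A hedged sketch of the core decision in EnsureRequirements (the real rule works on Catalyst plans, distributions, and Partitioning objects; this only illustrates adding an Exchange when the child's partitioning does not already satisfy an operator's requirement):

```scala
// Simplified stand-in for the engine's hash partitioning description.
case class HashPartitioning(clustering: Seq[String])

// Insert an Exchange only if the child does not already provide the
// required partitioning; otherwise reuse the child's layout as-is.
def ensureDistribution(
    required: HashPartitioning,
    childProvides: Option[HashPartitioning],
    childPlan: String): String =
  if (childProvides.contains(required)) childPlan
  else s"Exchange (HashPartitioning [${required.clustering.mkString(",")}])\n $childPlan"
```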
  • Execution time: At execution time, a physical Window operator buffers all rows of the partition specified by the partition spec of an OVER clause. If necessary, it also maintains a sliding window frame. The current implementation tries to buffer the input parameters of a window function according to the window frame, to avoid evaluating a row multiple times. A sketch of this frame-based evaluation follows.
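
A hedged sketch of that execution strategy, assuming one fully buffered, ordered partition and a ROWS-based frame (the real operator evaluates arbitrary window functions; an average is used here for concreteness):

```scala
// For each row i, evaluate the aggregate over the rows inside its frame
// [i - preceding, i + following], clamped to the partition boundaries.
def slidingFrameAvg(
    partition: IndexedSeq[Double],  // one buffered, sorted partition
    preceding: Int,
    following: Int): IndexedSeq[Double] =
  partition.indices.map { i =>
    val start = math.max(0, i - preceding)
    val end = math.min(partition.length - 1, i + following)
    val frame = partition.slice(start, end + 1)
    frame.sum / frame.size
  }

// avg OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING):
// slidingFrameAvg(Vector(1.0, 2.0, 3.0, 4.0), 1, 1) == Vector(1.5, 2.0, 3.0, 3.5)
```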

Future work

Here are three improvements that are not hard to add:

  • Taking advantage of the window frame specification to reduce the number of rows buffered in the physical Window operator. In some cases, we only need to buffer the rows appearing in the sliding window; in other cases (e.g. ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), we will not be able to reduce the number of rows buffered.
  • When a RANGE frame is used, for <value> PRECEDING and <value> FOLLOWING, it would be great if the <value> part could be an expression (we can start with Literal). Then, when the data type of the ORDER BY expression is a FractionalType, we can support FractionalType as the type of <value> (<value> still needs to evaluate to a positive value). A hypothetical sketch follows this list.
  • When a RANGE frame is used, we need to support DateType and TimestampType as the data type of the expression appearing in the order specification. Then, the <value> part of <value> PRECEDING and <value> FOLLOWING can support interval types (once we support them).
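
For the second item, a hypothetical sketch of expression-valued boundaries (the FrameBoundary, ValuePreceding, and ValueFollowing names come from this PR; the fractional variants are an assumption about the future work, not existing API):

```scala
sealed trait FrameBoundary
case object UnboundedPreceding extends FrameBoundary
case object CurrentRow extends FrameBoundary
case object UnboundedFollowing extends FrameBoundary
// Today's integer-valued boundaries, as introduced in this PR:
case class ValuePreceding(value: Int) extends FrameBoundary
case class ValueFollowing(value: Int) extends FrameBoundary
// Hypothetical future variants for a RANGE frame over a FractionalType
// ORDER BY column; the offset must still evaluate to a positive value.
case class FractionalPreceding(value: BigDecimal) extends FrameBoundary {
  require(value > 0, "<value> must evaluate to a positive value")
}
case class FractionalFollowing(value: BigDecimal) extends FrameBoundary {
  require(value > 0, "<value> must evaluate to a positive value")
}
```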

This is joint work with @guowei2 and @yhuai.
Thanks @hbutani and @hvanhovell for their comments.
Thanks @scwf for his comments and unit tests.

@yhuai
Contributor

yhuai commented Apr 21, 2015

ok to test

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30643 has finished for PR 5604 at commit 6858572.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
    • case class WindowFunctionInfo(
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30649 has finished for PR 5604 at commit d5c980f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
    • case class WindowFunctionInfo(
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30650 has finished for PR 5604 at commit 138ff91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
    • case class WindowFunctionInfo(
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30652 has finished for PR 5604 at commit 4453aff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
    • case class WindowFunctionInfo(
  • This patch does not change any dependencies.

@liancheng
Contributor

Hey @guowei2, "Spart" => "Spark" in the PR title :)

Contributor:

It would be great if we could remove this ThreadLocal, but if that requires too much work, I guess it's OK to leave it as is. ThreadLocal can always be error-prone.

@guowei2 guowei2 changed the title [SPARK-1442][SQL][WIP] Window Function Support for Spart SQL [SPARK-1442][SQL][WIP] Window Function Support for Spark SQL Apr 24, 2015
@SparkQA

SparkQA commented Apr 24, 2015

Test build #30900 has finished for PR 5604 at commit b4fa747.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
    • case class WindowFunctionInfo(
  • This patch does not change any dependencies.

@yhuai
Contributor

yhuai commented Apr 24, 2015

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30904 has finished for PR 5604 at commit cae7079.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
    • case class WindowFunctionInfo(
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30917 has finished for PR 5604 at commit 5b96e2a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30928 has finished for PR 5604 at commit d07101b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
  • This patch does not change any dependencies.

@scwf
Contributor

scwf commented Apr 24, 2015

@guowei2, can you generate the golden answers for this locally?

@SparkQA

SparkQA commented Apr 24, 2015

Test build #30940 has finished for PR 5604 at commit 4bb2c70.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WindowExpression(child: Expression, windowSpec: WindowSpec) extends UnaryExpression
    • case class WindowSpec(windowPartition: WindowPartition, windowFrame: Option[WindowFrame])
    • case class WindowPartition(partitionBy: Seq[Expression], sortBy: Seq[SortOrder])
    • case class WindowFrame(frameType: FrameType, preceding: Int, following: Int)
    • case class WindowAggregate(
    • case class WindowAggregate(
    • case class ComputedWindow(
  • This patch does not change any dependencies.

@hbutani

hbutani commented Apr 25, 2015

Hi,

I spent some time on this patch. This is a good start, but there are several semantic issues, and I have some comments/suggestions about the execution. Hope you don't mind my comments.

Semantic Issues:

  1. The ORDER BY clause in windowing implies an ordering within each logical partition. Strictly speaking, doing a global order (across all logical partitions within a physical partition, i.e. all rows that end up at the same node) is not quite the same, especially when you have rows with the same values (on the order expressions) across partitions. This way of implementing also forces you to read all the rows in the WindowAggregate execution before applying any aggregations, which precludes any 'streaming'-style execution. Effectively you are repartitioning in WindowAggregate, albeit within each WindowAggregate invocation. More on this below, in the execution comments.
  2. 'Current Row' doesn't mean an offset of '0' for RANGE-based windowing. It implies including the rows before or after (depending on the direction of the boundary) that have the same order expression values; see the sketch after this list.
  3. Having no partition clause doesn't imply no partitioning; it implies that all rows should be treated as one partition.
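
A small sketch of point 2, assuming a partition already sorted by the ORDER BY expression (illustrative only):

```scala
// Under a RANGE frame, the CURRENT ROW boundary extends to all peer rows,
// i.e. every row whose ORDER BY value equals the current row's value.
def rangeCurrentRowBounds(orderValues: IndexedSeq[Int], i: Int): (Int, Int) = {
  val v = orderValues(i)
  (orderValues.indexWhere(_ == v), orderValues.lastIndexWhere(_ == v))
}

// For sorted ORDER BY values [1, 2, 2, 3], a frame ending at CURRENT ROW
// evaluated at index 1 reaches index 2, because row 2 is a peer of row 1:
// rangeCurrentRowBounds(Vector(1, 2, 2, 3), 1) == (1, 2)
```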

Execution comments/suggestions:

  1. You want to consider holding onto as few rows as possible. There are several things to consider:
    a. A relatively easy first step is to read only one logical partition at a time. But this requires that the incoming rows are in the correct order.
    b. My suggestion would be to introduce UDAF-level streaming as early as possible. This way, for many common cases, the memory footprint can be very low; a sketch of the idea follows this list. See the work done in Hive around org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing. This also has a big performance impact: in Hive we encountered users who work with partitions of considerable size (tens of thousands of rows), and the O(n*n) straightforward way of computing window aggregates was a huge bottleneck.
  2. Why are you translating all window invocations to Hive UDAFs? This way you take a hit converting from Spark to Hive values and back. At least for the aggregate functions already in Spark, why not use them?
  3. The Exchange operator has a parameter to specify ordering within a partition. Is there a way to generate a physical plan that utilizes this? You wouldn't need a Sort operator above the Exchange then. (I am still learning about Spark SQL, so I am not sure of the implications of this.)
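
A hedged sketch of the streaming idea in 1(b): for a running frame such as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, an aggregate like sum can emit one value per input row without buffering the whole partition:

```scala
// O(n) running sum over a partition, one output value per input row,
// instead of re-evaluating every row's frame from scratch (O(n*n)).
def streamingRunningSum(partition: Iterator[Double]): Iterator[Double] =
  partition.scanLeft(0.0)(_ + _).drop(1)

// streamingRunningSum(Iterator(1.0, 2.0, 3.0)).toList == List(1.0, 3.0, 6.0)
```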

regards,
Harish Butani.

@hvanhovell
Contributor

Hi,

I have been experimenting with window functions in Spark SQL as well; my work has been partially based on this PR. You can find my work here.

I have deviated from the original implementation in a couple of ways:

  • Implemented it as an extension to Spark SQL rather than Hive. All aggregates use the Spark SQL implementations (not the Hive UDAFs).
  • Use of the Spark 1.4 child ordering requirements. Sorting is planned by the engine; this will be especially interesting as soon as Exchange starts supporting secondary sorting. I have tried a few sorting schemes, but this one is currently the fastest.
  • Only a single window specification (grouping and ordering) is processed at a time. The analyzer should take care of multiple window specifications.
  • The current implementation is semi-blocking; it processes one group at a time, so only the rows of one group per partition are kept in memory (see the sketch after this list). In the future we should also accommodate the case in which all aggregates are streaming (perhaps with some buffering).
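
A hedged sketch of that semi-blocking scheme, assuming the input iterator is sorted by the partition key (names here are illustrative, not the linked implementation):

```scala
// Buffer one group at a time from a partition-key-sorted iterator and
// evaluate the window functions for that group before reading the next one,
// so at most one group per partition is held in memory.
def oneGroupAtATime[K, R](sorted: Iterator[(K, R)])(
    evalGroup: Seq[R] => Seq[R]): Iterator[R] = {
  val in = sorted.buffered
  new Iterator[Seq[R]] {
    def hasNext: Boolean = in.hasNext
    def next(): Seq[R] = {
      val key = in.head._1
      val group = Seq.newBuilder[R]
      while (in.hasNext && in.head._1 == key) group += in.next()._2
      evalGroup(group.result())
    }
  }.flatten
}
```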

Shall we try to join forces, and come up with one good PR?

Kind regards,
Herman

@yhuai
Contributor

yhuai commented Apr 29, 2015

Guys, thank you for your comments. I am updating this PR now and should have an update later today; I will reply to you all then.

@scwf
Contributor

scwf commented Apr 30, 2015

We'd better make a new logical plan named Window, just like With, and do the transformation work (such as windowToPlan) in the analyzer instead of adding this logic to hiveql, because:
1. hiveql.scala is already a very big object, and adding more logic makes it hard to maintain;
2. a Window logical plan is useful in its own right; we do not want to redo the same work for window function support in every SQL parser (after the pluggable-parser PR is in, we can plug in new parsers).

@guowei2
Contributor Author

guowei2 commented Apr 30, 2015

@scwf I think it is a good choice, thanks.

@yhuai
Contributor

yhuai commented Apr 30, 2015

I have been working on it for a few days. I believe my update will cover most of your comments. Please hold your comments until my update. Thanks :)

@yhuai
Contributor

yhuai commented May 1, 2015

I have pushed my updated version. For now, I am still using Hive's UDAFs, mainly because we will do some major refactoring of our internal UDAFs in the near future (#5542). So, I think it is better to switch to our own functions after that refactoring.

@hbutani I believe I have addressed your comments on the window frame boundary. Regarding the in-memory buffer: right now, the physical operator (Window in the execution package) only holds a single partition. We can first optimize it to only hold a buffer of [min(current row, start of frame), max(current row, end of frame)] (in another PR); a sketch of that bound is shown below. Then, we can start to exploit optimization opportunities by considering the function properties.
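
A small sketch of that buffer bound, using signed row offsets (negative for PRECEDING) clamped to the partition; illustrative only:

```scala
// Rows to keep for current row i in a partition of n rows:
// [min(i, start of frame), max(i, end of frame)], clamped to [0, n - 1].
def bufferBounds(i: Int, startOffset: Int, endOffset: Int, n: Int): (Int, Int) = {
  val lo = math.max(0, math.min(i, i + startOffset))
  val hi = math.min(n - 1, math.max(i, i + endOffset))
  (lo, hi)
}
```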

@hvanhovell Your comments are very good! As mentioned in my reply to Harish, I feel it will be good to use our own functions instead of Hive's after the refactoring of UDAFs. Regarding "Use of SPARK 1.4 child ordering requirements": this is exactly the way the current version works :) We take advantage of EnsureRequirements to organize the input rows for us. The current version handles a single window spec at a time (as you mentioned in your comments); if multiple functions use the same spec, we evaluate them in the same operator. Regarding buffering, please see my reply to Harish: basically, I buffer a single partition at a time (instead of buffering all partitions sent to a task). With the current implementation, it will be easy to only buffer [min(current row, start of frame), max(current row, end of frame)].

@scwf Once your #5776 is in, I will update it.

@guowei2 Can you remove WIP from the title?

I will start to cleanup my code and add more comments to explain how it works.

@SparkQA

SparkQA commented May 1, 2015

Test build #753 has started for PR 5604 at commit 4bb2c70.

Contributor:

No need to wrap here.

@SparkQA

SparkQA commented May 5, 2015

Test build #31893 has finished for PR 5604 at commit cf6c7a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JoinedRow6 extends Row
    • case class WindowSpecDefinition(
    • case class WindowSpecReference(name: String) extends WindowSpec
    • sealed trait FrameBoundary
    • case class ValuePreceding(value: Int) extends FrameBoundary
    • case class ValueFollowing(value: Int) extends FrameBoundary
    • case class SpecifiedWindowFrame(
    • trait WindowFunction extends Expression
    • case class UnresolvedWindowFunction(
    • case class UnresolvedWindowExpression(
    • case class WindowExpression(
    • case class WithWindowDefinition(
    • case class Window(
    • case class Window(
    • case class ComputedWindow(

@yhuai yhuai force-pushed the windowImplement branch from cf6c7a0 to 448d468 Compare May 5, 2015 23:45
@yhuai yhuai force-pushed the windowImplement branch from 448d468 to 76fe1c8 Compare May 6, 2015 01:05
@SparkQA

SparkQA commented May 6, 2015

Test build #31922 has finished for PR 5604 at commit 448d468.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JoinedRow6 extends Row
    • case class WindowSpecDefinition(
    • case class WindowSpecReference(name: String) extends WindowSpec
    • sealed trait FrameBoundary
    • case class ValuePreceding(value: Int) extends FrameBoundary
    • case class ValueFollowing(value: Int) extends FrameBoundary
    • case class SpecifiedWindowFrame(
    • trait WindowFunction extends Expression
    • case class UnresolvedWindowFunction(
    • case class UnresolvedWindowExpression(
    • case class WindowExpression(
    • case class WithWindowDefinition(
    • case class Window(
    • case class Window(
    • case class ComputedWindow(

Contributor:

Minor: a.expressions.forall(_.resolved) is more readable.

@SparkQA

SparkQA commented May 6, 2015

Test build #767 has finished for PR 5604 at commit 448d468.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 6, 2015

Test build #31929 has finished for PR 5604 at commit 76fe1c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JoinedRow6 extends Row
    • case class WindowSpecDefinition(
    • case class WindowSpecReference(name: String) extends WindowSpec
    • sealed trait FrameBoundary
    • case class ValuePreceding(value: Int) extends FrameBoundary
    • case class ValueFollowing(value: Int) extends FrameBoundary
    • case class SpecifiedWindowFrame(
    • trait WindowFunction extends Expression
    • case class UnresolvedWindowFunction(
    • case class UnresolvedWindowExpression(
    • case class WindowExpression(
    • case class WithWindowDefinition(
    • case class Window(
    • case class Window(
    • case class ComputedWindow(

@marmbrus
Contributor

marmbrus commented May 6, 2015

Only minor comments, which can be addressed in a follow-up. Merging to master and 1.4.

asfgit pushed a commit that referenced this pull request May 6, 2015

Author: Yin Huai <[email protected]>

Closes #5604 from guowei2/windowImplement and squashes the following commits:

76fe1c8 [Yin Huai] Implementation.
aa2b0ae [Yin Huai] Tests.

(cherry picked from commit f2c4708)
Signed-off-by: Michael Armbrust <[email protected]>
@asfgit asfgit closed this in f2c4708 May 6, 2015
@shivaram
Contributor

shivaram commented May 6, 2015

I think this PR might have broken the build? I am synced to master and ran

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests -Psparkr clean package

I get the following error

[error] /Users/shivaram/debian-shared/spark-1/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala:769: not found: type HiveCompatibilitySuite
[error]   extends HiveCompatibilitySuite with BeforeAndAfter {
[error]           ^
[error] /Users/shivaram/debian-shared/spark-1/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala:828: value testCases is not a member of org.scalatest.BeforeAndAfter
[error]   override def testCases: Seq[(String, File)] = super.testCases.filter {
[error]                                                       ^
[error] two errors found
[error] Compile failed at May 6, 2015 2:00:25 PM [5.890s]

@yhuai
Contributor

yhuai commented May 6, 2015

@shivaram I am looking at it. It seems I need to move this test to the Hive compatibility subproject.

@yhuai
Contributor

yhuai commented May 6, 2015

@shivaram Fixed the build with 7740996.

@shivaram
Contributor

shivaram commented May 6, 2015

Thanks @yhuai

asfgit pushed a commit that referenced this pull request May 7, 2015
…ion PR (#5604).

Address marmbrus and scwf's comments in #5604.

Author: Yin Huai <[email protected]>

Closes #5945 from yhuai/windowFollowup and squashes the following commits:

0ef879d [Yin Huai] Add collectFirst to TreeNode.
2373968 [Yin Huai] wip
4a16df9 [Yin Huai] Address minor comments for [SPARK-1442].

(cherry picked from commit 5784c8d)
Signed-off-by: Michael Armbrust <[email protected]>
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015

Closes apache#5604 from guowei2/windowImplement.
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015

Closes apache#5945 from yhuai/windowFollowup.
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015

Closes apache#5604 from guowei2/windowImplement.
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015

Closes apache#5945 from yhuai/windowFollowup.
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015

Closes apache#5604 from guowei2/windowImplement.
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015

Closes apache#5945 from yhuai/windowFollowup.
case class Window(
    projectList: Seq[Attribute],
Contributor:

Why do we need a projectList in Window? Isn't it always equal to child.output?

Member:

I have the same question. It is a Seq[Attribute]; unlike the one in Project, we are unable to put an Alias here.

Contributor:

Looks like we do not actually need projectList, since it is always child.output.

Contributor:

Can any of you submit a PR to make the change?

Member:

Sure, will do it today. @yhuai Thanks!

ghost pushed a commit to dbtsai/spark that referenced this pull request Mar 11, 2016
…iminate useless Window

#### What changes were proposed in this pull request?

`projectList` is useless: its value is always the same as child.output. Remove it from the class `Window`; the removal simplifies the code in the Analyzer and the Optimizer.

This PR is based on the discussion started by cloud-fan in a separate PR:
apache#5604 (comment)

This PR also eliminates useless `Window` operators.

cloud-fan yhuai

#### How was this patch tested?

Existing test cases cover it.

Author: gatorsmile <[email protected]>
Author: xiaoli <[email protected]>
Author: Xiao Li <[email protected]>

Closes apache#11565 from gatorsmile/removeProjListWindow.
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016

Closes apache#11565 from gatorsmile/removeProjListWindow.
def defaultWindowFrame(
    hasOrderSpecification: Boolean,
    acceptWindowFrame: Boolean): SpecifiedWindowFrame = {
  if (hasOrderSpecification && acceptWindowFrame) {
Contributor:

Do you know why the default window frame can differ depending on the order spec here? Is that for some kind of compatibility issue? cc @cloud-fan @gatorsmile @yhuai

Contributor:

There are window functions that do not support setting a window frame (e.g. rank). For them, acceptWindowFrame is false and the whole partition is the frame.

For functions that do support setting a window frame, the default window frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Note that here, all rows considered peers of the current row are included in the frame. The ORDER BY clause determines whether two rows are peers: for example, ORDER BY c means that two rows with the same value in column c are peer rows. So, without an ORDER BY clause, all rows are considered peers of the current row, which means the frame is effectively the entire partition. A sketch of this rule is shown below.
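
A hedged sketch of that rule (the condition mirrors the defaultWindowFrame snippet above; the frames are written as strings for brevity):

```scala
// Functions that accept a frame and have an ORDER BY default to
// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW; in every other case
// the frame is effectively the entire partition.
def defaultFrame(hasOrderSpecification: Boolean, acceptWindowFrame: Boolean): String =
  if (hasOrderSpecification && acceptWindowFrame)
    "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
  else
    "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING"
```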


Contributor:

Thanks for the explanation!
