Conversation

Contributor

@aokolnychyi aokolnychyi commented Dec 18, 2016

What changes were proposed in this pull request?

  • A separate subsection for Aggregations under “Getting Started” in the Spark SQL programming guide. It mentions which aggregate functions are predefined and how users can create their own.
  • Examples of using the UserDefinedAggregateFunction abstract class for untyped aggregations in Java and Scala.
  • Examples of using the Aggregator abstract class for type-safe aggregations in Java and Scala.
  • Python is not covered.
  • The PR might not resolve the ticket since I do not know what exactly was planned by the author.

In total, there are four new standalone examples that can be executed via spark-submit or run-example. The updated Spark SQL programming guide references these examples and does not contain hard-coded snippets.

How was this patch tested?

The patch was tested locally by building the docs. The examples were run as well.

![image](https://cloud.githubusercontent.com/assets/6235869/21292915/04d9d084-c515-11e6-811a-999d598dffba.png)

@SparkQA

SparkQA commented Dec 18, 2016

Test build #70319 has finished for PR 16329 at commit 8c18b2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class JavaUserDefinedTypedAggregation
  • public static class Salary implements Serializable
  • public static class Average implements Serializable
  • public static class MyAverage extends Aggregator<Salary, Average, Double>
  • public class JavaUserDefinedUntypedAggregation
  • public static class MyAverage extends UserDefinedAggregateFunction
  • case class Salary(person: String, salary: Long)
  • case class Average(sum: Long, count: Long)


Maybe add a little explanation here. For example, when I first saw this I tried to figure out where "salary" appears in the code, since in practice it is accessed only by index (input.getLong(0)).

Contributor Author

@aokolnychyi aokolnychyi Dec 19, 2016


@assafmendelson Yes, your point is definitely reasonable. Now I am wondering whether I should keep "salary" here. As an option, I can replace "salary" with "inputColumn" or something similar to make MyAverage more generic. There is no reason to bind it to salary. What's your opinion?


I would go with inputColumn.
What I think should be explained more strongly is that this is the schema of the input to the aggregate function, not of the source DataFrame. Otherwise someone might think that their original DataFrame needs to have a column with this name.


@assafmendelson assafmendelson Dec 19, 2016


I believe an explanation of what MutableAggregationBuffer is should be added.
Basically, explain that it is a row, how to access it, and what it means for it to be mutable (probably including that array and map types are immutable even though the buffer itself is mutable).

Contributor Author


Agree, I will try to add a small but meaningful explanation here.
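As a rough, Spark-free sketch of what such an explanation might cover (the MutableBuffer class below is a hypothetical mimic, not Spark's actual MutableAggregationBuffer), the buffer behaves like a row whose fields are reassigned by ordinal:

```java
// Hypothetical, simplified mimic of Spark's MutableAggregationBuffer.
// Names are illustrative only; nothing here is Spark API.
public class MutableBufferDemo {
    // A row-like buffer: fields are typed loosely and addressed by position.
    static class MutableBuffer {
        private final Object[] values;
        MutableBuffer(Object... values) { this.values = values; }
        // Reassigning a top-level field is the only supported mutation;
        // a collection stored in a field must be replaced wholesale.
        void update(int i, Object value) { values[i] = value; }
        long getLong(int i) { return (Long) values[i]; }
    }

    // The update() step of an average-style UDAF against the mimic:
    // slot 0 holds the running sum, slot 1 the running count.
    static void updateBuffer(MutableBuffer buffer, long input) {
        buffer.update(0, buffer.getLong(0) + input);
        buffer.update(1, buffer.getLong(1) + 1L);
    }

    public static void main(String[] args) {
        MutableBuffer buf = new MutableBuffer(0L, 0L);
        updateBuffer(buf, 10L);
        updateBuffer(buf, 32L);
        System.out.println(buf.getLong(0) + " over " + buf.getLong(1) + " rows");
    }
}
```

The points worth documenting would be that fields are addressed by position, that reassigning a field is the only supported mutation, and that nested collections must be replaced rather than modified in place.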

@SparkQA

SparkQA commented Dec 19, 2016

Test build #70374 has finished for PR 16329 at commit d059d5d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor


It's a little confusing to have the comment for this optimization here but then not implement it.

Contributor


It might be a little clearer if this was a Person with a name and salary.

Contributor


Same comment here with object reuse.

Contributor


Maybe add a comment on what name is doing here. I actually had to look it up.

@marmbrus
Contributor

This is great! Thanks for taking the time to write up such complete examples. I think this was a big gap in the existing docs. One other ask: the screenshot is great, but I'd like to see which parts actually make it into the code snippets in the doc. Ideally you could post a link to the compiled doc. If that's hard, I can also try to build it locally.

@aokolnychyi
Contributor Author

@marmbrus I have updated the pull request. The compiled docs can be found here.

I did not manage to build the Java API docs. I believe the problem is in my local installation. Therefore, I checked each URL manually; they should work once the API docs are compiled. I will verify everything one more time in the nightly build.

@SparkQA

SparkQA commented Dec 20, 2016

Test build #70405 has finished for PR 16329 at commit 2c1f182.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public static class Employee implements Serializable
  • public static class MyAverage extends Aggregator<Employee, Average, Double>
  • case class Employee(name: String, salary: Long)
  • case class Average(var sum: Long, var count: Long)


As a suggestion, I'd change this to read:
"The built-in DataFrames functions provide common aggregations such as count(), countDistinct(), avg(), max(), and min()."

Contributor Author

@aokolnychyi aokolnychyi Dec 21, 2016


Yes, that will be easier to read. Thanks


I think it'd be worth showing a Spark SQL example using the included/pre-defined functions. Since your example implements 'avg', maybe use 'min' / 'max'?

Alternatively, the example could be added to the SQL statements in the main driver for the UserDefinedAggregateFunction implementations.

Contributor Author


I also thought about this. In my view, it would be appropriate to have a separate subsection before Aggregations that shows how to apply predefined SQL functions, including how to write your own UDFs. That would be worth another pull request. Alternatively, I can also try to extend this one to add an example of max() or min(). @marmbrus what's your opinion?


Should all UDAFs written in Java be static classes? Similarly, should Scala implementations be Scala objects?

Contributor Author


I tried to group all relevant entities for each example into one single class. Static classes are used here just to avoid the cumbersome code for creating the entities inside the main() method. Scala objects are used for a similar reason. I can mention this, but it will add more comments to the already long examples.


@jnh5y jnh5y left a comment


My comments are generally small suggestions; the PR looks great to me.

I wonder if a discussion of the types and functions in a UDAF would be worthwhile.

Also, the code looks similar to the Hive test code here: https://github.com/apache/spark/blob/master/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java.

Contributor


Why are you constructing a new object instead of modifying the vars in one of the parameters? Is it required in the merge method and not in the reduce method?

Contributor Author


@michalsenkyr It is not required to create a new object in the merge method. One can modify the vars and return the existing object, just like in the reduce method. However, it is less critical here since this method is called on pre-aggregated data and not for every element. On the one hand, I can apply the same approach as in the reduce method to make the example consistent. On the other hand, the current code shows that it is not mandatory to modify the vars. Probably, a comment might help. I am not sure which approach is better, so I am open to suggestions.

Contributor


Personally, I prefer consistency. When I saw this, I immediately wondered whether there was a specific reason you did it this way.
I'd rather see both methods use the same paradigm, in this case probably the immutable one, as the option of mutability is already mentioned in the comment above.
Or you can mention it again in the comment on this method if you want to provide examples of both. As it is, it just seems a little confusing.
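For illustration, here is a minimal, Spark-free sketch of the two styles under discussion (the class and method names mirror the example but are assumptions, and nothing here is Spark API): reduce mutates its buffer to avoid per-row allocation, while merge allocates a fresh object.

```java
// Spark-free sketch of the reduce/merge trade-off; the Average class mirrors
// the example's buffer but this is illustrative code, not the actual example.
public class MergeStyleDemo {
    static class Average {
        long sum;
        long count;
        Average(long sum, long count) { this.sum = sum; this.count = count; }
    }

    // reduce: called once per input element, so mutating the buffer in place
    // avoids allocating a new Average for every row.
    static Average reduce(Average buffer, long value) {
        buffer.sum += value;
        buffer.count += 1;
        return buffer;
    }

    // merge: called once per pair of partial results. Allocating a new object
    // is affordable here, but mutating one argument (as reduce does) is equally
    // valid and would keep both methods consistent.
    static Average merge(Average b1, Average b2) {
        return new Average(b1.sum + b2.sum, b1.count + b2.count);
    }

    public static void main(String[] args) {
        Average left = reduce(reduce(new Average(0, 0), 1000L), 3000L);
        Average right = reduce(new Average(0, 0), 5000L);
        Average merged = merge(left, right);
        System.out.println((double) merged.sum / merged.count);  // prints 3000.0
    }
}
```

Either style is correct; the question raised above is purely about which reads better in a teaching example.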

@michalsenkyr
Contributor

If you are having trouble building the Javadoc, try switching to Java 7 temporarily. Java 8 introduced stricter Javadoc rules that may fail the docs build. Unfortunately, Jenkins doesn't check this, so new errors get introduced over time.

@SparkQA

SparkQA commented Dec 23, 2016

Test build #70558 has finished for PR 16329 at commit 0c55b94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 23, 2017

Test build #71863 has finished for PR 16329 at commit 0b17e13.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@srowen srowen left a comment


Except for one small question, the text/code looks OK. Others who know the content seem to approve, and you have gotten the tests to pass, so this seems reasonable.


public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
Member


Is this meant to be MyAverage?

Contributor Author


@srowen Average is a Java bean that holds the current sum and count. It is defined earlier. Here it represents a zero value. MyAverage, in turn, is the actual aggregator that accepts instances of the Employee class, stores intermediate results using an instance of Average, and produces a Double as the result.

I can rename MyAverage to MyAverageAggregator if this makes things clearer.

Member


My bad, I read this incorrectly while skimming.
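To make the three type parameters in the thread above concrete, here is a hedged, Spark-free stand-in for the Aggregator contract (the SimpleAggregator interface is illustrative, not Spark's actual class): IN is the input row type, BUF the intermediate state, and OUT the final result.

```java
// Simplified, Spark-free stand-in for the Aggregator<IN, BUF, OUT> contract.
// All names below are illustrative; none of this is Spark API.
import java.util.Arrays;
import java.util.List;

public class AggregatorDemo {
    interface SimpleAggregator<IN, BUF, OUT> {
        BUF zero();                // identity value: merge(b, zero()) == b
        BUF reduce(BUF b, IN a);   // fold one input element into the buffer
        BUF merge(BUF b1, BUF b2); // combine two partial buffers
        OUT finish(BUF b);         // produce the final result
    }

    static class Employee {
        final String name;
        final long salary;
        Employee(String name, long salary) { this.name = name; this.salary = salary; }
    }

    static class Average {
        long sum;
        long count;
        Average(long sum, long count) { this.sum = sum; this.count = count; }
    }

    // IN = Employee (input rows), BUF = Average (intermediate state), OUT = Double.
    static final SimpleAggregator<Employee, Average, Double> MY_AVERAGE =
        new SimpleAggregator<Employee, Average, Double>() {
            public Average zero() { return new Average(0L, 0L); }
            public Average reduce(Average b, Employee e) { b.sum += e.salary; b.count += 1; return b; }
            public Average merge(Average b1, Average b2) { b1.sum += b2.sum; b1.count += b2.count; return b1; }
            public Double finish(Average b) { return (double) b.sum / b.count; }
        };

    public static void main(String[] args) {
        List<Employee> input = Arrays.asList(new Employee("Michael", 3000L), new Employee("Andy", 4500L));
        Average buf = MY_AVERAGE.zero();
        for (Employee e : input) buf = MY_AVERAGE.reduce(buf, e);
        System.out.println(MY_AVERAGE.finish(buf));  // prints 3750.0
    }
}
```

Renaming MyAverage to something like MyAverageAggregator, as suggested above, would make the distinction between the aggregator and its Average buffer even harder to misread.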

@marmbrus
Contributor

Sorry for the delay. This LGTM, but I'm currently away from my Apache SSH keys. Other committers should feel free to merge if you get there before I do.

@gatorsmile
Member

Sure, let me quickly go over the changes. Will merge it after that.

Member

@gatorsmile gatorsmile left a comment


LGTM

asfgit pushed a commit that referenced this pull request Jan 25, 2017

Author: aokolnychyi <[email protected]>

Closes #16329 from aokolnychyi/SPARK-16046.

(cherry picked from commit 3fdce81)
Signed-off-by: gatorsmile <[email protected]>
@gatorsmile
Member

Thanks! Merging to master/2.1

@asfgit asfgit closed this in 3fdce81 Jan 25, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017