
Conversation

@NarineK
Contributor

@NarineK NarineK commented Aug 1, 2016

What changes were proposed in this pull request?

The following pull request addresses the new feature request described in SPARK-16258.
It automatically (by default) appends the grouping keys to the output DataFrame.

I also tried to solve the problem by adding an optional flag to gapply that states whether the key is required. However, the flag would need to be passed as an argument through a number of methods, which is not particularly elegant and leads to issues such as "The number of parameters should not exceed 10" in '..../logical/object.scala:290'.

Since this pull request already appends the grouping key automatically, I was wondering whether we really need to pass 'key' as an input argument of the R function - function(key, x) {....}. Isn't it superfluous?
I'd be happy to hear your thoughts on that.
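
To make the difference concrete, here is a rough sketch based on the faithful example from docs/sparkr.md (illustrative only; the exact semantics of the appended key column are what this PR proposes):

df <- createDataFrame(faithful)
schema <- structType(structField("waiting", "double"),
                     structField("max_eruption", "double"))

# Today the UDF has to attach the grouping key to its output itself:
result <- gapply(df, "waiting", function(key, x) {
  data.frame(key, max(x$eruptions))
}, schema)

# With this change the key column would be appended to the output
# automatically, so the UDF could return only the aggregated column
# (hypothetical behavior under this proposal):
result <- gapply(df, "waiting", function(key, x) {
  data.frame(max_eruption = max(x$eruptions))
}, schema)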

Thanks!

How was this patch tested?

Test cases in R.

@NarineK changed the title from "[SPARK-16258][SparkR][WIP] Gapply add key attach option" to "[SPARK-16258][SparkR][WIP] Automatically append the grouping keys in SparkR's gapply" on Aug 1, 2016
@NarineK changed the title from "[SPARK-16258][SparkR][WIP] Automatically append the grouping keys in SparkR's gapply" to "[SPARK-16258][SparkR] Automatically append the grouping keys in SparkR's gapply" on Aug 1, 2016
docs/sparkr.md Outdated
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

## waiting max_eruption
##1 64 5.100
Contributor Author

@NarineK NarineK Aug 1, 2016

Previously, there was a typo in this example.
It is easy to see by running:

> result <- data.frame(aggregate(faithful$eruptions, by = list(faithful$waiting), FUN = max))
> result <-  head(result[order(result$x, decreasing = TRUE), ])
> result

@SparkQA

SparkQA commented Aug 1, 2016

Test build #63059 has finished for PR 14431 at commit 575fcf8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2016

Test build #63064 has finished for PR 14431 at commit f235227.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2016

Test build #63065 has finished for PR 14431 at commit 44ee864.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2016

Test build #63061 has finished for PR 14431 at commit 8db1d08.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

shivaram commented Aug 1, 2016

@NarineK Thanks for the PR. The thing I worry about is that this will break any code users wrote against the 2.0 release, and they'll need to change their code if we ship this in 2.1 -- other than passing the option around, do you know if there is any way to maintain backwards compatibility?

@NarineK
Contributor Author

NarineK commented Aug 2, 2016

That's a good point, @shivaram.
worker.R is the component that has the keys and appends them to the output.
I don't see any elegant way of doing it in worker.R yet.

However, I was thinking about the following option:
we can still have an optional flag in gapply that states whether the key is required or not, but we will not pass it over to the Scala side.
By default we always prepend the keys in worker.R, and in group.R we can have a check such as:

if (!prependKey) {
  # de-attach/remove the appended key columns
}
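
For concreteness, a rough sketch of that stripping step (illustrative only; the helper name and the assumption that the keys sit in the first columns are mine):

# Assumes worker.R has already placed the grouping key columns as the first
# `nkeys` columns of the UDF result `res`; drop them again if not requested.
stripKeysIfNotRequested <- function(res, nkeys, prependKey) {
  if (!prependKey && nkeys > 0) {
    res <- res[, -seq_len(nkeys), drop = FALSE]
  }
  res
}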

Does this sound reasonable, or is it still hackish?

@shivaram
Contributor

shivaram commented Aug 2, 2016

Yeah, I think something like that is fine. Basically, doing some pre-processing or post-processing before or after the UDF has run, using our own R code, is a good way to add new features.

@NarineK
Contributor Author

NarineK commented Aug 2, 2016

Cool! Let me give that option a try.

@NarineK
Contributor Author

NarineK commented Aug 7, 2016

It seems that, currently, SparkR's GroupedData, which represents Scala's grouped-data object, doesn't have any information about the grouping keys. RelationalGroupedDataset has a private attribute groupingExprs which contains information about the grouping columns, but it is not accessible from the R side. I was thinking that maybe we could pass the grouping columns to group.R, e.g. groupedData(sgd, cols).
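
A loose sketch of what that could look like on the R side (illustrative only; the cols slot is an assumption and this is not the actual SparkR source):

# Assumes SparkR's internal "jobj" S4 class is available, as in the package.
setClass("GroupedData", representation(sgd = "jobj", cols = "character"))

groupedData <- function(sgd, cols) {
  new("GroupedData", sgd = sgd, cols = cols)
}
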
Any thoughts, @shivaram?
Thanks!

@shivaram
Contributor

shivaram commented Aug 9, 2016

Sure - appending more information to the R object is fine. Also, it looks like we actually have a handle to the RelationalGroupedDataset when we call groupBy on the Scala side:

def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {

@NarineK
Contributor Author

NarineK commented Aug 10, 2016

Thanks, @shivaram! Yes, we have a handle to the RelationalGroupedDataset, but I couldn't access the column fields of a RelationalGroupedDataset instance. Is there a way to access the columns?

@shivaram
Contributor

I'm not sure I understand the question. Also, some of the SQL committers like @liancheng might be able to answer this better.

@NarineK
Contributor Author

NarineK commented Aug 10, 2016

My point is the following. Say we have:

val relationalGroupedDataset = df.groupBy("col1", "col2")

Now, given relationalGroupedDataset, how can I find out the grouping columns? There is nothing like relationalGroupedDataset.columns or relationalGroupedDataset.groupExpression, is there?

@shivaram
Contributor

groupingExprs is a member of the class, as I can see in [1]. Also, we convert these grouping expressions to columns in the flatMapGroupsInR function [2] -- so we could add a new function that does a similar mapping but just returns the column names?

[1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L48
[2] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L413
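
A rough sketch of such a helper, written as a member of RelationalGroupedDataset (illustrative only; groupingColumnNames is a made-up name and this is not existing Spark code):

import org.apache.spark.sql.catalyst.expressions.NamedExpression

// Map each grouping expression to a column name, mirroring the aliasing done
// in flatMapGroupsInR; fall back to the expression's SQL text otherwise.
def groupingColumnNames: Seq[String] = groupingExprs.map {
  case ne: NamedExpression => ne.name
  case other => other.sql
}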

@NarineK
Contributor Author

NarineK commented Aug 12, 2016

Yes, @shivaram, that would be one way to do it: basically, add a new public function to RelationalGroupedDataset that returns the column names.
If that is fine from the SQL perspective, maybe I can create a separate pull request for it?
cc: @liancheng

@NarineK
Contributor Author

NarineK commented Aug 22, 2016

Made a pull request for grouping columns: #14742

@sameeragarwal
Member

@NarineK @shivaram any updates here? Also cc @felixcheung

@felixcheung
Member

felixcheung commented Jun 19, 2017 via email

@shivaram
Contributor

AFAIK this was dependent on #14742, but @NarineK may know better

@NarineK
Contributor Author

NarineK commented Jun 19, 2017

Hi everyone, yes, it depends on #14742. I've been asked to close #14742.
For this PR I need to access the grouping columns. If you think there is an alternative way of accessing that information, I'd be happy to make the changes in this PR.
Thanks!

@gatorsmile
Member

This will introduce an external behavior change, right?

@NarineK
Contributor Author

NarineK commented Jun 19, 2017

Yes, but we only need read access.

#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 2.351890 0.6548350 0.2375602 0.2521257
#' Model Species (Intercept) Sepal_Width Petal_Length Petal_Width
Member

This is an external change. I think such an external change is not acceptable after we have already introduced the existing behavior, right?

Member

It's going to be a breaking change, yes.

Contributor Author

In the past we had a discussion with @shivaram about backward compatibility:
#14431 (comment)

I think I didn't push the R changes because I wanted to be able to access the grouping columns on the SQL side first. Without access to the grouping columns I couldn't find a way to keep backward compatibility without breaking anything.

Member

Yes, I'd think it's reasonable if it's under a switch.

@gatorsmile
Member

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L51

You just need to change the above line so that groupingExprs becomes a val:

val groupingExprs: Seq[Expression],

You can then access groupingExprs from SQLUtils.scala.
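
Assuming that change, a minimal sketch of what a helper in SQLUtils.scala could look like (getGroupingExprStrings is a made-up name, not existing Spark code; the R side could then reach it via callJStatic):

import org.apache.spark.sql.RelationalGroupedDataset

// Hypothetical helper: return the SQL text of each grouping expression,
// once groupingExprs is exposed as a val on RelationalGroupedDataset.
def getGroupingExprStrings(gd: RelationalGroupedDataset): Array[String] = {
  gd.groupingExprs.map(_.sql).toArray
}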

@falaki
Contributor

falaki commented Jun 30, 2017

@NarineK how about adding this as a new API, e.g. gapplyWithKeys()? I am extremely worried about the semantic change: it can break existing SparkR applications and will be confusing for users.

@NarineK
Contributor Author

NarineK commented Jun 30, 2017

@falaki, I'd be fine with a separate gapplyWithKeys() method too.
@shivaram, @felixcheung, what do you think? Should we add a new gapplyWithKeys() method?

@NarineK
Contributor Author

NarineK commented Jun 30, 2017

Thank you, @gatorsmile! I'll give it a try.

@felixcheung
Member

felixcheung commented Jun 30, 2017 via email

@falaki
Contributor

falaki commented Jun 30, 2017

If we want to avoid yet another method, we could add this functionality as a non-default behavior. E.g.,

gapply(df, "key", function(key, x) { x }, schema(df), appendKeys = F)

@shivaram
Contributor

Compared to introducing a new API, I think @falaki's idea of adding a non-default option is better.

@NarineK
Contributor Author

NarineK commented Jun 30, 2017

I think @falaki's approach is good; it's just that I find the key that is passed together with x as an input to the function a little superfluous.

@felixcheung
Member

btw, if the key is the very first column, that sounds like a prefix and not an append?
Perhaps return.data.frame.key.column = FALSE?

And about your comment, do you mean the key in function(key, x) { x }?
IMO it's quite helpful to know which group (i.e. key) the UDF is processing.

@NarineK
Contributor Author

NarineK commented Jul 1, 2017

I think prepend sounds better. What do you think?
Yes, the key in function(key, x) { x } can be useful for some use cases, but I also think the user could easily prepend it to the data frame themselves if needed, since the key is already there.

@felixcheung
Member

I'm not too worried about the exact words, but prepend keys doesn't make it obvious what it means.
Also, please use something.something as the parameter name and not camel casing - we should try to do that unless the name is in Spark Scala.

@NarineK
Contributor Author

NarineK commented Jul 5, 2017

Alright, give me a couple of days to address those cases.

@NarineK
Contributor Author

NarineK commented Jul 10, 2017

@gatorsmile, I'm able to access groupingExprs from SQLUtils.scala through val groupingExprs: Seq[Expression]; however, it seems challenging to get the column name from a plain Expression. In RelationalGroupedDataset an alias is used to create a named expression: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L83

What would you suggest as the best way of accessing a column name from an Expression?

Thank you,
Narine

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 18, 2020
@github-actions github-actions bot closed this Jan 20, 2020