-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-13721][SQL] Support outer generators in DataFrame API #16608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
ok to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bogdanrdc this looks promising! I left some small comments.
| } | ||
|
|
||
| case class GeneratorOuter(child: Generator) extends UnaryExpression | ||
| with Generator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the Unevaluable trait. Then you don't have to implement eval() and doGenCode()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't make it work with Unevaluable because Generator.eval is not the same as Unevaluable.eval.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, that is fair.
| expression[NullIf]("nullif"), | ||
| expression[Nvl]("nvl"), | ||
| expression[Nvl2]("nvl2"), | ||
| expression[OuterExplode]("outer_explode"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name these ..._outer, e.g.: explode_outer. That makes them easier to find when someone is looking for them (same goes for naming in functions.scala).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| new StructType() | ||
| .add("pos", IntegerType, nullable = false) | ||
| .add("key", kt, nullable = false) | ||
| .add("key", kt, nullable = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when this is false?
I think we should fix the output of Generate instead of doing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will see if I can fix it from Generate instead.
If it's false then outer_explode with empty map fails with this exception:
java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
createexternalrow(input[0, string, false].toString, input[1, string, true].toString, StructField(key,StringType,false), StructField(value,StringType,true))
:- input[0, string, false].toString
: +- input[0, string, false]
+- input[1, string, true].toString
+- input[1, string, true]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it in Generate instead. makes more sense from there too.
| override val position: Boolean = false | ||
| } | ||
|
|
||
| class OuterExplode(child: Expression) extends GeneratorOuter(Explode(child)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this. Favor composition over inheritance. Just create a GeneratorOuter(Explode(child)) whenever you need this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made it like this so I can use it in FunctionRegistry.scala:
expression[OuterExplode]("outer_explode")
Is there a way I can still make it work with GeneratorOuter(Explode(child))?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted to GeneratorOuter(Explode()) and created a new helper method in FunctionRegistry instead.
Actually the way I made these classes wasn't working. It was returning strange results, but not failing otherwise. I guess somewhere the OuterExplode class was not caught by a match. The generated code was different.
| override val position = true | ||
| } | ||
|
|
||
| class OuterPosExplode(child: Expression) extends GeneratorOuter(PosExplode(child)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this.
| } | ||
| } | ||
|
|
||
| class OuterInline(child: Expression) extends GeneratorOuter(Inline(child)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this.
| // array/map contains no input. | ||
| // generateOuter is an int. it is set to 1 iff outer is true and the input is empty or null. | ||
| val generateOuter = ctx.freshName("generateOuter") | ||
| val isOuter = if (outer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we generate the expressions for the generateOuter variable here instead? It is always 0 when outer == false and $numElements == 0 ? 1 : 0 when it is true. This increases the probability that the generated code gets eliminated at compile time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I simplified it by using index == -1 like you said and reverting to the old code. It was also easy to make it produce (null, null) for posexplode_outer instead of (-1, null). @rxin preferred to return null and not -1 (or even 0) for the position. So now, everything is null always.
| * | ||
| * @group collection_funcs | ||
| * @since 2.2.0 | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: explode_outer
| * | ||
| * @group collection_funcs | ||
| * @since 2.2.0 | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: posexplode_outer
| expression[Nvl]("nvl"), | ||
| expression[Nvl2]("nvl2"), | ||
| expression[OuterExplode]("outer_explode"), | ||
| expression[OuterInline]("outer_inline"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this one to the function registry? It is not exposed in functions.scala.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to support df.selectExpr("outer_explode(array())").
Should I remove it?
|
Test build #71458 has finished for PR 16608 at commit
|
| df.select(explode('intList)), | ||
| Row(1) :: Row(2) :: Row(3) :: Nil) | ||
| } | ||
| test("single outer_explode") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a blank line before this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just add it to all the test cases - have a blank line (only one) separating them.
| def unapply(e: Expression): Option[(Generator, Seq[String])] = e match { | ||
| case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil)) | ||
| case MultiAlias(g: Generator, names) if g.resolved => Some(g, names) | ||
| def unapply(e: Expression): Option[(Generator, Seq[String], Boolean)] = e match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
document what the return value means (especially that boolean value, but also the Seq[String] that's preexisting)
| checkSqlGeneration("SELECT map(1, 'a', 2, 'b')") | ||
| checkSqlGeneration("SELECT named_struct('c1',1,'c2',2,'c3',3)") | ||
| checkSqlGeneration("SELECT nanvl(a, 5), nanvl(b, 10), nanvl(d, c) from t2") | ||
| checkSqlGeneration("SELECT outer_explode(array())") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This breaks right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few small comments. LGTM pending these & jenkins.
| } | ||
|
|
||
| case class GeneratorOuter(child: Generator) extends UnaryExpression | ||
| with Generator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, that is fair.
| // prepend the new qualifier to the existed one | ||
| generatorOutput.map(a => a.withQualifier(Some(q))) | ||
| }.getOrElse(generatorOutput) | ||
| }.getOrElse(generatorOutput).map { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit style. Can you split this up for readability.
| def posexplode(e: Column): Column = withExpr { PosExplode(e.expr) } | ||
|
|
||
| /** | ||
| * Creates a new row for each element with position in the given array or map column. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update this the result.
| new ExpressionInfo(clazz.getCanonicalName, name) | ||
| } | ||
| } | ||
| private def expressionGeneratorOuter[T <: Generator : ClassTag] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a new line
| } | ||
| private def expressionGeneratorOuter[T <: Generator : ClassTag] | ||
| (name: String): (String, (ExpressionInfo, FunctionBuilder)) = { | ||
| val regularGen = expression[T](name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deconstruct the tuple here: val (_, (info, generator)) = expression[T](name)
| val outerBuilder = (args: Seq[Expression]) => { | ||
| GeneratorOuter(regularGen._2._2(args).asInstanceOf[Generator]) | ||
| } | ||
| (name, (expressionInfo[GeneratorOuter](name), outerBuilder)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use the generators info instead of the outer generator info.
|
Test build #71466 has finished for PR 16608 at commit
|
|
Test build #71467 has finished for PR 16608 at commit
|
|
Test build #71513 has finished for PR 16608 at commit
|
| } | ||
|
|
||
| private def expressionGeneratorOuter[T <: Generator : ClassTag] | ||
| (name: String): (String, (ExpressionInfo, FunctionBuilder)) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Could you follow the indentation style? https://github.com/databricks/scala-style-guide#spacing-and-indentation
private def expressionGeneratorOuter[T <: Generator : ClassTag](
name: String): (String, (ExpressionInfo, FunctionBuilder)) = {
val (_, (info, generatorBuilder)) = expression[T](name)
...|
Test build #71529 has finished for PR 16608 at commit
|
|
LGTM - merging to master. |
## What changes were proposed in this pull request? Added outer_explode, outer_posexplode, outer_inline functions and expressions. Some bug fixing in GenerateExec.scala for CollectionGenerator. Previously it was not correctly handling the case of outer with empty collections, only with nulls. ## How was this patch tested? New tests added to GeneratorFunctionSuite Author: Bogdan Raducanu <[email protected]> Closes apache#16608 from bogdanrdc/SPARK-13721.
## What changes were proposed in this pull request? Added outer_explode, outer_posexplode, outer_inline functions and expressions. Some bug fixing in GenerateExec.scala for CollectionGenerator. Previously it was not correctly handling the case of outer with empty collections, only with nulls. ## How was this patch tested? New tests added to GeneratorFunctionSuite Author: Bogdan Raducanu <[email protected]> Closes apache#16608 from bogdanrdc/SPARK-13721.
| expression[Abs]("abs"), | ||
| expression[Coalesce]("coalesce"), | ||
| expression[Explode]("explode"), | ||
| expressionGeneratorOuter[Explode]("explode_outer"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might need an update on ExpressionDescription for these three new expressions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile it uses the expression description of the underlying expression: https://github.com/apache/spark/pull/16608/files#diff-2c0350957ac4932d3f63796eceaeae08R517
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spark-sql> desc function extended explode_outer;
Function: explode_outer
Class: org.apache.spark.sql.catalyst.expressions.Explode
Usage: explode_outer(expr) - Separates the elements of array `expr` into multiple rows, or the elements of map `expr` into multiple rows and columns.
Extended Usage:
Examples:
> SELECT explode_outer(array(10, 20));
10
20
@hvanhovell explode_outer is sharing the same expression description. Do we need an update on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would we need an update? What is the extra information you want to convey? Do you want to add a generic line saying that an outer generator might produce nulls instead of filtering out the row?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Update the description and provide an example to users? Maybe hardcode the function name instead of using _FUNC_?
@ExpressionDescription(
usage = "_FUNC_(expr) - Separates the elements of array `expr` into multiple rows, or the elements of map `expr` into multiple rows and columns.",
extended = """
Examples:
> SELECT _FUNC_(array(10, 20));
10
20
""")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not super enthusiastic about this. We have three options here:
- Leave as it is.
- Remove the
outer_...generators, and make a user use thelateral view outer ...instead. - Create separate OuterGenerator classes for each one, and provide proper documentation.
I am fine with any.
## What changes were proposed in this pull request? Added outer_explode, outer_posexplode, outer_inline functions and expressions. Some bug fixing in GenerateExec.scala for CollectionGenerator. Previously it was not correctly handling the case of outer with empty collections, only with nulls. ## How was this patch tested? New tests added to GeneratorFunctionSuite Author: Bogdan Raducanu <[email protected]> Closes apache#16608 from bogdanrdc/SPARK-13721.
What changes were proposed in this pull request?
Added outer_explode, outer_posexplode, outer_inline functions and expressions.
Some bug fixing in GenerateExec.scala for CollectionGenerator. Previously it was not correctly handling the case of outer with empty collections, only with nulls.
How was this patch tested?
New tests added to GeneratorFunctionSuite