[SPARK-6483][SQL]Improve ScalaUdf called performance. #5154
|
Can one of the admins verify this patch? |
|
Hmm, have you measured the performance gain from this change? From my understanding, the bottleneck is in the function call. |
|
Before this change the query took 17 minutes; now it takes 5 minutes, which is the same as HiveContext with the floor UDF and as the non-UDF case. |
|
OK, probably we can also move the |
|
I mean we can do something like:
val f = children.size match {
  case 1 =>
    val func = function.asInstanceOf[(Any) => Any]
    val child0 = children(0)
    (input: Row) => {
      func(ScalaReflection.convertToScala(child0.eval(input), child0.dataType))
    }
  case 2 =>
    // Two-argument UDFs must be cast to a two-argument function type.
    val func = function.asInstanceOf[(Any, Any) => Any]
    val child0 = children(0)
    val child1 = children(1)
    (input: Row) => {
      func(
        ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
        ScalaReflection.convertToScala(child1.eval(input), child1.dataType))
    }
}
def eval(input: Row) = f(input) |
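To make the suggestion above concrete, here is a minimal, self-contained sketch of the same idea outside Spark. `Row`, `Expr`, `Ref`, `SlowUdf` and `FastUdf` are hypothetical stand-ins invented for illustration, not Catalyst classes, and the type conversion step is left out. It only shows the difference between doing the arity match, the cast and the child lookup on every row versus doing them once at construction time.

```scala
// Toy stand-ins for Catalyst types; all names here are hypothetical.
object HoistSketch {
  type Row = Seq[Any]

  trait Expr { def eval(input: Row): Any }

  final case class Ref(ordinal: Int) extends Expr {
    def eval(input: Row): Any = input(ordinal)
  }

  // Per-row version: matches on arity, casts the function and indexes
  // into `children` every time eval is called.
  final class SlowUdf(function: AnyRef, children: Seq[Expr]) extends Expr {
    def eval(input: Row): Any = children.size match {
      case 1 =>
        val func = function.asInstanceOf[Any => Any]
        func(children(0).eval(input))
      case 2 =>
        val func = function.asInstanceOf[(Any, Any) => Any]
        func(children(0).eval(input), children(1).eval(input))
      case n =>
        throw new IllegalArgumentException(s"unsupported arity: $n")
    }
  }

  // Hoisted version: the match, the cast and the child lookups run once,
  // when the expression is built; eval only invokes the prepared closure.
  final class FastUdf(function: AnyRef, children: Seq[Expr]) extends Expr {
    private val f: Row => Any = children.size match {
      case 1 =>
        val func = function.asInstanceOf[Any => Any]
        val child0 = children(0)
        (input: Row) => func(child0.eval(input))
      case 2 =>
        val func = function.asInstanceOf[(Any, Any) => Any]
        val child0 = children(0)
        val child1 = children(1)
        (input: Row) => func(child0.eval(input), child1.eval(input))
      case n =>
        throw new IllegalArgumentException(s"unsupported arity: $n")
    }

    def eval(input: Row): Any = f(input)
  }
}
```

Both classes return the same values; the sketch says nothing about how large the difference is in a real query.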
|
OK, I will modify code and test again. |
|
@chenghao-intel , I changed the code and tested it; the result is the same as with the last commit: 5 minutes. |
children is of type Seq[Expression] (essentially a List[Expression]), so accessing its elements by index causes runtime overhead; we'd better move that out of the anonymous functions. See:
http://docs.scala-lang.org/overviews/collections/performance-characteristics.html
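As a rough illustration of the point about indexed access: `Seq(...)` builds an immutable `List` by default, and `apply(i)` on a `List` walks `i` cells. One way to avoid paying that on every call is to destructure the sequence once and let the closure capture the elements directly. The object and method names below are made up for this sketch.

```scala
object ListAccessSketch {
  // Binds both elements once, outside the returned closure, so the
  // closure never calls children(i) at evaluation time.
  def bindTwo(children: Seq[Int => Int]): Int => Int = children match {
    case Seq(c0, c1) =>
      (x: Int) => c0(x) + c1(x)
    case other =>
      throw new IllegalArgumentException(
        s"expected 2 children, got ${other.size}")
  }
}
```

For very short lists the saving per call is small; it only matters because the closure runs once per record.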
|
@zzcclp I will run the benchmark on my local machine and get back to you soon. @liancheng , can you trigger the unit tests? |
|
Verified the code change with the following micro-benchmark:
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
case class Floor(child: Expression) extends UnaryExpression with Predicate {
override def foldable = child.foldable
def nullable = child.nullable
override def toString = s"Floor $child"
override def eval(input: Row): Any = {
child.eval(input) match {
case null => null
case ts: Int => ts - ts % 300
}
}
}
object T {
def benchmark(count: Int, expr: Expression): Unit = {
var i = 0
val row = new GenericRow(Array[Any](123, 21, 42))
val s = System.currentTimeMillis()
while (i < count) {
expr.eval(row)
i += 1
}
val e = System.currentTimeMillis()
println (s"${expr.getClass.getSimpleName} -- ${e - s} ms")
}
def main(args: Array[String]): Unit = {
def func(ts: Int) = ts - ts % 300
val udf0 = ScalaUdf(func _, IntegerType, BoundReference(0, IntegerType, true) :: Nil)
val udf1 = Floor(BoundReference(0, IntegerType, true))
benchmark(1000000, udf0)
benchmark(1000000, udf0)
benchmark(1000000, udf0)
benchmark(1000000, udf1)
benchmark(1000000, udf1)
benchmark(1000000, udf1)
}
}
Without the code change it outputs: Floor -- 49 ms
With the code change it outputs: Floor -- 27 ms
Conclusions:
We probably need to provide a more efficient UDF extension interface. |
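For readers who want to try a comparison like this without a Spark build, the following is a small stand-alone timing sketch. It is not the benchmark above: `BenchSketch` and `time` are hypothetical names, and it only compares calling a function through its typed signature with calling it through an erased `Any => Any` view. Like the benchmark above, it repeats each measurement so that later rounds run on warmed-up code; the numbers will vary by machine and JVM.

```scala
object BenchSketch {
  // Runs `body` `count` times and returns the elapsed time in nanoseconds.
  def time(count: Int)(body: Int => Any): Long = {
    var i = 0
    var sink: Any = null // keeps results live so the loop is less likely to be eliminated
    val start = System.nanoTime()
    while (i < count) {
      sink = body(i)
      i += 1
    }
    val elapsed = System.nanoTime() - start
    if (count > 0 && sink == null) println("unexpected null result")
    elapsed
  }

  def main(args: Array[String]): Unit = {
    val direct: Int => Int = ts => ts - ts % 300
    // Erased view of the same function, similar in spirit to how a UDF is invoked.
    val erased = direct.asInstanceOf[Any => Any]
    for (round <- 1 to 3) {
      val typedNs = time(1000000)(i => direct(i))
      val erasedNs = time(1000000)(i => erased(i))
      println(s"round $round: typed ${typedNs / 1000000} ms, erased ${erasedNs / 1000000} ms")
    }
  }
}
```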
|
ok to test |
|
Test build #29093 has started for PR 5154 at commit
|
|
Test build #29093 has finished for PR 5154 at commit
|
|
Test PASSed. |
|
Test build #29134 has started for PR 5154 at commit
|
|
@chenghao-intel , I have updated the code. Can you take a look again? Thanks. |
|
You need to fetch the latest code and resolve the conflicts. |
Nit: revert the change here?
|
@zzcclp LGTM, except for some small issues. |
|
Test build #29134 has finished for PR 5154 at commit
|
|
Test FAILed. |
…n script. 1. access Seq[Expression] element by :: operator 2. update the code gen script
|
Test build #29141 has started for PR 5154 at commit
|
|
@SparkQA , merge again. |
|
Test FAILed. |
|
Test build #29142 has finished for PR 5154 at commit
|
|
Test FAILed. |
|
@zzcclp You probably have to use code like:
val f = children.size match {
  case 1 =>
    val func = function.asInstanceOf[(Any) => Any]
    val child0 = children(0)
    (input: Row) => {
      func(ScalaReflection.convertToScala(child0.eval(input), child0.dataType))
    }
  case 2 =>
    // Two-argument UDFs must be cast to a two-argument function type.
    val func = function.asInstanceOf[(Any, Any) => Any]
    val child0 = children(0)
    val child1 = children(1)
    (input: Row) => {
      func(
        ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
        ScalaReflection.convertToScala(child1.eval(input), child1.dataType))
    }
} |
|
@chenghao-intel , can you review again, thanks. |
|
Test build #29152 has started for PR 5154 at commit
|
|
@AmplabJenkins , please re test |
A newline should be added at the end of source file.
|
Test build #29152 has finished for PR 5154 at commit
|
|
Test PASSed. |
|
I added a newline at the end of the source file. Does it need to be retested? |
|
Test build #29157 has started for PR 5154 at commit
|
|
@AmplabJenkins , please re test, thanks. |
|
The unit tests are triggered automatically once the code changes, so you don't need to say anything to @AmplabJenkins . |
|
OK, I am new sparker, 😄 |
|
Test build #29157 has finished for PR 5154 at commit
|
|
Test PASSed. |
|
@zzcclp @chenghao-intel Thanks for working on this and the review comments! Merged to master. |
|
@zzcclp Would you please set your real name on both GitHub and JIRA so that our script can put your name on the credit list of the next release? Also, it would be good if you can set your name in git config: |
|
@liancheng , I already set my real name on my GitHub Name and JIRA Full Name. 😃 |
It's a follow-up of #5154, we can speed up scala udf evaluation by create type converter in advance. Author: Wenchen Fan <[email protected]> Closes #6182 from cloud-fan/tmp and squashes the following commits: 241cfe9 [Wenchen Fan] use converter in ScalaUdf (cherry picked from commit 2f22424) Signed-off-by: Yin Huai <[email protected]>
It's a follow-up of #5154, we can speed up scala udf evaluation by create type converter in advance. Author: Wenchen Fan <[email protected]> Closes #6182 from cloud-fan/tmp and squashes the following commits: 241cfe9 [Wenchen Fan] use converter in ScalaUdf
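The follow-up idea of creating the type converter in advance can be sketched as follows. This is an illustration under assumptions, not the code from #6182: `DataType`, `IntType`, `TimestampType`, `convertPerRecord` and `createConverter` are hypothetical names, and the timestamp-as-`Long` representation is invented for the example. The per-record function inspects the data type for every value, while the ahead-of-time version picks the conversion once and returns a closure to reuse.

```scala
object ConverterSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object TimestampType extends DataType // hypothetical: stored as Long millis

  // Per-record style: the data type is examined on every call.
  def convertPerRecord(value: Any, dt: DataType): Any = (value, dt) match {
    case (null, _)                 => null
    case (ms: Long, TimestampType) => new java.util.Date(ms)
    case (v, _)                    => v
  }

  // Ahead-of-time style: match on the data type once and return a
  // closure that only handles the values themselves.
  def createConverter(dt: DataType): Any => Any = dt match {
    case TimestampType =>
      (v: Any) => v match {
        case null     => null
        case ms: Long => new java.util.Date(ms)
        case other    => other
      }
    case IntType =>
      (v: Any) => v
  }
}
```

A caller would build the converter once per child expression and apply it to each record, so the per-record work shrinks to a single function call.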
As described in issue SPARK-6483, ScalaUdf performs poorly because it calls asInstanceOf to convert for every record.
With this change, the performance of ScalaUdf is the same as in the other cases.
Thanks to @lianhuiwang for telling me how to resolve this problem.