[SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI #1723
Conversation
Can one of the admins verify this patch?
What is this for?
To set the callSite of the streaming driver code (e.g., StreamingContext at Tutorial.scala:26). I think the same call needs to be added if isCheckpointPresent=true (line #112).
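For illustration, the driver-side callsite can be pinned explicitly through SparkContext's public setCallSite method. This is a minimal sketch, not code from this PR; the app name, file, and line number strings are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: make jobs started by this StreamingContext report
// "StreamingContext at Tutorial.scala:26" as their callsite in the
// Stages UI. The string is illustrative; setCallSite itself is a
// real SparkContext API.
object CallSiteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Tutorial").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.sparkContext.setCallSite("StreamingContext at Tutorial.scala:26")
    // ... set up DStreams, then ssc.start() / ssc.awaitTermination()
  }
}
```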
Why do you need to set the callsite of the streaming driver? Which Spark job requires that callsite?
This is a great start @mubarak, thanks for navigating all of the control flow and putting this together. Though I feel there is a better way of doing this that does not involve customizing every DStream. In particular, can you explain what purpose the "name" field in DStream serves? Also, if a local property needs to be set before creating RDDs in compute, then it can be set commonly for all DStreams in the DStream.getOrCompute function, right before the compute function is called.
Also, it doesn't merge cleanly with master.
@tdas
Conflicts:
core/src/main/scala/org/apache/spark/rdd/RDD.scala
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
Jenkins, this is ok to test.
Why were these new patterns added?
I was getting the following stack trace if I don't filter scala.*, and one of the stages was shown as apply at List.scala:318:
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1283)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1092)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1107)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1121)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
org.apache.spark.rdd.RDD.collect(RDD.scala:774)
org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:262)
org.apache.spark.RangePartitioner.<init>(Partitioner.scala:124)
org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:63)
org.apache.spark.examples.streaming.Tutorial$$anonfun$8.apply(Tutorial.scala:36)
Aah, I guessed so. But this changes how callsites get generated for general Spark programs as well. We will need some more input from others regarding this. @andrewor14, can you take a look at this?
Actually, on second thought, I am wondering whether all the Spark classes should be ignored directly, that is, all classes matching org.apache.spark.*? Why include subpackages in the regular expressions? Are there some internal Spark classes that we want to show up in the callsite short form?
@andrewor14 What do you think?
I think example programs from org.apache.spark.examples shouldn't be ignored.
In that case, the condition should be changed to:
if ((isSparkClass && !isSparkExampleClass) || isScalaClass) { ... }
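The condition being discussed can be sketched as a self-contained filter. The object and method names here are illustrative, not the PR's actual code; the idea is that Spark-internal and Scala-library frames are skipped when picking the user-facing callsite, while Spark's bundled example programs are treated as user code:

```scala
// Sketch of the proposed callsite filter (hypothetical names).
object CallSiteFilterSketch {
  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\..*)?$""".r
  private val SPARK_EXAMPLES_REGEX = """^org\.apache\.spark\.examples(\..*)?$""".r
  private val SCALA_CLASS_REGEX = """^scala(\..*)?$""".r

  // True if a stack frame from this class should be skipped when
  // choosing the short-form callsite shown in the Stages UI.
  def isInternal(className: String): Boolean = {
    val isSparkClass = SPARK_CLASS_REGEX.findFirstIn(className).isDefined
    val isSparkExampleClass = SPARK_EXAMPLES_REGEX.findFirstIn(className).isDefined
    val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
    (isSparkClass && !isSparkExampleClass) || isScalaClass
  }
}
```

With this shape, scala.collection.immutable.List and org.apache.spark.rdd.RDD are filtered out, but org.apache.spark.examples.streaming.Tutorial is kept, which matches the behavior requested above.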
QA tests have started for PR 1723 at commit
QA tests have finished for PR 1723 at commit
This seems unnecessary.
This looks pretty promising! However, why does every DStream need to change? The DStream parent class can do the following: before calling compute(), it can store the previous callsite and set the new callsite, and then call compute(). Upon returning from compute(), it can restore the previous callsite. As far as I can see, that should work for all DStreams without modifying every single stream. Also, can you show us a new screenshot of what it looks like now? Does it look the same as the one posted in this PR?
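The store/set/restore pattern described above can be sketched generically. In this self-contained sketch a plain ThreadLocal stands in for Spark's internal callsite property, and every name (CallSiteSketch, withCallSite, etc.) is illustrative rather than Spark's actual API:

```scala
// Sketch of the save/set/restore callsite pattern (hypothetical names).
object CallSiteSketch {
  private val callSite = new ThreadLocal[String] {
    override def initialValue(): String = "unknown"
  }

  def setCallSite(cs: String): Unit = callSite.set(cs)
  def getCallSite: String = callSite.get()

  // What a DStream base class could do in getOrCompute: wrap compute()
  // so every RDD created inside it sees this DStream's creation site,
  // then restore whatever callsite was set before.
  def withCallSite[T](creationSite: String)(compute: => T): T = {
    val prevCallSite = getCallSite
    setCallSite(creationSite)
    try compute
    finally setCallSite(prevCallSite) // restored on every code path
  }
}
```

Because the wrap happens once in the parent class, individual DStream subclasses would not need any changes, which is the point of the suggestion.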
QA tests have started for PR 1723 at commit
QA tests have finished for PR 1723 at commit
@mubarak This is what I had in mind. I have a few more comments in the code. Can you please comment on or address those? Also, can you post a new screenshot?
Dealing with this separately in two code paths is prone to errors in the future. Can you put the return value of the compute(time) match { ... } into a variable and use that later? Something like this:

val rddOption = compute(time) match {
  // no setCallSite inside this
}
setCallSite(prevCallSite) // only one instance of setCallSite, common to both code paths
return rddOption

And while you are at it, can you also add return at the other code paths in this function? That is, the None in lines 332 and 335 in this commit.
…prevCallSite) only once. Adding return for other code paths (for None)
Jenkins, test this please.
@mubarak Sorry if I wasn't clear in my previous comment, but I wanted the streaming regex to be added to spark.streaming.util.Utils (a new file is needed). Logically, Spark core code should not reference higher-level components such as Spark Streaming.
Jenkins is currently having issues, so it cannot test this. But this looks pretty good, except for the comment on the streaming regex: it should be in streaming/util/Utils.scala, not in Spark core. Other than that, I have asked @andrewor14 to take a look.
This is just a Spark class, right? No need to add STREAMING here.
This sets a flag in
No no, we want to preserve the behavior for Spark and only change it for streaming. The way this is constructed, RDDs that are generated through Spark Streaming DStreams will have the custom callsite generated from the DStream callsite. All other RDDs, not generated through a DStream, won't be affected.
Can one of the admins verify this patch?
ok to test
Jenkins, this is ok to test.
QA tests have started for PR 1723 at commit
QA tests have finished for PR 1723 at commit
Hey @mubarak, @andrewor14 and I discussed this PR, and it would be cool to use this opportunity to refactor the callSite stuff a bit and document it (it is very hard to understand, as I think you would agree). So is it cool if I take your branch, add some documentation, and do a bit of refactoring that makes this clearer? Your commits will be there and you will be one of the authors, so nothing to worry about.
@tdas
Jenkins, this is ok to test.
@mubarak I commented earlier about making some small refactorings. Do you mind? See the earlier comment.
@tdas I don't mind. Please go ahead.
Thanks @mubarak! I will create a new PR and ask you to take a look.
@mubarak mind closing this?
…er) in Spark Stages UI

This is a refactored version of the original PR #1723 by mubarak. Please take a look andrewor14, mubarak

Author: Mubarak Seyed <[email protected]>
Author: Tathagata Das <[email protected]>

Closes #2464 from tdas/streaming-callsite and squashes the following commits:

dc54c71 [Tathagata Das] Made changes based on PR comments.
390b45d [Tathagata Das] Fixed minor bugs.
904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-callsite
7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it simpler. Also added unit test for DStream creation site.
b9ed945 [Mubarak Seyed] Adding streaming utils
c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
ceb43da [Mubarak Seyed] Changing default regex function name
8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
491a1eb [Mubarak Seyed] Removing streaming visibility from getRDDCreationCallSite in DStream
33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods
c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream
5051c58 [Mubarak Seyed] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. Adding return for other code paths (for None)
a207eb7 [Mubarak Seyed] Fixing code review comments
ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream
2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853
1d90cc3 [Mubarak Seyed] Changes for SPARK-1853
5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
70f494f [Mubarak Seyed] Changes for SPARK-1853
1500deb [Mubarak Seyed] Changes in Spark Streaming UI
9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
d466d75 [Mubarak Seyed] Changes for spark streaming UI

(cherry picked from commit 729952a)
Signed-off-by: Andrew Or <[email protected]>
Fixed using #2464

Please review: [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
Screenshot after fix:
Sample code used: Tutorial.scala