Conversation

@mubarak
Contributor

@mubarak mubarak commented Aug 1, 2014

Please review: [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI

Screenshot after fix:

[screenshot: screen shot 2014-07-18 at 7 31 54 am]

Sample code used: Tutorial.scala

1   package org.apache.spark.examples.streaming
2
3   import org.apache.spark._
4   import org.apache.spark.SparkContext._
5   import org.apache.spark.streaming._
6   import org.apache.spark.streaming.twitter._
7   import org.apache.spark.streaming.StreamingContext._
8   import org.apache.spark.examples.streaming.TutorialHelper._
9   import org.apache.spark.util.{CallSite, Utils}
10
11  object Tutorial {
12    def main(args: Array[String]) {
13
14      // Checkpoint directory
15      val checkpointDir = TutorialHelper.getCheckpointDirectory()
16
17      // Configure Twitter credentials
18      val apiKey = ""
19      val apiSecret = ""
20      val accessToken = ""
21      val accessTokenSecret = ""
22      TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
23
24      // Your code goes here
25      val sc: SparkContext = new SparkContext(new SparkConf().setAppName("Twitter Example").setMaster("spark://ec2-54-241-226-42.us-west-1.compute.amazonaws.com:7077"))
26      val ssc = new StreamingContext(sc, Seconds(5))
27      val tweets = TwitterUtils.createStream(ssc, None)
28
29      val statuses = tweets.map(status => status.getText())
30      val words = statuses.flatMap(status => status.split(" "))
31
32      val hashtags = words.filter(word => word.startsWith("#"))
33      val counts = hashtags.map(tag => (tag, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(10))
34
35      val sortedCounts = counts.map { case (tag, count) => (count, tag) }
36                               .transform(rdd => rdd.sortByKey(false))
37      sortedCounts.foreach(rdd =>
38        println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))
39
40      ssc.checkpoint(checkpointDir)
41      ssc.start()
42      ssc.awaitTermination()
43
44    }
45  }

@AmplabJenkins

Can one of the admins verify this patch?

Contributor

What is this for?

Contributor Author

To set the callSite of the streaming driver code (e.g., StreamingContext at Tutorial.scala:26). I think the same call needs to be added when isCheckpointPresent=true (line #112).

Contributor

Why do you need to set the callsite of streaming driver? Which Spark job requires that callsite?

@tdas
Contributor

tdas commented Aug 2, 2014

This is a great start @mubarak, thanks for navigating the whole control-flow graph and putting this together. However, I feel there is a better way of doing this that does not involve customizing every DStream. In particular, can you explain what purpose the "name" field in DStream serves?

Also, if the local property needs to be set before creating RDDs in compute, then it can be set commonly for all DStreams in the DStream.getOrCompute function, right before the compute function is called.
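
The suggestion above can be sketched as follows. This is a hypothetical, simplified model: CallSiteHolder, SketchDStream, and the literal callsite strings are illustrative stand-ins for SparkContext's thread-local properties and the real DStream/RDD types.

```scala
// A plain ThreadLocal stands in for SparkContext's local properties.
object CallSiteHolder {
  private val callSite = new ThreadLocal[String]
  def set(cs: String): Unit = callSite.set(cs)
  def get: String = callSite.get
}

abstract class SketchDStream(creationSite: String) {
  // Subclasses implement only compute(); no callsite code needed in them.
  protected def compute(time: Long): Option[String]

  // The base class installs the creation site once, right before compute(),
  // so everything created inside compute() picks it up.
  final def getOrCompute(time: Long): Option[String] = {
    CallSiteHolder.set(creationSite)
    compute(time)
  }
}

class MappedSketchDStream extends SketchDStream("map at Tutorial.scala:29") {
  protected def compute(time: Long): Option[String] =
    Some(s"rdd created at ${CallSiteHolder.get}")
}
```

Calling new MappedSketchDStream().getOrCompute(0L) yields the creation site without the subclass itself touching any callsite state.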

@tdas
Contributor

tdas commented Aug 2, 2014

Also, it doesn't merge cleanly with master.

@mubarak
Contributor Author

mubarak commented Aug 10, 2014

@tdas
I have removed 'name' from DStream and addressed your review comments. Can you please review? Thanks.

Conflicts:
	core/src/main/scala/org/apache/spark/rdd/RDD.scala
	streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@tdas
Contributor

tdas commented Aug 18, 2014

Jenkins, this is ok to test.

Contributor

Why were these new patterns added?

Contributor Author

I was getting this stack trace if I don't filter scala.*, and one of the stages was apply at List.scala:318:

apply at List.scala:318
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1283)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1283)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1092)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1107)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1121)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
org.apache.spark.rdd.RDD.collect(RDD.scala:774)
org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:262)
org.apache.spark.RangePartitioner.<init>(Partitioner.scala:124)
org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:63)
org.apache.spark.examples.streaming.Tutorial$$anonfun$8.apply(Tutorial.scala:36)

Contributor

Aah, I guessed so. But this does change how callsites will get generated for general Spark programs as well. Will need some more input from others regarding this. @andrewor14 Can you take a look at this?

Contributor

Actually, on second thought: should all the Spark classes be ignored directly, that is, all classes under org.apache.spark.*? Why include subpackages in the regular expressions? Are there internal Spark classes that we want to show up in the callsite short form?
@andrewor14 What do you think?

Contributor Author

I think example programs from org.apache.spark.examples shouldn't be ignored.

Contributor

In that case, the condition should be converted to:

if ((isSparkClass && !isSparkExampleClass) || isScalaClass) { ... }
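
A sketch of what that check could look like — the regex patterns and names here are illustrative, not the exact ones in Utils.getCallSite:

```scala
import scala.util.matching.Regex

object CallSiteFilter {
  val SPARK_CLASS_REGEX: Regex = """^org\.apache\.spark""".r
  val SPARK_EXAMPLES_CLASS_REGEX: Regex = """^org\.apache\.spark\.examples""".r
  val SCALA_CLASS_REGEX: Regex = """^scala""".r

  // A stack frame is "internal" (skipped when picking the user callsite)
  // if it is a Spark class that is not an example, or a Scala library class.
  def isInternalFrame(className: String): Boolean = {
    val isSparkClass = SPARK_CLASS_REGEX.findFirstIn(className).isDefined
    val isSparkExampleClass = SPARK_EXAMPLES_CLASS_REGEX.findFirstIn(className).isDefined
    val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
    (isSparkClass && !isSparkExampleClass) || isScalaClass
  }
}
```

Under this rule, org.apache.spark.rdd.RDD and scala.collection.immutable.List frames are skipped, while org.apache.spark.examples.streaming.Tutorial frames are kept.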

@SparkQA

SparkQA commented Aug 18, 2014

QA tests have started for PR 1723 at commit ccde038.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 18, 2014

QA tests have finished for PR 1723 at commit ccde038.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

This seems unnecessary.

@tdas
Contributor

tdas commented Aug 18, 2014

This looks pretty promising!! However, why does every DStream need to change? The DStream parent class can do the following: before calling compute(), it can store the previous callsite and set the new callsite, then call compute(). Upon returning from compute(), it can restore the previous callsite. As far as I can see, that should work for all the DStreams without modifying every single one.

Also, can you show us a new screenshot of what it looks like now? Does it look the same as the one posted in this PR?
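
The save/set/restore pattern described here can be sketched like this — a standalone toy, with a plain var standing in for the thread-local call-site property, and withCallSite standing in for the wrapping the DStream base class would do around compute():

```scala
object CallSiteSketch {
  var currentCallSite: String = "user code"

  // Remember the previous callsite, install the DStream's creation site,
  // run the body (compute), then restore, so subclasses need no callsite
  // code at all and nesting works naturally.
  def withCallSite[T](creationSite: String)(body: => T): T = {
    val prevCallSite = currentCallSite
    currentCallSite = creationSite
    try body
    finally currentCallSite = prevCallSite
  }
}
```

For example, CallSiteSketch.withCallSite("map at Tutorial.scala:29") { ... } makes the body see the new callsite, and currentCallSite is back to "user code" afterwards.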

@SparkQA

SparkQA commented Aug 18, 2014

QA tests have started for PR 1723 at commit a207eb7.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 18, 2014

QA tests have finished for PR 1723 at commit a207eb7.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Aug 18, 2014

@mubarak This is what I had in mind. I have a few more comments in the code. Can you please comment/address those? Also, can you post a new screenshot?

Contributor

Dealing with this separately in two code paths is prone to errors in the future. Can you store the result of the compute(time) match { ... } in a variable and use that later? Something like this:

val rddOption = compute(time) match {
    // no setCallSite inside this
}
setCallSite(prevCallSite)   // only one instance of setCallSite, common to both code paths.
return rddOption

And while you are at it, can you also add a return at the other code paths in this function? That is, for the None in lines 332 and 335 of this commit.
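
In other words, something along these lines — a simplified sketch where compute, setCallSite, and prevCallSite are stand-ins for the real members of DStream:

```scala
object GetOrComputeSketch {
  // Both match arms fall through to a single setCallSite(prevCallSite),
  // so a future code path cannot forget to restore the callsite.
  def getOrCompute(compute: Long => Option[String],
                   setCallSite: String => Unit,
                   prevCallSite: String)(time: Long): Option[String] = {
    val rddOption = compute(time) match {
      case Some(rdd) => Some(rdd)  // no setCallSite inside the match
      case None      => None       // explicit value for the None path too
    }
    setCallSite(prevCallSite)      // the only restore point, common to both paths
    rddOption
  }
}
```

Whichever branch compute(time) takes, the callsite is restored exactly once before the result is returned.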

…prevCallSite) only once. Adding return for other code paths (for None)
@mubarak
Contributor Author

mubarak commented Aug 18, 2014

New screenshot
[screenshot: screen shot 2014-08-17 at 8 02 21 pm]

@tdas
Contributor

tdas commented Sep 5, 2014

Jenkins, test this please.

Contributor

@mubarak Sorry if I wasn't clear in my previous comment, but I wanted the streaming regex to be added to spark.streaming.util.Utils (a new file is needed). Logically, Spark code should not have any reference to higher-level components such as Spark Streaming.

@tdas
Contributor

tdas commented Sep 5, 2014

Jenkins is currently having issues, so I cannot test it. But this looks pretty good, except for the comment on the streaming regex: it should be in streaming/util/Utils.scala, not in Spark core. Other than that, I have asked @andrewor14 to take a look.

Contributor

This is just a Spark class, right? No need to add STREAMING here.

@andrewor14
Contributor

This sets a flag in SparkContext to reflect whether we want streaming call sites or normal call sites. Doesn't this mean that if we use this same SparkContext for other things, we will get the streaming call sites instead? I understand this goes through the thread-local properties, but it still has strange semantics: because I am using this SparkContext for streaming, its call sites are suddenly different. Am I understanding this correctly?

@tdas
Contributor

tdas commented Sep 5, 2014

No no, we want to preserve the behavior for Spark and only change it for streaming. The way this is constructed, the RDDs that are generated through Spark Streaming DStreams will have the custom callsite generated from the DStream callsite. For all other RDDs, not generated through a DStream, the behavior won't be affected.
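
The isolation described here follows from the property being thread-local. A toy illustration, with a plain ThreadLocal standing in for SparkContext.setLocalProperty (the names and strings are illustrative):

```scala
object LocalCallSite {
  private val callSite = new ThreadLocal[String] {
    override def initialValue(): String = "default (normal Spark callsite)"
  }
  def set(cs: String): Unit = callSite.set(cs)
  def get: String = callSite.get
}

// A value set on the streaming scheduler's thread...
val streamingThread = new Thread(new Runnable {
  def run(): Unit = LocalCallSite.set("DStream at Tutorial.scala:26")
})
streamingThread.start()
streamingThread.join()

// ...is not visible to other threads sharing the same object, so
// non-streaming jobs on the same SparkContext keep their normal callsites.
println(LocalCallSite.get)
```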

@SparkQA

SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@andrewor14
Contributor

ok to test

@tdas
Contributor

tdas commented Sep 6, 2014

Jenkins, this is ok to test.

@SparkQA

SparkQA commented Sep 6, 2014

QA tests have started for PR 1723 at commit ceb43da.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 6, 2014

QA tests have finished for PR 1723 at commit ceb43da.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Sep 6, 2014

Hey @mubarak, @andrewor14 and I discussed this PR, and it would be cool to use this opportunity to refactor the callSite stuff a bit and document it (it is very hard to understand, I think you would agree). So is it cool if I take your branch, add some documentation, and do a bit of refactoring that makes this clearer? Your commits will be there and you will be one of the authors, so nothing to worry about.

@mubarak
Contributor Author

mubarak commented Sep 6, 2014

@tdas
+1. Can you please review? Thanks

@mubarak
Contributor Author

mubarak commented Sep 6, 2014

Jenkins, this is ok to test.

@tdas
Contributor

tdas commented Sep 6, 2014

@mubarak I commented earlier about me making some small refactoring. Do you mind? See earlier comment.

@mubarak
Contributor Author

mubarak commented Sep 6, 2014

@tdas I don't mind. Please.

@tdas
Contributor

tdas commented Sep 6, 2014

Thanks @mubarak!!!! I will create a new PR and ask you to take a look.

@tdas
Contributor

tdas commented Sep 19, 2014

@mubarak I have opened a new PR with my changes. Could you please take a look at #2464?

And can you close this PR?

@tdas
Contributor

tdas commented Sep 23, 2014

@mubarak mind closing this?

asfgit pushed a commit that referenced this pull request Sep 23, 2014
…er) in Spark Stages UI

This is a refactored version of the original PR #1723 by mubarak.

Please take a look andrewor14, mubarak

Author: Mubarak Seyed <[email protected]>
Author: Tathagata Das <[email protected]>

Closes #2464 from tdas/streaming-callsite and squashes the following commits:

dc54c71 [Tathagata Das] Made changes based on PR comments.
390b45d [Tathagata Das] Fixed minor bugs.
904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-callsite
7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it simpler. Also added unit test for DStream creation site.
b9ed945 [Mubarak Seyed] Adding streaming utils
c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
ceb43da [Mubarak Seyed] Changing default regex function name
8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
491a1eb [Mubarak Seyed] Removing streaming visibility from getRDDCreationCallSite in DStream
33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods
c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream
5051c58 [Mubarak Seyed] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. Adding return for other code paths (for None)
a207eb7 [Mubarak Seyed] Fixing code review comments
ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream
2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853
1d90cc3 [Mubarak Seyed] Changes for SPARK-1853
5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
70f494f [Mubarak Seyed] Changes for SPARK-1853
1500deb [Mubarak Seyed] Changes in Spark Streaming UI
9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
d466d75 [Mubarak Seyed] Changes for spark streaming UI

(cherry picked from commit 729952a)
Signed-off-by: Andrew Or <[email protected]>
@JoshRosen
Contributor

Hi @mubarak,

This issue has been fixed by #2464, so do you mind closing this? Thanks!

(Due to the way that this GitHub mirror is set up, we don't have permission to close your PR).

@mubarak
Contributor Author

mubarak commented Oct 2, 2014

Fixed using #2464

@mubarak mubarak closed this Oct 2, 2014