@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
   TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.mesos.MesosNativeLibrary
 
@@ -56,8 +57,7 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
-  SparkDeploySchedulerBackend, SimrSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ExecutorInfo, CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
@@ -225,6 +225,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
+  private var _logUrls: Option[Predef.Map[String, String]] = None
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -314,6 +315,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
     _dagScheduler = ds
   }
+  private[spark] def logUrls: Option[Predef.Map[String, String]] = _logUrls
+  private[spark] def logUrls_=(logUrlsMap: Option[Predef.Map[String, String]]): Unit = {
+    _logUrls = logUrlsMap
+    _logUrls.foreach(urls => logInfo(s"Setting log urls to ${urls.mkString("|")}"))
+  }
 
   def applicationId: String = _applicationId
   def applicationAttemptId: Option[String] = _applicationAttemptId
@@ -1912,6 +1918,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
       startTime, sparkUser, applicationAttemptId))
+    _logUrls.foreach { logUrlsMap =>
+      listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(),
+        SparkContext.DRIVER_IDENTIFIER, new ExecutorInfo(Utils.localHostName(), 0, logUrlsMap)))
+    }
   }
 
   /** Post the application end event */
@@ -2422,6 +2432,21 @@ object SparkContext extends Logging {
           }
         }
         scheduler.initialize(backend)
+        val logUrl = System.getProperty("spark.yarn.driver.log.url")
+        if (logUrl != null) {
+          // look up the appropriate HTTP scheme for the container log URLs
+          val yarnConf: YarnConfiguration = new YarnConfiguration(sc.hadoopConfiguration)
+          val yarnHttpPolicy = yarnConf.get(
+            YarnConfiguration.YARN_HTTP_POLICY_KEY,
+            YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
+          )
+          val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
+          val baseUrl = s"$httpScheme$logUrl"
+          sc.logUrls =
+            Some(Predef.Map(
+              "stderr" -> s"$baseUrl/stderr?start=0",
+              "stdout" -> s"$baseUrl/stdout?start=0"))
+        }
         (backend, scheduler)
 
       case "yarn-client" =>
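For context (not part of the diff above): the change posts the driver's container log URLs as a `SparkListenerExecutorAdded` event under `SparkContext.DRIVER_IDENTIFIER`, so any registered `SparkListener` can pick them up the same way it does for ordinary executors. Below is a minimal sketch of consuming that event, assuming only the existing public listener API (`SparkListener.onExecutorAdded`, `ExecutorInfo.logUrlMap`); `LogUrlListener` is a hypothetical name, not part of Spark or this commit.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

// Hypothetical listener for illustration: prints the log URL map carried by each
// SparkListenerExecutorAdded event, including the driver's entry that this change
// posts under SparkContext.DRIVER_IDENTIFIER ("driver") in yarn-cluster mode.
class LogUrlListener extends SparkListener {
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    executorAdded.executorInfo.logUrlMap.foreach { case (name, url) =>
      println(s"executor ${executorAdded.executorId}: $name -> $url")
    }
  }
}

// Usage sketch: register the listener on the SparkContext.
// sc.addSparkListener(new LogUrlListener())
```

Assuming `spark.yarn.driver.log.url` holds the driver container's node-manager log address, the map built in `createTaskScheduler` would contain entries like `stderr -> http://<logUrl>/stderr?start=0` (or `https://` when the YARN HTTP policy is `HTTPS_ONLY`).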