@@ -19,7 +19,9 @@ package org.apache.spark.repl
 
 import java.io.BufferedReader
 
+// scalastyle:off println
 import scala.Predef.{println => _, _}
+// scalastyle:on println
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
 import scala.tools.nsc.util.stringFromStream
@@ -33,39 +35,53 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
   def this() = this(None, new JPrintWriter(Console.out, true))
 
-  def initializeSpark() {
-    intp.beQuietDuring {
-      processLine("""
-        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
-            org.apache.spark.repl.Main.sparkSession
-          } else {
-            org.apache.spark.repl.Main.createSparkSession()
-          }
-        @transient val sc = {
-          val _sc = spark.sparkContext
-          if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
-            val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
-            if (proxyUrl != null) {
-              println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
-            } else {
-              println(s"Spark Context Web UI is available at Spark Master Public URL")
-            }
-          } else {
-            _sc.uiWebUrl.foreach {
-              webUrl => println(s"Spark context Web UI available at ${webUrl}")
-            }
-          }
-          println("Spark context available as 'sc' " +
-            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          println("Spark session available as 'spark'.")
-          _sc
+  override def createInterpreter(): Unit = {
+    intp = new SparkILoopInterpreter(settings, out, initializeSpark)
+  }
+
+  val initializationCommands: Seq[String] = Seq(
+    """
+    @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+        org.apache.spark.repl.Main.sparkSession
+      } else {
+        org.apache.spark.repl.Main.createSparkSession()
+      }
+    @transient val sc = {
+      val _sc = spark.sparkContext
+      if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+        val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+        if (proxyUrl != null) {
+          println(
+            s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+        } else {
+          println(s"Spark Context Web UI is available at Spark Master Public URL")
         }
-        """)
-      processLine("import org.apache.spark.SparkContext._")
-      processLine("import spark.implicits._")
-      processLine("import spark.sql")
-      processLine("import org.apache.spark.sql.functions._")
-      replayCommandStack = Nil // remove above commands from session history.
+      } else {
+        _sc.uiWebUrl.foreach {
+          webUrl => println(s"Spark context Web UI available at ${webUrl}")
+        }
+      }
+      println("Spark context available as 'sc' " +
+        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
+      println("Spark session available as 'spark'.")
+      _sc
+    }
+    """,
+    "import org.apache.spark.SparkContext._",
+    "import spark.implicits._",
+    "import spark.sql",
+    "import org.apache.spark.sql.functions._"
+  )
+
+  def initializeSpark(): Unit = {
+    if (!intp.reporter.hasErrors) {
+      // `savingReplayStack` removes the commands from session history.
+      savingReplayStack {
+        initializationCommands.foreach(intp quietRun _)
+      }
+    } else {
+      throw new RuntimeException(s"Scala $versionString interpreter encountered " +
+        "errors during initialization")
     }
   }
 
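
The hunk above moves the REPL bootstrap out of an eager `processLine` loop and into a callback: `createInterpreter` hands `initializeSpark` to `SparkILoopInterpreter`, which can invoke it once its own setup has succeeded, and the commands themselves now live in a plain `initializationCommands: Seq[String]`. A minimal sketch of that callback shape, using a hypothetical `ToyInterpreter` in place of the real nsc interpreter (names and setup steps are assumptions, not the actual API):

// Sketch only: ToyInterpreter stands in for SparkILoopInterpreter.
class ToyInterpreter(initialize: () => Unit) {
  private var ready = false

  def setup(): Unit = {
    // ... compiler/classloader setup would happen here in the real REPL ...
    ready = true
    initialize() // run the Spark bootstrap only after setup succeeded
  }

  def run(line: String): Unit =
    if (ready) println(s"interpreting: $line")
}

object ToyRepl {
  // Assumed stand-in for SparkILoop.initializationCommands.
  val initializationCommands = Seq("val spark = ...", "import spark.implicits._")

  def main(args: Array[String]): Unit = {
    val intp = new ToyInterpreter(
      () => initializationCommands.foreach(c => println(s"init: $c")))
    intp.setup()
    intp.run("spark.range(3).count()")
  }
}

Passing the callback instead of calling it from `loadFiles` means the interpreter decides when initialization is safe, which is why the `loadFiles` override disappears in the next hunk.
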
@@ -89,21 +105,17 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   /** Available commands */
   override def commands: List[LoopCommand] = standardCommands
 
-  /**
-   * We override `loadFiles` because we need to initialize Spark *before* the REPL
-   * sees any files, so that the Spark context is visible in those files. This is a bit of a
-   * hack, but there isn't another hook available to us at this point.
-   */
-  override def loadFiles(settings: Settings): Unit = {
-    initializeSpark()
-    super.loadFiles(settings)
-  }
-
   override def resetCommand(line: String): Unit = {
     super.resetCommand(line)
     initializeSpark()
     echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
   }
+
+  override def replay(): Unit = {
+    initializeSpark()
+    super.replay()
+  }
+
 }
 
 object SparkILoop {
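
The `replay()` override exists because `savingReplayStack` deliberately keeps the bootstrap commands out of the replay history, so `:replay` must re-run `initializeSpark()` itself before the recorded user commands. A rough model of that save/restore behaviour, with simplified stand-ins for the fields that really live in scala.tools.nsc.interpreter.ILoop:

// Sketch only: replayCommandStack and savingReplayStack mimic ILoop's.
object ReplayStackSketch {
  var replayCommandStack: List[String] = Nil

  def savingReplayStack(body: => Unit): Unit = {
    val saved = replayCommandStack
    try body
    finally replayCommandStack = saved // bootstrap never reaches :replay
  }

  def record(cmd: String): Unit = replayCommandStack ::= cmd

  def main(args: Array[String]): Unit = {
    savingReplayStack { record("val spark = ...") } // initialization, discarded
    record("spark.range(3).count()")                // user command, replayed
    println(replayCommandStack.reverse)             // List(spark.range(3).count())
  }
}
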