@@ -21,14 +21,11 @@ import java.io.{File, OutputStream, PrintStream}
2121
2222import scala .collection .mutable .ArrayBuffer
2323
24- import org .scalatest .FunSuite
25- import org .scalatest .matchers .ShouldMatchers
26-
24+ import org .apache .spark .{SparkConf , SparkContext , SparkEnv , SparkException , TestUtils }
2725import org .apache .spark .deploy .SparkSubmit ._
28- import org .scalatest .prop .Tables .Table
29- import org .scalatest .prop .TableDrivenPropertyChecks ._
3026import org .apache .spark .util .Utils
31-
27+ import org .scalatest .FunSuite
28+ import org .scalatest .matchers .ShouldMatchers
3229
3330class SparkSubmitSuite extends FunSuite with ShouldMatchers {
3431
@@ -45,7 +42,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
4542 }
4643
4744 /** Returns true if the script exits and the given search string is printed. */
48- def testPrematureExit (input : Array [String ], searchString : String ): Boolean = {
45+ def testPrematureExit (input : Array [String ], searchString : String ) = {
4946 val printStream = new BufferPrintStream ()
5047 SparkSubmit .printStream = printStream
5148
@@ -63,35 +60,38 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
6360 }
6461 thread.start()
6562 thread.join()
66- printStream.lineBuffer.find(s => s.contains(searchString)).size > 0
63+ val joined = printStream.lineBuffer.mkString(" \n " )
64+ if (! joined.contains(searchString)) {
65+ fail(s " Search string ' $searchString' not found in $joined" )
66+ }
6767 }
6868
6969 test(" prints usage on empty input" ) {
70- testPrematureExit(Array [String ](), " Usage: spark-submit" ) should be ( true )
70+ testPrematureExit(Array [String ](), " Usage: spark-submit" )
7171 }
7272
7373 test(" prints usage with only --help" ) {
74- testPrematureExit(Array (" --help" ), " Usage: spark-submit" ) should be ( true )
74+ testPrematureExit(Array (" --help" ), " Usage: spark-submit" )
7575 }
7676
7777 test(" prints error with unrecognized option" ) {
78- testPrematureExit(Array (" my.jar --blarg" ), " Unrecognized option '--blarg'" ) should be ( true )
79- testPrematureExit(Array (" my.jar -bleg" ), " Unrecognized option: '-bleg'" ) should be ( true )
80- testPrematureExit(Array (" my.jar --master=abc" ),
81- " Unrecognized option: '--master=abc'. Perhaps you want '--master abc'?" ) should be ( true )
78+ testPrematureExit(Array (" --blarg" ), " Unrecognized option '--blarg'" )
79+ testPrematureExit(Array (" -bleg" ), " Unrecognized option '-bleg'" )
80+ testPrematureExit(Array (" --master=abc" ),
81+ " Unrecognized option '--master=abc'. Perhaps you want '--master abc'?" )
8282 }
8383
8484 test(" handles multiple binary definitions" ) {
8585 val adjacentJars = Array (" foo.jar" , " bar.jar" )
86- testPrematureExit(adjacentJars, " error: Found two conflicting resources" ) should be ( true )
86+ testPrematureExit(adjacentJars, " error: Found two conflicting resources" )
8787
8888 val nonAdjacentJars =
8989 Array (" foo.jar" , " --master" , " 123" , " --class" , " abc" , " bar.jar" )
90- testPrematureExit(nonAdjacentJars, " error: Found two conflicting resources" ) should be ( true )
90+ testPrematureExit(nonAdjacentJars, " error: Found two conflicting resources" )
9191 }
9292
9393 test(" handle binary specified but not class" ) {
94- testPrematureExit(Array (" foo.jar" ), " must specify a main class" )
94+ testPrematureExit(Array (" foo.jar" ), " Must specify a main class" )
9595 }
9696
9797 test(" handles YARN cluster mode" ) {
@@ -150,12 +150,11 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
150150 val appArgs = new SparkSubmitArguments (clArgs)
151151 val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
152152 val childArgsStr = childArgs.mkString(" " )
153- print(" child args: " + childArgsStr)
154153 childArgsStr.startsWith(" --memory 4g --cores 5 --supervise" ) should be (true )
155154 childArgsStr should include (" launch spark://h:p thejar.jar org.SomeClass arg1 arg2" )
156155 mainClass should be (" org.apache.spark.deploy.Client" )
157156 classpath should have length (0 )
158- sysProps should have size (0 )
157+ sysProps should have size (1 ) // contains --jar entry
159158 }
160159
161160 test(" handles standalone client mode" ) {
@@ -186,13 +185,79 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
186185 sysProps(" spark.cores.max" ) should be (" 5" )
187186 }
188187
188+ test(" launch simple application with spark-submit" ) {
189+ runSparkSubmit(
190+ Seq (" unUsed.jar" ,
191+ " --class" , SimpleApplicationTest .getClass.getName.stripSuffix(" $" ),
192+ " --name" , " testApp" ,
193+ " --master" , " local" ))
194+ }
195+
196+ test(" spark submit includes jars passed in through --jar" ) {
197+ val jar1 = TestUtils .createJarWithClasses(Seq (" SparkSubmitClassA" ))
198+ val jar2 = TestUtils .createJarWithClasses(Seq (" SparkSubmitClassB" ))
199+ val jarsString = Seq (jar1, jar2).map(j => j.toString).mkString(" ," )
200+ runSparkSubmit(
201+ Seq (" unUsed.jar" ,
202+ " --class" , JarCreationTest .getClass.getName.stripSuffix(" $" ),
203+ " --name" , " testApp" ,
204+ " --master" , " local-cluster[2,1,512]" ,
205+ " --jars" , jarsString))
206+ }
207+
208+ // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
189209 def runSparkSubmit (args : Seq [String ]): String = {
190210 val sparkHome = sys.env.get(" SPARK_HOME" ).orElse(sys.props.get(" spark.home" )).get
191211 Utils .executeAndGetOutput(
192212 Seq (" ./bin/spark-submit" ) ++ args,
193213 new File (sparkHome),
194214 Map (" SPARK_TESTING" -> " 1" , " SPARK_HOME" -> sparkHome))
195215 }
216+ }
217+
218+ object JarCreationTest {
219+ def main (args : Array [String ]) {
220+ val conf = new SparkConf ()
221+ val sc = new SparkContext (conf)
222+ val result = sc.makeRDD(1 to 100 , 10 ).mapPartitions{ x =>
223+ var foundClasses = false
224+ try {
225+ Class .forName(" SparkSubmitClassA" , true , Thread .currentThread().getContextClassLoader)
226+ Class .forName(" SparkSubmitClassA" , true , Thread .currentThread().getContextClassLoader)
227+ foundClasses = true
228+ } catch {
229+ case _ : Throwable => // catch all
230+ }
231+ Seq (foundClasses).iterator
232+ }.collect()
233+ if (result.contains(false )) {
234+ throw new Exception (" Could not load user defined classes inside of executors" )
235+ }
236+ }
237+ }
238+
239+ object SimpleApplicationTest {
240+ def main (args : Array [String ]) {
241+ val conf = new SparkConf ()
242+ val sc = new SparkContext (conf)
196243
244+ val configs = Seq (" spark.master" , " spark.app.name" )
245+ for (config <- configs) {
246+ val masterValue = conf.get(config)
247+ val executorValues = sc
248+ .makeRDD(1 to 100 , 10 )
249+ .map(x => SparkEnv .get.conf.get(config))
250+ .collect()
251+ .distinct
252+ if (executorValues.size != 1 ) {
253+ throw new SparkException (s " Inconsistent values for $config: $executorValues" )
254+ }
255+ val executorValue = executorValues(0 )
256+ if (executorValue != masterValue) {
257+ throw new SparkException (
258+ s " Master had $config= $masterValue but executor had $config= $executorValue" )
259+ }
260+ }
197261
262+ }
198263}
0 commit comments