Commit cbd225b
Put spark-submit underneath the hood of the spark-pipelines CLI (apache#126)
Updates our `spark-pipelines` CLI script to use `spark-submit`, as recommended by @martin-grund_data. This will enable running `spark-pipelines` against common cluster managers. A nice consequence of this change is that users will be able to avoid launching a separate Spark Connect server and pointing to it via the `--remote` arg. We follow the default behavior of `spark-submit` with Spark Connect, where Spark handles launching the server for the user and tearing it down at the end of the process. So `spark-pipelines run` will just work if there's a pipeline spec in the directory.
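
To make the hand-off concrete, here is a hedged sketch of what the wrapper assembles for a bare `run` command. The demo object and the `/opt/spark` home path are hypothetical (not part of this commit); the sketch lives in `org.apache.spark.deploy` only because `constructSparkSubmitArgs` is `protected[deploy]`. The expected output follows from the test suite further down.

```scala
package org.apache.spark.deploy

// Hypothetical demo object, not part of this commit. It calls the wrapper's
// arg-construction helper directly to show the spark-submit command line that
// `spark-pipelines run --spec pipeline.yml` turns into.
object SparkPipelinesDemo {
  def main(args: Array[String]): Unit = {
    val submitArgs = SparkPipelines.constructSparkSubmitArgs(
      Array("run", "--spec", "pipeline.yml"),
      sparkHome = "/opt/spark") // made-up SPARK_HOME; any path illustrates the shape
    // Per the tests below, this prints (wrapped here for readability):
    //   --conf spark.api.mode=connect --remote local
    //   /opt/spark/python/pyspark/pipelines/cli.py run --spec pipeline.yml
    println(submitArgs.mkString(" "))
  }
}
```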
1 parent b44e243 commit cbd225b

File tree: 4 files changed, +250 −8 lines


bin/spark-pipelines

Lines changed: 1 addition & 1 deletion

```diff
@@ -30,4 +30,4 @@ fi
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
 export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"
 
-$PYSPARK_PYTHON "${SPARK_HOME}"/python/pyspark/pipelines/cli.py "$@"
+exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines "$@"
```

SparkPipelines.scala (new file)

Lines changed: 108 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.util

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.spark.SparkUserAppException
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkSubmitArgumentsParser

/**
 * Outer implementation of the spark-pipelines command line interface. Responsible for routing
 * spark-submit args to spark-submit, and pipeline-specific args to the inner Python CLI
 * implementation that loads the user code and submits it to the backend.
 */
object SparkPipelines extends Logging {
  def main(args: Array[String]): Unit = {
    val sparkHome = sys.env("SPARK_HOME")
    SparkSubmit.main(constructSparkSubmitArgs(args, sparkHome).toArray)
  }

  protected[deploy] def constructSparkSubmitArgs(
      args: Array[String],
      sparkHome: String): Seq[String] = {
    val (sparkSubmitArgs, pipelinesArgs) = splitArgs(args)
    val pipelinesCliFile = s"$sparkHome/python/pyspark/pipelines/cli.py"
    (sparkSubmitArgs ++ Seq(pipelinesCliFile) ++ pipelinesArgs)
  }

  /**
   * Split the arguments into spark-submit args (--master, --remote, etc.) and pipeline args
   * (run, --spec, etc.).
   */
  private def splitArgs(args: Array[String]): (Seq[String], Seq[String]) = {
    val sparkSubmitArgs = new ArrayBuffer[String]()
    val pipelinesArgs = new ArrayBuffer[String]()
    var remote = "local"
    var name: Option[String] = None

    new SparkSubmitArgumentsParser() {
      parse(util.Arrays.asList(args: _*))

      override protected def handle(opt: String, value: String): Boolean = {
        if (opt == "--remote") {
          remote = value
        } else if (opt == "--class") {
          logInfo("--class argument not supported.")
          throw SparkUserAppException(1)
        } else if (opt == "--conf" &&
            value.startsWith("spark.api.mode=") &&
            value != "spark.api.mode=connect") {
          logInfo(
            "--spark.api.mode must be 'connect'. " +
            "Declarative Pipelines currently only supports Spark Connect."
          )
          throw SparkUserAppException(1)
        } else if (opt == "--name") {
          name = Option(value)
        } else {
          sparkSubmitArgs += opt
          if (value != null) {
            sparkSubmitArgs += value
          }
        }

        true
      }

      override protected def handleExtraArgs(extra: util.List[String]): Unit = {
        pipelinesArgs.appendAll(extra.asScala)
      }

      override protected def handleUnknown(opt: String): Boolean = {
        pipelinesArgs += opt
        true
      }
    }

    sparkSubmitArgs += "--conf"
    sparkSubmitArgs += "spark.api.mode=connect"
    sparkSubmitArgs += "--remote"
    sparkSubmitArgs += remote
    if (name.isDefined) {
      pipelinesArgs += "--name"
      pipelinesArgs += name.get
    }
    (sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
  }
}
```
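
One routing subtlety is worth illustrating: `--name` is an option the spark-submit parser itself recognizes, so `handle` intercepts it and re-routes it to the pipelines side of the split (e.g. for `init --name`) rather than letting spark-submit consume it. A minimal sketch under the same assumptions as the demo above (hypothetical object, made-up `SPARK_HOME`):

```scala
package org.apache.spark.deploy

// Hypothetical sketch, not part of this commit: --name is moved from the
// spark-submit arg list to the pipelines arg list, so it lands after cli.py.
object NameRoutingDemo {
  def main(args: Array[String]): Unit = {
    val submitArgs = SparkPipelines.constructSparkSubmitArgs(
      Array("init", "--name", "myproject"),
      sparkHome = "/opt/spark") // made-up SPARK_HOME
    // Prints (wrapped here for readability):
    //   --conf spark.api.mode=connect --remote local
    //   /opt/spark/python/pyspark/pipelines/cli.py init --name myproject
    println(submitArgs.mkString(" "))
  }
}
```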

SparkPipelinesSuite.scala (new file)

Lines changed: 137 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.must.Matchers

import org.apache.spark.SparkUserAppException

class SparkPipelinesSuite extends SparkSubmitTestUtils with Matchers with BeforeAndAfterEach {
  test("only spark submit args") {
    val args = Array(
      "--remote",
      "local[2]",
      "--deploy-mode",
      "client",
      "--supervise",
      "--conf",
      "spark.conf1=2",
      "--conf",
      "spark.conf2=3"
    )
    assert(
      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
      Seq(
        "--deploy-mode",
        "client",
        "--supervise",
        "--conf",
        "spark.conf1=2",
        "--conf",
        "spark.conf2=3",
        "--conf",
        "spark.api.mode=connect",
        "--remote",
        "local[2]",
        "abc/python/pyspark/pipelines/cli.py"
      )
    )
  }

  test("only pipelines args") {
    val args = Array(
      "run",
      "--spec",
      "pipeline.yml"
    )
    assert(
      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
      Seq(
        "--conf",
        "spark.api.mode=connect",
        "--remote",
        "local",
        "abc/python/pyspark/pipelines/cli.py",
        "run",
        "--spec",
        "pipeline.yml"
      )
    )
  }

  test("spark-submit and pipelines args") {
    val args = Array(
      "--remote",
      "local[2]",
      "run",
      "--supervise",
      "--spec",
      "pipeline.yml",
      "--conf",
      "spark.conf2=3"
    )
    assert(
      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
      Seq(
        "--supervise",
        "--conf",
        "spark.conf2=3",
        "--conf",
        "spark.api.mode=connect",
        "--remote",
        "local[2]",
        "abc/python/pyspark/pipelines/cli.py",
        "run",
        "--spec",
        "pipeline.yml"
      )
    )
  }

  test("class arg prohibited") {
    val args = Array(
      "--class",
      "org.apache.spark.deploy.SparkPipelines"
    )
    intercept[SparkUserAppException] {
      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
    }
  }

  test("name arg") {
    val args = Array(
      "init",
      "--name",
      "myproject"
    )
    assert(
      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
      Seq(
        "--conf",
        "spark.api.mode=connect",
        "--remote",
        "local",
        "abc/python/pyspark/pipelines/cli.py",
        "init",
        "--name",
        "myproject"
      )
    )
  }
}
```

python/pyspark/pipelines/cli.py

Lines changed: 4 additions & 7 deletions

```diff
@@ -205,17 +205,17 @@ def change_dir(path: Path) -> Generator[None, None, None]:
         os.chdir(prev)
 
 
-def run(spec_path: Path, remote: str) -> None:
+def run(spec_path: Path) -> None:
     """Run the pipeline defined with the given spec."""
     log_with_curr_timestamp(f"Loading pipeline spec from {spec_path}...")
     spec = load_pipeline_spec(spec_path)
 
     log_with_curr_timestamp("Creating Spark session...")
-    spark_builder = SparkSession.builder.remote(remote)
+    spark_builder = SparkSession.builder
     for key, value in spec.configuration.items():
         spark_builder = spark_builder.config(key, value)
 
-    spark = spark_builder.create()
+    spark = spark_builder.getOrCreate()
 
     log_with_curr_timestamp("Creating dataflow graph...")
     dataflow_graph_id = create_dataflow_graph(
@@ -244,9 +244,6 @@ def run(spec_path: Path, remote: str) -> None:
     # "run" subcommand
     run_parser = subparsers.add_parser("run", help="Run a pipeline.")
     run_parser.add_argument("--spec", help="Path to the pipeline spec.")
-    run_parser.add_argument(
-        "--remote", help="The Spark Connect remote to connect to.", required=True
-    )
 
     # "init" subcommand
     init_parser = subparsers.add_parser(
@@ -274,6 +271,6 @@ def run(spec_path: Path, remote: str) -> None:
         else:
             spec_path = find_pipeline_spec(Path.cwd())
 
-        run(spec_path=spec_path, remote=args.remote)
+        run(spec_path=spec_path)
     elif args.command == "init":
         init(args.name)
```

0 commit comments