Skip to content

Commit 69c8270

Browse files
lins05ash211
authored andcommitted
Check for user jars/files existence before creating the driver pod. (apache#86)
* Check for user jars/files existence before creating the driver pod. Close apache-spark-on-k8s#85 * CR
1 parent a175f7a commit 69c8270

File tree

1 file changed

+19
-1
lines changed
  • resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes

1 file changed

+19
-1
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ private[spark] class Client(
8484

8585
def run(): Unit = {
8686
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
87-
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
8887

88+
Seq(uploadedFiles, uploadedJars, Some(mainAppResource)).foreach(checkForFilesExistence)
89+
90+
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
8991
val parsedCustomLabels = parseCustomLabels(customLabels)
9092
var k8ConfBuilder = new K8SConfigBuilder()
9193
.withApiVersion("v1")
@@ -661,6 +663,22 @@ private[spark] class Client(
661663
}).toMap
662664
}).getOrElse(Map.empty[String, String])
663665
}
666+
667+
private def checkForFilesExistence(maybePaths: Option[String]): Unit = {
668+
maybePaths.foreach { paths =>
669+
paths.split(",").foreach { path =>
670+
val uri = Utils.resolveURI(path)
671+
uri.getScheme match {
672+
case "file" | null =>
673+
val file = new File(uri.getPath)
674+
if (!file.isFile) {
675+
throw new SparkException(s"""file "${uri}" does not exist!""")
676+
}
677+
case _ =>
678+
}
679+
}
680+
}
681+
}
664682
}
665683

666684
private[spark] object Client extends Logging {

0 commit comments

Comments
 (0)