diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index f9a4205b3a6e9..db6148ab036cb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -181,7 +181,18 @@ private[spark] object Config extends Logging {
       .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
         "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
       .doubleConf
-      .createWithDefault(0.10)
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createOptional
+
+  val PYSPARK_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that Python Version is either Python2 or Python3")
+      .createWithDefault("2")
+
 
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 3884755e3aca7..7637d7b3452a7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -75,6 +75,7 @@ private[spark] object Constants {
   val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
+  val ENV_PYSPARK_PYTHON_VERSION = "PYSPARK_PYTHON_VERSION"
 
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 0e28de97b0567..d97aaacefa0f7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -76,6 +76,9 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
   def pySparkAppArgs(): Option[String] = sparkConf
     .get(KUBERNETES_PYSPARK_APP_ARGS)
 
+  def pySparkPythonVersion(): String = sparkConf
+    .get(PYSPARK_PYTHON_VERSION)
+
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
 
   def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -131,7 +134,7 @@ private[spark] object KubernetesConf {
           sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
           sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" "))
         }
-        sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
+        sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
       }
 
     val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 0e2f279c189c9..258d4947aae3c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -53,7 +53,7 @@ private[spark] class BasicDriverFeatureStep(
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
+    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * driverMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 79d467eadd30f..06d103dd94711 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(
 
   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
+    .getOrElse(math.max(
+      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
index 295d7093c5952..02d8fbc21c151 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder
 import io.fabric8.kubernetes.api.model.HasMetadata
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants.{ENV_PYSPARK_ARGS, ENV_PYSPARK_FILES, ENV_PYSPARK_PRIMARY}
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 
@@ -32,7 +32,7 @@ private[spark] class PythonDriverFeatureStep(
     require(mainResource.isDefined, "PySpark Main Resource must be defined")
     val otherPyFiles = kubernetesConf.pyFiles().map(pyFile =>
       KubernetesUtils.resolveFileUrisAndPath(pyFile.split(","))
-        .mkString(",")).getOrElse("")
+        .mkString(":")).getOrElse("")
     val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container)
       .addNewEnv()
         .withName(ENV_PYSPARK_ARGS)
@@ -46,6 +46,10 @@ private[spark] class PythonDriverFeatureStep(
         .withName(ENV_PYSPARK_FILES)
         .withValue(if (otherPyFiles == "") {""} else otherPyFiles)
         .endEnv()
+      .addNewEnv()
+        .withName(ENV_PYSPARK_PYTHON_VERSION)
+        .withValue(kubernetesConf.pySparkPythonVersion())
+        .endEnv()
       .build()
     SparkPod(pod.pod, withPythonPrimaryFileContainer)
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index a4587cbe37f32..46ae7a2853412 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -93,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite {
       None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",") ===
       Array("local:///opt/spark/jar1.jar"))
-    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
+    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR).isEmpty)
   }
 
   test("Creating driver conf with a python primary file") {
@@ -114,14 +114,15 @@ class KubernetesConfSuite extends SparkFunSuite {
       Some(inputPyFiles.mkString(",")))
     assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") ===
       Array("local:///opt/spark/jar1.jar"))
-    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === Some(0.4))
     assert(kubernetesConfWithMainResource.sparkFiles
       === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
   }
 
-  test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+  test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") {
     val sparkConf = new SparkConf(false)
+      .set(MEMORY_OVERHEAD_FACTOR, 0.3)
     CUSTOM_LABELS.foreach { case (key, value) =>
       sparkConf.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$key", value)
     }
@@ -151,6 +152,7 @@ class KubernetesConfSuite extends SparkFunSuite {
     assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
     assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
     assert(conf.roleEnvs === CUSTOM_ENVS)
+    assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === Some(0.3))
   }
 
   test("Basic executor translated fields.") {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
index f84d5e2adb35f..37c22b1033d12 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
@@ -32,7 +32,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     val mainResource = "local:///main.py"
     val pyFiles = Seq("local:///example2.py", "local:///example3.py")
     val expectedPySparkFiles =
-      "/example2.py,/example3.py"
+      "/example2.py:/example3.py"
     val baseDriverPod = SparkPod.initialPod()
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
@@ -57,7 +57,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     val step = new PythonDriverFeatureStep(kubernetesConf)
     val driverPod = step.configurePod(baseDriverPod).pod
     val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithPySpark.getEnv.size === 3)
+    assert(driverContainerwithPySpark.getEnv.size === 4)
     val envs = driverContainerwithPySpark
       .getEnv
       .asScala
@@ -66,12 +66,14 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
     assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
     assert(envs(ENV_PYSPARK_ARGS) === "5 7")
+    assert(envs(ENV_PYSPARK_PYTHON_VERSION) === "2")
   }
 
   test("Python Step testing empty pyfiles") {
     val mainResource = "local:///main.py"
     val baseDriverPod = SparkPod.initialPod()
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+      .set(PYSPARK_PYTHON_VERSION, "3")
     val kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
@@ -89,7 +91,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     val step = new PythonDriverFeatureStep(kubernetesConf)
     val driverPod = step.configurePod(baseDriverPod).pod
     val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithPySpark.getEnv.size === 3)
+    assert(driverContainerwithPySpark.getEnv.size === 4)
     val envs = driverContainerwithPySpark
       .getEnv
       .asScala
@@ -97,5 +99,6 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
       .toMap
     assert(envs(ENV_PYSPARK_FILES) === "")
     assert(envs(ENV_PYSPARK_ARGS) === "")
+    assert(envs(ENV_PYSPARK_PYTHON_VERSION) === "3")
   }
 }
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
index a36dc87dd8027..0067ff99280fe 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
@@ -18,16 +18,17 @@
 ARG base_img
 FROM $base_img
 
 WORKDIR /
-COPY python /opt/spark/python
+RUN mkdir ${SPARK_HOME}/python
+COPY python/lib ${SPARK_HOME}/python/lib
 RUN apk add --no-cache python && \
+    apk add --no-cache python3 && \
     python -m ensurepip && \
+    python3 -m ensurepip && \
     rm -r /usr/lib/python*/ensurepip && \
     pip install --upgrade pip setuptools && \
     rm -r /root/.cache
-ENV PYTHON_VERSION 2.7.13
-ENV PYSPARK_PYTHON python
-ENV PYSPARK_DRIVER_PYTHON python
-ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:${PYTHONPATH}
+
+ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip
 
 WORKDIR /opt/spark/work-dir
 ENTRYPOINT [ "/opt/entrypoint.sh" ]
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 27840bfe635e5..d32b6bd37443a 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -53,12 +53,21 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
   cp -R "$SPARK_MOUNTED_FILES_DIR/." .
 fi
 
-PYSPARK_SECONDARY="$PYSPARK_APP_ARGS"
-z "$PYSPARK_FILES" ]; then - PYSPARK_SECONDARY="$PYSPARK_FILES $PYSPARK_APP_ARGS" +if [ -n "$PYSPARK_FILES" ]; then + PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES" fi - +if [ "$PYSPARK_PYTHON_VERSION" == "2" ]; then + pyv="$(python -V 2>&1)" + export PYTHON_VERSION="${pyv:7}" + export PYSPARK_PYTHON="python" + export PYSPARK_DRIVER_PYTHON="python" +elif [ "$PYSPARK_PYTHON_VERSION" == "3" ]; then + pyv3="$(python3 -V 2>&1)" + export PYTHON_VERSION="${pyv3:7}" + export PYSPARK_PYTHON="python3" + export PYSPARK_DRIVER_PYTHON="python3" +fi case "$SPARK_K8S_CMD" in driver) @@ -74,7 +83,7 @@ case "$SPARK_K8S_CMD" in "$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client - "$@" $PYSPARK_PRIMARY $PYSPARK_SECONDARY + "$@" $PYSPARK_PRIMARY $PYSPARK_APP_ARGS ) ;;