Config.scala
@@ -181,7 +181,18 @@ private[spark] object Config extends Logging {
       .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
         "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
       .doubleConf
-      .createWithDefault(0.10)
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createOptional

+  val PYSPARK_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that Python Version is either Python2 or Python3")
+      .createWithDefault("2")
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
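Note: with createOptional, spark.kubernetes.memoryOverheadFactor no longer resolves to a default on its own; each caller now falls back explicitly (0.1 for JVM jobs, 0.4 set via setIfMissing for Python jobs, per the hunks below). A minimal sketch of the resulting resolution order, written against the public SparkConf API rather than the internal ConfigEntry machinery:

    // Sketch: how the optional overhead factor resolves after this change.
    import org.apache.spark.SparkConf
    val conf = new SparkConf(false)
    val overheadFactor: Double = conf
      .getOption("spark.kubernetes.memoryOverheadFactor") // None unless the user set it
      .map(_.toDouble)
      .getOrElse(0.1) // JVM-job default; PySpark jobs get 0.4 via setIfMissing instead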
Constants.scala
@@ -75,6 +75,7 @@ private[spark] object Constants {
   val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
+  val ENV_PYSPARK_PYTHON_VERSION = "PYSPARK_PYTHON_VERSION"

   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
KubernetesConf.scala
@@ -76,6 +76,9 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
   def pySparkAppArgs(): Option[String] = sparkConf
     .get(KUBERNETES_PYSPARK_APP_ARGS)

+  def pySparkPythonVersion(): String = sparkConf
+    .get(PYSPARK_PYTHON_VERSION)
+
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

   def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -131,7 +134,7 @@ private[spark] object KubernetesConf {
       sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
       sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" "))
     }
-    sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
+    sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
   }

   val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
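The switch from set to setIfMissing is what makes the 0.4 Python factor a user-overridable default rather than a forced value. A short sketch of the difference (the 0.3 user value is assumed for illustration):

    // Sketch of set vs. setIfMissing on SparkConf:
    import org.apache.spark.SparkConf
    val conf = new SparkConf(false)
      .set("spark.kubernetes.memoryOverheadFactor", "0.3") // user-supplied
    conf.setIfMissing("spark.kubernetes.memoryOverheadFactor", "0.4") // no-op: key exists
    assert(conf.get("spark.kubernetes.memoryOverheadFactor") == "0.3") // user value wins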
BasicDriverFeatureStep.scala
@@ -53,7 +53,7 @@ private[spark] class BasicDriverFeatureStep(
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
+    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * driverMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
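For concreteness, the overhead formula (used identically for the driver here and the executor in the next file) floors the result at MEMORY_OVERHEAD_MIN_MIB, which is 384 MiB in Spark. A worked example with an assumed 1024 MiB pod memory:

    // Worked example of the overhead formula (1024 MiB is an assumed input):
    val driverMemoryMiB = 1024
    val minOverheadMiB = 384 // MEMORY_OVERHEAD_MIN_MIB
    val jvmOverhead = math.max((0.1 * driverMemoryMiB).toInt, minOverheadMiB) // = 384
    val pysparkOverhead = math.max((0.4 * driverMemoryMiB).toInt, minOverheadMiB) // = 409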
BasicExecutorFeatureStep.scala
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(

   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
+    .getOrElse(math.max(
+      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
PythonDriverFeatureStep.scala
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder
 import io.fabric8.kubernetes.api.model.HasMetadata

 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants.{ENV_PYSPARK_ARGS, ENV_PYSPARK_FILES, ENV_PYSPARK_PRIMARY}
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

@@ -32,7 +32,7 @@ private[spark] class PythonDriverFeatureStep(
     require(mainResource.isDefined, "PySpark Main Resource must be defined")
     val otherPyFiles = kubernetesConf.pyFiles().map(pyFile =>
       KubernetesUtils.resolveFileUrisAndPath(pyFile.split(","))
-        .mkString(",")).getOrElse("")
+        .mkString(":")).getOrElse("")
     val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container)
       .addNewEnv()
         .withName(ENV_PYSPARK_ARGS)
@@ -46,6 +46,10 @@ private[spark] class PythonDriverFeatureStep(
         .withName(ENV_PYSPARK_FILES)
         .withValue(if (otherPyFiles == "") {""} else otherPyFiles)
         .endEnv()
+      .addNewEnv()
+        .withName(ENV_PYSPARK_PYTHON_VERSION)
+        .withValue(kubernetesConf.pySparkPythonVersion())
+        .endEnv()
       .build()
     SparkPod(pod.pod, withPythonPrimaryFileContainer)
   }
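The separator change from "," to ":" matters because the entrypoint now appends PYSPARK_FILES to PYTHONPATH (see entrypoint.sh below), and PYTHONPATH is colon-delimited on Linux. A sketch of what the env var ends up holding (paths assumed for illustration):

    // Sketch: resolved pyfile paths joined with ':' so the entrypoint can append
    // the whole value to PYTHONPATH directly.
    val resolved = Seq("/example2.py", "/example3.py")
    val envValue = resolved.mkString(":") // "/example2.py:/example3.py"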
KubernetesConfSuite.scala
@@ -93,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite {
       None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
-    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
+    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR).isEmpty)
   }

   test("Creating driver conf with a python primary file") {
@@ -114,14 +114,15 @@ class KubernetesConfSuite extends SparkFunSuite {
       Some(inputPyFiles.mkString(",")))
     assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
-    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === Some(0.4))
     assert(kubernetesConfWithMainResource.sparkFiles
       === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
   }


-  test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+  test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") {
     val sparkConf = new SparkConf(false)
+      .set(MEMORY_OVERHEAD_FACTOR, 0.3)
     CUSTOM_LABELS.foreach { case (key, value) =>
       sparkConf.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$key", value)
     }
@@ -151,6 +152,7 @@ class KubernetesConfSuite extends SparkFunSuite {
     assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
     assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
     assert(conf.roleEnvs === CUSTOM_ENVS)
+    assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === Some(0.3))
   }

   test("Basic executor translated fields.") {
PythonDriverFeatureStepSuite.scala
@@ -32,7 +32,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     val mainResource = "local:///main.py"
     val pyFiles = Seq("local:///example2.py", "local:///example3.py")
     val expectedPySparkFiles =
-      "/example2.py,/example3.py"
+      "/example2.py:/example3.py"
     val baseDriverPod = SparkPod.initialPod()
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
@@ -57,7 +57,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     val step = new PythonDriverFeatureStep(kubernetesConf)
     val driverPod = step.configurePod(baseDriverPod).pod
     val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithPySpark.getEnv.size === 3)
+    assert(driverContainerwithPySpark.getEnv.size === 4)
     val envs = driverContainerwithPySpark
       .getEnv
       .asScala
@@ -66,12 +66,14 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
     assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
     assert(envs(ENV_PYSPARK_ARGS) === "5 7")
+    assert(envs(ENV_PYSPARK_PYTHON_VERSION) === "2")
   }
   test("Python Step testing empty pyfiles") {
     val mainResource = "local:///main.py"
     val baseDriverPod = SparkPod.initialPod()
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+      .set(PYSPARK_PYTHON_VERSION, "3")
     val kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
@@ -89,13 +91,14 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
     val step = new PythonDriverFeatureStep(kubernetesConf)
     val driverPod = step.configurePod(baseDriverPod).pod
     val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
-    assert(driverContainerwithPySpark.getEnv.size === 3)
+    assert(driverContainerwithPySpark.getEnv.size === 4)
     val envs = driverContainerwithPySpark
       .getEnv
       .asScala
       .map(env => (env.getName, env.getValue))
       .toMap
     assert(envs(ENV_PYSPARK_FILES) === "")
     assert(envs(ENV_PYSPARK_ARGS) === "")
+    assert(envs(ENV_PYSPARK_PYTHON_VERSION) === "3")
   }
 }
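For reference, opting into Python 3 from the submission side only requires setting the new conf before it reaches KubernetesConf; a usage sketch (the application itself is assumed):

    // Usage sketch: selecting Python 3 for a PySpark-on-K8s job ("2" stays the default).
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.kubernetes.pyspark.pythonversion", "3")

or equivalently, --conf spark.kubernetes.pyspark.pythonversion=3 on spark-submit.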
Dockerfile
@@ -18,16 +18,17 @@
 ARG base_img
 FROM $base_img
 WORKDIR /
-COPY python /opt/spark/python
+RUN mkdir ${SPARK_HOME}/python
+COPY python/lib ${SPARK_HOME}/python/lib
 RUN apk add --no-cache python && \
+    apk add --no-cache python3 && \
     python -m ensurepip && \
+    python3 -m ensurepip && \
     rm -r /usr/lib/python*/ensurepip && \
     pip install --upgrade pip setuptools && \
     rm -r /root/.cache
-ENV PYTHON_VERSION 2.7.13
-ENV PYSPARK_PYTHON python
-ENV PYSPARK_DRIVER_PYTHON python
-ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:${PYTHONPATH}
+
+ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip

 WORKDIR /opt/spark/work-dir
 ENTRYPOINT [ "/opt/entrypoint.sh" ]
entrypoint.sh
@@ -53,12 +53,21 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
   cp -R "$SPARK_MOUNTED_FILES_DIR/." .
 fi

-PYSPARK_SECONDARY="$PYSPARK_APP_ARGS"
-if [ ! -z "$PYSPARK_FILES" ]; then
-  PYSPARK_SECONDARY="$PYSPARK_FILES $PYSPARK_APP_ARGS"
+if [ -n "$PYSPARK_FILES" ]; then
+  PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
 fi


+if [ "$PYSPARK_PYTHON_VERSION" == "2" ]; then
+    pyv="$(python -V 2>&1)"
+    export PYTHON_VERSION="${pyv:7}"
+    export PYSPARK_PYTHON="python"
+    export PYSPARK_DRIVER_PYTHON="python"
+elif [ "$PYSPARK_PYTHON_VERSION" == "3" ]; then
+    pyv3="$(python3 -V 2>&1)"
+    export PYTHON_VERSION="${pyv3:7}"
+    export PYSPARK_PYTHON="python3"
+    export PYSPARK_DRIVER_PYTHON="python3"
+fi
+
 case "$SPARK_K8S_CMD" in
   driver)
@@ -74,7 +83,7 @@
     "$SPARK_HOME/bin/spark-submit"
     --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
     --deploy-mode client
-    "$@" $PYSPARK_PRIMARY $PYSPARK_SECONDARY
+    "$@" $PYSPARK_PRIMARY $PYSPARK_APP_ARGS
   )
   ;;