-
Notifications
You must be signed in to change notification settings - Fork 117
Monitor pod status in submission v2. #283
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,46 +14,50 @@ | |
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.deploy.kubernetes.submit.v1 | ||
| package org.apache.spark.deploy.kubernetes.submit | ||
|
|
||
| import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} | ||
| import java.util.concurrent.{CountDownLatch, TimeUnit} | ||
|
|
||
| import io.fabric8.kubernetes.api.model.Pod | ||
| import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod} | ||
| import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} | ||
| import io.fabric8.kubernetes.client.Watcher.Action | ||
| import scala.collection.JavaConverters._ | ||
|
|
||
| import org.apache.spark.SparkException | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.util.ThreadUtils | ||
|
|
||
| private[kubernetes] trait LoggingPodStatusWatcher extends Watcher[Pod] { | ||
| def awaitCompletion(): Unit | ||
| } | ||
|
|
||
| /** | ||
| * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on | ||
| * every state change and also at an interval for liveness. | ||
| * | ||
| * @param podCompletedFuture a CountDownLatch that is set to true when the watched pod finishes | ||
| * @param appId | ||
| * @param interval ms between each state request. If set to 0 or a negative number, the periodic | ||
| * logging will be disabled. | ||
| * @param maybeLoggingInterval ms between each state request. If provided, must be a positive | ||
| * number. | ||
| */ | ||
| private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch, | ||
| appId: String, | ||
| interval: Long) | ||
| extends Watcher[Pod] with Logging { | ||
| private[kubernetes] class LoggingPodStatusWatcherImpl( | ||
| appId: String, maybeLoggingInterval: Option[Long]) | ||
| extends LoggingPodStatusWatcher with Logging { | ||
|
|
||
| private val podCompletedFuture = new CountDownLatch(1) | ||
| // start timer for periodic logging | ||
| private val scheduler = | ||
| ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") | ||
| private val logRunnable: Runnable = new Runnable { | ||
| override def run() = logShortStatus() | ||
| } | ||
|
|
||
| private var pod: Option[Pod] = Option.empty | ||
| private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown") | ||
| private def status: String = pod.map(_.getStatus().getContainerStatuses().toString()) | ||
| .getOrElse("unknown") | ||
| private var pod = Option.empty[Pod] | ||
|
|
||
| private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") | ||
|
|
||
| def start(): Unit = { | ||
| if (interval > 0) { | ||
| maybeLoggingInterval.foreach { interval => | ||
| require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.") | ||
| scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) | ||
| } | ||
| } | ||
|
|
@@ -98,7 +102,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL | |
| } | ||
|
|
||
| private def formatPodState(pod: Pod): String = { | ||
|
|
||
| // TODO include specific container state | ||
| val details = Seq[(String, String)]( | ||
| // pod metadata | ||
| ("pod name", pod.getMetadata.getName()), | ||
|
|
@@ -116,17 +120,59 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL | |
| ("start time", pod.getStatus.getStartTime), | ||
| ("container images", | ||
| pod.getStatus.getContainerStatuses() | ||
| .asScala | ||
| .map(_.getImage) | ||
| .mkString(", ")), | ||
| .asScala | ||
| .map(_.getImage) | ||
| .mkString(", ")), | ||
| ("phase", pod.getStatus.getPhase()), | ||
| ("status", pod.getStatus.getContainerStatuses().toString) | ||
| ) | ||
| formatPairsBundle(details) | ||
| } | ||
|
|
||
| private def formatPairsBundle(pairs: Seq[(String, String)]) = { | ||
| // Use more loggable format if value is null or empty | ||
| details.map { case (k, v) => | ||
| val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") | ||
| s"\n\t $k: $newValue" | ||
| pairs.map { | ||
| case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}" | ||
| }.mkString("") | ||
| } | ||
|
|
||
| override def awaitCompletion(): Unit = { | ||
|
||
| podCompletedFuture.countDown() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, missed this. Can you send a fix? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never mind I see the fix now. |
||
| logInfo(pod.map { p => | ||
| s"Container final statuses:\n\n${containersDescription(p)}" | ||
| }.getOrElse("No containers were found in the driver pod.")) | ||
| } | ||
|
|
||
| private def containersDescription(p: Pod): String = { | ||
| p.getStatus.getContainerStatuses.asScala.map { status => | ||
| Seq( | ||
| ("Container name", status.getName), | ||
| ("Container image", status.getImage)) ++ | ||
| containerStatusDescription(status) | ||
| }.map(formatPairsBundle).mkString("\n\n") | ||
| } | ||
|
|
||
| private def containerStatusDescription( | ||
| containerStatus: ContainerStatus): Seq[(String, String)] = { | ||
| val state = containerStatus.getState | ||
| Option(state.getRunning) | ||
| .orElse(Option(state.getTerminated)) | ||
| .orElse(Option(state.getWaiting)) | ||
| .map { | ||
| case running: ContainerStateRunning => | ||
| Seq( | ||
| ("Container state", "Running"), | ||
| ("Container started at", running.getStartedAt)) | ||
| case waiting: ContainerStateWaiting => | ||
| Seq( | ||
| ("Container state", "Waiting"), | ||
| ("Pending reason", waiting.getReason)) | ||
| case terminated: ContainerStateTerminated => | ||
| Seq( | ||
| ("Container state", "Terminated"), | ||
| ("Exit code", terminated.getExitCode.toString)) | ||
| case unknown => | ||
| throw new SparkException(s"Unexpected container status type ${unknown.getClass}.") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where would this exception get caught? I don't think we want to throw an exception that possibly blocks application flow just for logging purposes -- maybe instead log Container state "Unknown" so as to not break code flow There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be happening in the logging thread, I don't think this will crash the application. |
||
| }.getOrElse(Seq(("Container state", "N/A"))) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is exceeds 100 chars and would fail scala style check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scalastyle doesn't check imports for line length.