This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Use nodeAffinity to launch executors on cluster nodes that benefit from HDFS data locality #315

@kimoonkim

Description

@foxish @ash211

This is a sub-issue of the HDFS data locality issue #206.

When using HDFS, the Spark driver sends tasks to the particular executors that have the tasks' HDFS data on local disks (implemented in #216).

To increase the chance of benefiting from data locality, the driver should also, where possible, launch executors on the cluster nodes that hold the tasks' HDFS data in the first place.

A few Spark core classes maintain the key data structure, hostToLocalTaskCount: a map from each cluster node to the number of pending tasks that would gain data locality if they ran on that node.
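
For illustration, a snapshot of this map might look like the following (the hostnames are hypothetical):

    // Three pending tasks have HDFS blocks on node-1, one on node-2.
    val hostToLocalTaskCount: Map[String, Int] = Map("node-1" -> 3, "node-2" -> 1)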

  1. ExecutorAllocationManager listens for newly submitted stages and inspects the partition locations of each stage's input data. Every cluster node that holds a partition location has its task count incremented:
    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
      ...
      stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
        if (!locality.isEmpty) {
          numTasksPending += 1
          locality.foreach { location =>
            val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
            hostToLocalTaskCountPerStage(location.host) = count
          }
        }
      }
      stageIdToExecutorPlacementHints.put(stageId,
        (numTasksPending, hostToLocalTaskCountPerStage.toMap))

      // Update the executor placement hints
      updateExecutorPlacementHints()
    }

    def updateExecutorPlacementHints(): Unit = {
      var localityAwareTasks = 0
      val localityToCount = new mutable.HashMap[String, Int]()
      stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
        localityAwareTasks += numTasksPending
        localities.foreach { case (hostname, count) =>
          val updatedCount = localityToCount.getOrElse(hostname, 0) + count
          localityToCount(hostname) = updatedCount
        }
      }

      allocationManager.localityAwareTasks = localityAwareTasks
      allocationManager.hostToLocalTaskCount = localityToCount.toMap
    }
  2. CoarseGrainedSchedulerBackend is then informed of hostToLocalTaskCount and stores a snapshot:
    final override def requestTotalExecutors(
        numExecutors: Int,
        localityAwareTasks: Int,
        hostToLocalTaskCount: Map[String, Int]): Boolean = {
      if (numExecutors < 0) {
        throw new IllegalArgumentException(
          "Attempted to request a negative number of executor(s) " +
            s"$numExecutors from the cluster manager. Please specify a positive number!")
      }

      val response = synchronized {
        this.localityAwareTasks = localityAwareTasks
        this.hostToLocalTaskCount = hostToLocalTaskCount

        numPendingExecutors =
          math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)

        doRequestTotalExecutors(numExecutors)
      }

      defaultAskTimeout.awaitResult(response)
    }

On Kubernetes, we can use nodeAffinity (doc) in KubernetesClusterSchedulerBackend, which is a subclass of CoarseGrainedSchedulerBackend, to express this preference. In particular, preferredDuringSchedulingIgnoredDuringExecution expresses a soft requirement. From the doc:

There are currently two types of node affinity, called requiredDuringSchedulingIgnoredDuringExecution and preferredDuringSchedulingIgnoredDuringExecution. You can think of them as “hard” and “soft” respectively, in the sense that the former specifies rules that must be met for a pod to schedule onto a node (just like nodeSelector but using a more expressive syntax), while the latter specifies preferences that the scheduler will try to enforce but will not guarantee. The “IgnoredDuringExecution” part of the names means that, similar to how nodeSelector works, if labels on a node change at runtime such that the affinity rules on a pod are no longer met, the pod will still continue to run on the node.
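
As a rough illustration of how this could look in KubernetesClusterSchedulerBackend, the sketch below turns a hostToLocalTaskCount snapshot into a soft node affinity using the fabric8 Kubernetes client model classes. This is a minimal sketch under my own assumptions, not the upcoming PR: the helper name buildPreferredNodeAffinity and the weight-clamping policy are mine.

    import scala.collection.JavaConverters._

    import io.fabric8.kubernetes.api.model.{Affinity, NodeAffinity,
      NodeSelectorRequirement, NodeSelectorTerm, PreferredSchedulingTerm}

    // Sketch: hosts with more locality-aware tasks get a higher scheduling weight,
    // so the scheduler prefers, but does not require, placing executors there.
    def buildPreferredNodeAffinity(hostToLocalTaskCount: Map[String, Int]): Affinity = {
      val terms = hostToLocalTaskCount.map { case (host, taskCount) =>
        val requirement = new NodeSelectorRequirement()
        requirement.setKey("kubernetes.io/hostname")
        requirement.setOperator("In")
        requirement.setValues(List(host).asJava)
        val nodeTerm = new NodeSelectorTerm()
        nodeTerm.setMatchExpressions(List(requirement).asJava)
        val preferred = new PreferredSchedulingTerm()
        preferred.setPreference(nodeTerm)
        // Kubernetes only accepts weights in [1, 100], so clamp the raw task count.
        preferred.setWeight(math.min(math.max(taskCount, 1), 100))
        preferred
      }.toList
      val nodeAffinity = new NodeAffinity()
      nodeAffinity.setPreferredDuringSchedulingIgnoredDuringExecution(terms.asJava)
      val affinity = new Affinity()
      affinity.setNodeAffinity(nodeAffinity)
      affinity
    }

The resulting Affinity would then be set on each executor pod's spec before the pod is created, so nodes holding the tasks' HDFS blocks are preferred without being required.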

I'll send a PR soon to address this need.
