
Conversation

@attilapiros (Contributor) commented Apr 30, 2019

What changes were proposed in this pull request?

Problem statement

An executor which has persisted blocks is not considered idle (and thus ready to be released by dynamic allocation) after the regular timeout `spark.dynamicAllocation.executorIdleTimeout`; a separate configuration, `spark.dynamicAllocation.cachedExecutorIdleTimeout`, applies instead, and it defaults to `Integer.MAX_VALUE`. This is because releasing the executor also means losing the persisted blocks (the metadata for the individual blocks, called `BlockInfo`, is kept in memory), so when the RDD is referenced later on those lost blocks have to be recomputed.
On the other hand, keeping executors around for too long without any task to work on is also a waste of resources (as the executors stay reserved for the application by the resource manager).

Solution

This PR focuses on the first part of SPARK-25888: it extends the external shuffle service with the capability to serve RDD blocks which are persisted in the local disk store by the executors. Moreover, when this feature is enabled by setting the `spark.shuffle.service.fetch.rdd.enabled` config to true and a block is reported to be persisted to disk, the external shuffle service instance running on the same host as the executor is also registered (along with the reporting block manager) as a possible location for fetching it.
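
For reference, here is a minimal sketch of the configuration this relies on; the app name and the timeout values are only illustrative, while the config keys are the ones discussed above.

~~~scala
import org.apache.spark.SparkConf

// Illustrative configuration sketch only (not code from this patch): enable the
// external shuffle service, let it serve disk persisted RDD blocks, and use
// dynamic allocation so idle executors can actually be released.
val conf = new SparkConf()
  .setAppName("fetch-rdd-from-shuffle-service-demo")             // made-up name
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.fetch.rdd.enabled", "true")        // config introduced by this PR
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "30s")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "90s")
~~~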

Some implementation details

Some explanation of the decisions made during development:

  • the location list used to fetch a block was already randomized, but the order of the groups (same host, same rack, others) was kept. In this PR the order of the groups is still kept, and the external shuffle service is added to the end of each group (see the small sketch right after this list).
  • no `BlockManagerInfo` is introduced for the external shuffle service; only a lightweight solution is taken: a hash map from `BlockId` to `BlockStatus` is introduced. A type alias would make the source more readable, but I know it is discouraged; on the other hand, a new class wrapping this hash map would introduce unnecessary indirection.
  • when this feature is on, the cleanup triggered during the removal of executors (which is handled in `ExternalShuffleBlockResolver`) is modified to keep the disk persisted RDD blocks. This cleanup is triggered in standalone mode when the `spark.storage.cleanupFilesAfterExecutorExit` config is set.
  • the unpersisting of an RDD is extended to use the external shuffle service for disk persisted RDD blocks when the original executor which created the blocks has already been released. New block transport messages are introduced to support this: `RemoveBlocks` and `BlocksRemoved`.
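
A minimal, self-contained sketch of the location ordering described in the first point (the `Loc` case class and function names are made up for illustration and are not the types used in the patch):

~~~scala
// Keep the (already randomized) executor locations of a group in place and append the
// external shuffle service locations at the end of that group, while the group order
// (same host, same rack, others) itself is preserved.
case class Loc(host: String, port: Int, isExternalShuffleService: Boolean)

def orderWithinGroup(group: Seq[Loc]): Seq[Loc] = {
  val (shuffleServices, executors) = group.partition(_.isExternalShuffleService)
  executors ++ shuffleServices
}

def fetchOrder(sameHost: Seq[Loc], sameRack: Seq[Loc], others: Seq[Loc]): Seq[Loc] =
  Seq(sameHost, sameRack, others).flatMap(orderWithinGroup)
~~~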

How was this patch tested?

Unit tests

ExternalShuffleServiceSuite

Here the complete use case is covered by the test "SPARK-25888: using external shuffle service fetching disk persisted blocks", with a tiny difference: the executor is killed manually, which makes the test a bit faster than waiting for the idle timeout.

ExternalShuffleBlockHandlerSuite

Tests the fetching of the RDD blocks via the external shuffle service.

BlockManagerInfoSuite

This is a new suite. As the `BlockManagerInfo` behaviour depends very much on whether the external shuffle service is enabled or not, all the tests are executed both with and without it.
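
To illustrate that "with and without" pattern, here is a minimal sketch; the suite and helper names are made up, this is not the actual test code.

~~~scala
import org.scalatest.FunSuite

// Illustrative pattern only: register every test twice, once with the external
// shuffle service enabled and once without, so both code paths are exercised.
abstract class WithAndWithoutShuffleServiceSuite extends FunSuite {
  protected def testWithShuffleServiceOnAndOff(name: String)(body: Boolean => Unit): Unit = {
    Seq(true, false).foreach { svcEnabled =>
      test(s"$name (external shuffle service enabled = $svcEnabled)") {
        body(svcEnabled)
      }
    }
  }
}

class ExampleBlockInfoSuite extends WithAndWithoutShuffleServiceSuite {
  testWithShuffleServiceOnAndOff("disk persisted RDD block is tracked") { svcEnabled =>
    // set up the block manager state for `svcEnabled` and assert on it here
    assert(Seq(true, false).contains(svcEnabled))
  }
}
~~~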

BlockManagerSuite

Tests the sorting of the block locations.

Manually on YARN

The Spark app was:

~~~scala
package com.mycompany

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

object TestAppDiskOnlyLevel {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test-app")

    println("Attila: START")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(0 until 100, 10)
      .map { i =>
        println(s"Attila: calculate first rdd i=$i")
        Thread.sleep(1000)
        i
      }

    rdd.persist(StorageLevel.DISK_ONLY)
    rdd.count()

    println("Attila: First RDD is processed, waiting for 60 sec")

    Thread.sleep(60 * 1000)

    println("Attila: Num executors must be 0 as executorIdleTimeout is way over")

    val rdd2 = sc.parallelize(0 until 10, 1)
      .map(i => (i, 1))
      .persist(StorageLevel.DISK_ONLY)

    rdd2.count()

    println("Attila: Second RDD with one partition (only one executors must be alive)")
    
    // reduce runs as user code to detect the empty seq (empty blocks)  
    println("Calling collect on the first RDD: " + rdd.collect().reduce(_ + _))

    println("Attila: STOP")
  }
}
~~~

I have submitted with the following configuration:

~~~bash
spark-submit --master yarn \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.executorIdleTimeout=30 \
  --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90 \
  --class com.mycompany.TestAppDiskOnlyLevel dyn_alloc_demo-core_2.11-0.1.0-SNAPSHOT-jar-with-dependencies.jar
~~~

Checked the result by filtering for the side effect of the task calculations:

~~~
[user@server ~]$ yarn logs -applicationId application_1556299359453_0001 | grep "Attila: calculate" | wc -l
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
19/04/26 10:31:59 INFO client.RMProxy: Connecting to ResourceManager at apiros-1.gce.company.com/172.31.115.165:8032
100
~~~

So there were only 100 task executions and not 200 (which would be the case with re-computation).

Moreover, from the submit/launcher log we can see that the executors really were stopped in between (note the "new total is 0" before the last line):

~~~
[user@server ~]$ grep "Attila: Num executors must be 0" -B 2 spark-submit.log
19/04/26 10:24:27 INFO cluster.YarnScheduler: Executor 9 on apiros-3.gce.company.com killed by driver.
19/04/26 10:24:27 INFO spark.ExecutorAllocationManager: Existing executor 9 has been removed (new total is 0)
Attila: Num executors must be 0 as executorIdleTimeout is way over
~~~

[Full spark submit log](https://github.com/attilapiros/spark/files/3122465/spark-submit.log)

I have also done a test after changing the `DISK_ONLY` storage level to `MEMORY_ONLY` for the first RDD. After this change, no executor was removed during the 60 sec wait.

@vanzin (Contributor) left a comment

FYI, I didn't look at the tests yet.

The one thing that I noticed is that the logic here is based on whether the storage level says the block may be stored on disk, not on whether the block is actually stored on disk.

Wouldn't this break (as in you'd lose cached data) if you have MEMORY_AND_DISK persistence, but a particular block is currently just sitting in memory?

@SparkQA commented Apr 30, 2019

Test build #105037 has finished for PR 24499 at commit 3e7797a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public abstract class ShuffleClient implements BlockTransferClient, Closeable
  • case class HasExclusiveCachedBlocks(executorId: String) extends ToBlockManagerMaster

@attilapiros (Contributor, Author) commented Apr 30, 2019

@vanzin Thanks for the review!

Yes, the storage level is used in two contexts:

  • desired state (coming from the user)
  • actual state

I have checked, and in the `UpdateBlockInfo` message the actual state is reflected, so `useDisk` is only true when the disk store really contains the block.
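
To spell out that distinction, here is a small hedged sketch (a hypothetical helper, not the code in the patch) of how the reported level can reflect the actual state rather than the desired one:

~~~scala
import org.apache.spark.storage.StorageLevel

// Illustrative only: derive the "actual" level from the desired level and the real
// in-memory / on-disk state of the block, so useDisk is true only when the disk
// store really contains the block.
def reportedLevel(desired: StorageLevel, inMemory: Boolean, onDisk: Boolean): StorageLevel =
  StorageLevel(
    useDisk = desired.useDisk && onDisk,
    useMemory = desired.useMemory && inMemory,
    useOffHeap = desired.useOffHeap,
    deserialized = desired.deserialized,
    replication = desired.replication)
~~~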

@SparkQA commented Apr 30, 2019

Test build #105041 has finished for PR 24499 at commit d641805.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 30, 2019

Test build #105045 has finished for PR 24499 at commit 82c7bd9.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros (Contributor, Author)

@srowen Could something be off with the Java style tests? The error reported for the 2nd commit was even present in my 1st commit (a space before the package). Could you please tell me how I can trigger the check locally? Do we have a shell script for that?

@SparkQA commented May 1, 2019

Test build #105048 has finished for PR 24499 at commit df3a80d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 1, 2019

Test build #105069 has finished for PR 24499 at commit 5476f6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 2, 2019

Test build #105089 has finished for PR 24499 at commit 5933ef0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 3, 2019

Test build #105095 has finished for PR 24499 at commit e66fe96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros (Contributor, Author)

The last commit was tested manually (in standalone mode):

~~~
$ grep "ExternalShuffleBlockResolver: Clean" /Users/attilapiros/github/spark/logs/spark-attilapiros-org.apache.spark.deploy.worker.Worker-1-apiros-MBP.local.out
19/05/03 16:34:05 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 0
19/05/03 16:34:05 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=0}'s 1 local dirs
19/05/03 16:34:07 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 1
19/05/03 16:34:07 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=1}'s 1 local dirs
19/05/03 16:34:07 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 2
19/05/03 16:34:07 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=2}'s 1 local dirs
19/05/03 16:34:09 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 5
19/05/03 16:34:09 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=5}'s 1 local dirs
19/05/03 16:34:09 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 4
19/05/03 16:34:09 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=4}'s 1 local dirs
19/05/03 16:34:10 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 3
19/05/03 16:34:10 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=3}'s 1 local dirs
19/05/03 16:34:10 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 6
19/05/03 16:34:10 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=6}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 7
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up non-shuffle and non-RDD files in executor AppExecId{appId=app-20190503163259-0000, execId=7}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=0}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=1}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=6}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=7}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=4}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=5}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=2}'s 1 local dirs
19/05/03 16:35:00 INFO ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=app-20190503163259-0000, execId=3}'s 1 local dirs
~~~

And there was no block recalculation:

~~~
$ grep "Attila: calculate" work/app-20190503163259-0000/*/stdout | wc -l
     100
~~~

@SparkQA commented May 3, 2019

Test build #105104 has finished for PR 24499 at commit 1d7f100.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 3, 2019

Test build #105109 has finished for PR 24499 at commit 612c4f3.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros (Contributor, Author)

Jenkins retest this please

The failure is unrelated; locally it was running fine (although with python2.7):


~~~
~/github/spark/python (SPARK-25888-final) $ ./run-tests --testnames pyspark.streaming.tests.test_dstream
Running PySpark tests. Output is in /Users/attilapiros/github/spark/python/unit-tests.log
Will test against the following Python executables: ['python2.7']
Will test the following Python tests: ['pyspark.streaming.tests.test_dstream']
Starting test(python2.7): pyspark.streaming.tests.test_dstream
Finished test(python2.7): pyspark.streaming.tests.test_dstream (104s)
Tests passed in 104 seconds
~~~

@attilapiros (Contributor, Author)

Jenkins retest this please

@SparkQA commented May 3, 2019

Test build #105114 has finished for PR 24499 at commit 612c4f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) left a comment

Just some small things.

@attilapiros (Contributor, Author)

cc @squito @dbtsai

@SparkQA commented May 6, 2019

Test build #105161 has finished for PR 24499 at commit f0e141d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment

just a really brief review so far

@SparkQA commented May 7, 2019

Test build #105217 has finished for PR 24499 at commit cc7aea0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 7, 2019

Test build #105226 has finished for PR 24499 at commit 0d6ed51.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment

still need to go through tests, but implementation makes sense to me

@squito (Contributor) commented May 7, 2019

Jenkins, retest this please

@SparkQA commented May 7, 2019

Test build #105232 has finished for PR 24499 at commit 0d6ed51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros (Contributor, Author)

I think it is important to mention why `spark.shuffle.io.maxRetries` is set to 0 for testing in the previous commit:

Without this setting the runtime of the test with the corrupt file (`ExternalShuffleIntegrationSuite#testFetchCorruptRddBlock`) increases dramatically (with 0 retries it takes only 0.2 seconds, but with 3 retries it goes up to 15 sec). I think it is because the error is detected at a very deep level within Netty and the channel is closed right here:

~~~java
logger.error(String.format("Error sending result %s to %s; closing connection",
    result, remoteAddress), future.cause());
channel.close();
~~~

So the quick `ChunkFetchFailure` is not sent right away for this request.
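
For reference, a tiny sketch of what that test setting amounts to (illustrative, not the actual suite code):

~~~scala
import org.apache.spark.SparkConf

// Illustrative only: with zero retries a fetch of the corrupt block fails immediately
// on the closed connection instead of going through the whole retry/backoff cycle.
val testConf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "0")
~~~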

@squito (Contributor) commented May 8, 2019

moving the discussion about file deletion and null buffers to the top-level so it doesn't get folded on code updates:

> what if the file existed when `FileSegmentManagedBuffer` is constructed with a non-zero length but when the file is about to be put on the wire or right before that it is removed by the owning executor.

Just to understand what you're protecting against here -- is there an expected path where the file gets deleted? Or are you just trying to have the behavior be a little more understandable when something bad happens on the host and the file goes missing?

@attilapiros (Contributor, Author) commented May 8, 2019

There is no code path that I know of where the files are deleted. I am just trying to make this as robust as possible (considering, of course, the price of this robustness), and I would like to know in advance how it will behave if something similar happens. This is why I added the new test.

@vanzin (Contributor) left a comment

Some more nits. I need to do another more careful pass on the whole patch.

@SparkQA commented May 21, 2019

Test build #105638 has finished for PR 24499 at commit e3adc05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class Constant

@SparkQA commented May 22, 2019

Test build #105680 has finished for PR 24499 at commit bf9ec92.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros (Contributor, Author)

Jenkins retest this please

@attilapiros (Contributor, Author)

Jenkins retest this please

@vanzin (Contributor) left a comment

Looks good, just a small test nit (and a revert of a previous suggestion).

@SparkQA commented May 22, 2019

Test build #105700 has finished for PR 24499 at commit bf9ec92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 23, 2019

Test build #105720 has finished for PR 24499 at commit faa583f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented May 23, 2019

Alright, good to go. Merging to master.

@vanzin vanzin closed this in e9f3f62 May 23, 2019
@attilapiros (Contributor, Author)

@vanzin, @squito thanks for all the reviews

~~~java
public boolean equals(Object other) {
  if (other != null && other instanceof BlocksRemoved) {
    BlocksRemoved o = (BlocksRemoved) other;
    return Objects.equal(numRemovedBlocks, o.numRemovedBlocks);
  }
  return false;
}
~~~
Contributor:

why do we compare 2 ints with Objects.equal?

Contributor:

IDEs are usually good at generating equals and hashCode for Java classes; maybe we can use the generated version.

Contributor (Author):

You are right (I followed the pattern used within the same package, like in `ExecutorShuffleInfo`).
I can open a minor PR fixing these two, or I can add this tiny change to my next PR, which might be opened next week or the week after. That PR is about avoiding the network when fetching shuffle blocks from a block manager running on the same host, so it is only loosely related.

Which one do you prefer?

Contributor:

both are fine, this is really trivial

cfmcgrady pushed a commit to cfmcgrady/spark that referenced this pull request Jul 31, 2019
… service after releasing executor by dynamic allocation

prakharjain09 pushed a commit to prakharjain09/spark that referenced this pull request Nov 29, 2019
… service after releasing executor by dynamic allocation