[SPARK-27677][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation #24499
Conversation
FYI, I didn't look at the tests yet.
The one thing that I noticed is that the logic here is based on whether the storage level says the block may be stored on disk, not whether the block is actually stored on disk.
Wouldn't this break (as in you'd lose cached data) if you have MEMORY_AND_DISK persistence, but a particular block is currently just sitting in memory?
...work-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
core/src/main/scala/org/apache/spark/network/BlockTransferClientSync.scala
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Test build #105037 has finished for PR 24499 at commit
@vanzin Thanks for the review! Yes, the storage level is used in two contexts:
I have checked and in
Test build #105041 has finished for PR 24499 at commit
Test build #105045 has finished for PR 24499 at commit
@srowen Could it be something off with the Java style tests? That error reported in the 2nd commit was even present in my 1st commit (a space before the
Test build #105048 has finished for PR 24499 at commit
...ork-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
core/src/main/scala/org/apache/spark/network/SyncBlockTransferClient.scala
Test build #105069 has finished for PR 24499 at commit
Test build #105089 has finished for PR 24499 at commit
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
Test build #105095 has finished for PR 24499 at commit
The last commit was tested manually (in standalone mode): And there was no block recalculation:
Test build #105104 has finished for PR 24499 at commit
Test build #105109 has finished for PR 24499 at commit
Jenkins retest this please. The failure is unrelated; locally it was running fine (although with python2.7):
Jenkins retest this please
Test build #105114 has finished for PR 24499 at commit
Just some small things.
...ork-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
Test build #105161 has finished for PR 24499 at commit
just a really brief review so far
core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
Test build #105217 has finished for PR 24499 at commit
Test build #105226 has finished for PR 24499 at commit
still need to go through tests, but implementation makes sense to me
...src/test/java/org/apache/spark/network/shuffle/CleanupNonShuffleServiceServedFilesSuite.java
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
...etwork-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
Jenkins, retest this please
Test build #105232 has finished for PR 24499 at commit
I think it is important to mention why, in the previous commit, the Line 103 in dfeeda2
Without this setting the runtime of the test with the corrupt file (ExternalShuffleIntegrationSuite#testFetchCorruptRddBlock) increases dramatically (with 0 retries it takes only 0.2 seconds, but with 3 retries it goes up to 15 seconds). I think it is because the error is detected at a very deep level within Netty and the channel is closed right here: Lines 131 to 133 in cc7aea0
So not the quick
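(For context, a hedged sketch of what disabling the retries could look like in a test `SparkConf`. The exact config key touched by the commit is elided above, so `spark.shuffle.io.maxRetries` is only an assumption here; with the default of 3 retries and a 5 second retry wait it would be consistent with the roughly 15 seconds observed above.)
~~~scala
import org.apache.spark.SparkConf

// Assumed key: spark.shuffle.io.maxRetries (the comment above elides the exact
// setting). With 0 retries a corrupt disk-persisted block fails fast instead of
// going through the default retry/wait cycle.
val testConf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "0")
~~~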
Moving the discussion about file deletion and null buffers to the top-level so it doesn't get folded on code updates:
Just to understand what you're protecting against here -- is there an expected path where the file gets deleted? Or are you just trying to have the behavior be a little more understandable when something bad happens on the host and the file goes missing?
There is no code path that I know of where the files are deleted. I am just trying to make this as robust as possible (considering, of course, the price of this robustness), and I would like to know in advance how it will behave if something similar happens. This is why I added the new test.
Force-pushed from cac5d71 to e3adc05.
Some more nits. I need to do another more careful pass on the whole patch.
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/Constant.java
...src/test/java/org/apache/spark/network/shuffle/CleanupNonShuffleServiceServedFilesSuite.java
...n/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
Test build #105638 has finished for PR 24499 at commit
Test build #105680 has finished for PR 24499 at commit
Jenkins retest this please
Jenkins retest this please
Looks good, just a small test nit (and a revert of a previous suggestion).
...ork-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
...src/test/java/org/apache/spark/network/shuffle/CleanupNonShuffleServiceServedFilesSuite.java
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
Test build #105700 has finished for PR 24499 at commit
Test build #105720 has finished for PR 24499 at commit
Alright, good to go. Merging to master.
public boolean equals(Object other) {
  if (other != null && other instanceof BlocksRemoved) {
    BlocksRemoved o = (BlocksRemoved) other;
    return Objects.equal(numRemovedBlocks, o.numRemovedBlocks);
why do we compare 2 ints with Objects.equal?
IDEs are usually good at generating equals and hashCode for java classes, maybe we can use the generated version.
You are right (I followed the pattern used within the same package like in ExecutorShuffleInfo).
I can open a minor PR fixing these two, or I can add this tiny change to my next PR, which might be opened next week or the week after. That one is about avoiding the network when fetching shuffle blocks from the block manager running on the same host, so it is only loosely related.
Which one do you prefer?
both are fine, this is really trivial
… service after releasing executor by dynamic allocation
An executor which has persisted blocks is not considered to be idle, and therefore is not released by dynamic allocation after the regular timeout `spark.dynamicAllocation.executorIdleTimeout`; a separate configuration, `spark.dynamicAllocation.cachedExecutorIdleTimeout`, applies instead and defaults to `Integer.MAX_VALUE`. This is because releasing the executor also means losing the persisted blocks (as the metadata for the individual blocks, called `BlockInfo`, is kept in memory), and when the RDD is referenced later on these lost blocks will have to be recomputed.
On the other hand, keeping executors around for too long without any task to work on is also a waste of resources (as executors are reserved for the application by the resource manager).
This PR focuses on the first part of SPARK-25888: it extends the external shuffle service with the capability to serve RDD blocks which are persisted to the local disk store by the executors. Moreover, when this feature is enabled by setting the `spark.shuffle.service.fetch.rdd.enabled` config to true and a block is reported to be persisted to disk, the external shuffle service instance running on the same host as the executor is also registered (along with the reporting block manager) as a possible location for fetching it.
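(As an illustration only, a minimal `SparkConf` sketch of how the feature would be switched on together with dynamic allocation; the timeout values are just examples taken from the manual test below.)
~~~scala
import org.apache.spark.SparkConf

// Sketch: enable the external shuffle service, let it serve disk-persisted
// RDD blocks, and bound how long executors holding only cached blocks are kept.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.fetch.rdd.enabled", "true")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "30s")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "90s")
~~~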
Some explanation of the decisions made during development:
- The location list for fetching a block was already randomized, but the order of the groups (same host, same rack, others) was kept. In this PR the order of the groups is still kept and the external shuffle service is added to the end of each group.
- No `BlockManagerInfo` is introduced for the external shuffle service; only a lightweight solution is taken: a hash map from `BlockId` to `BlockStatus`. A type alias would make the source more readable, but I know it is discouraged. On the other hand, a new class wrapping this hash map would introduce unnecessary indirection (a rough sketch of this bookkeeping follows the list below).
- When this feature is on, the cleanup triggered during the removal of executors (which is handled in `ExternalShuffleBlockResolver`) is modified to keep the disk-persisted RDD blocks. This cleanup is triggered in standalone mode when the `spark.storage.cleanupFilesAfterExecutorExit` config is set.
- The unpersisting of an RDD is extended to use the external shuffle service for disk-persisted RDD blocks when the original executor which created the blocks has already been released. New block transport messages are introduced to support this: `RemoveBlocks` and `BlocksRemoved`.
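(A rough sketch of that lightweight bookkeeping, with made-up names and not the actual code in `BlockManagerMasterEndpoint`.)
~~~scala
import scala.collection.mutable
import org.apache.spark.storage.{BlockId, BlockStatus, RDDBlockId, StorageLevel}

// Hypothetical sketch: a plain hash map per external shuffle service location,
// tracking the status of blocks it can serve (no wrapper class, no type alias).
val blocksServedByShuffleService = new mutable.HashMap[BlockId, BlockStatus]

// When an executor reports an RDD block persisted to its local disk, the
// co-located shuffle service is recorded as a possible source for it too.
val blockId: BlockId = RDDBlockId(rddId = 0, splitIndex = 3)
blocksServedByShuffleService(blockId) =
  BlockStatus(StorageLevel.DISK_ONLY, memSize = 0L, diskSize = 1024L)

// Only blocks that actually have bytes on disk can be served by the service.
val fetchableBlocks = blocksServedByShuffleService.collect {
  case (id, status) if status.diskSize > 0 => id
}
~~~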
Here the complete use case is tested in `ExternalShuffleServiceSuite` by the test "SPARK-25888: using external shuffle service fetching disk persisted blocks", with a tiny difference: here the executor is killed manually, which makes the test a bit faster than waiting for the idle timeout.
`ExternalShuffleBlockHandlerSuite` tests the fetching of the RDD blocks via the external shuffle service.
`BlockManagerInfoSuite` is a new suite. As the `BlockManagerInfo` behaviour depends very much on whether the external shuffle service is enabled or not, all the tests are executed both with and without it.
`BlockManagerSuite` tests the sorting of the block locations.
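(To make that ordering concrete, a simplified sketch using plain strings instead of `BlockManagerId` — not the code under test.)
~~~scala
import scala.util.Random

// Simplified sketch of the location ordering: executors are shuffled within
// their group, the group order (same host, same rack, others) is preserved,
// and the external shuffle service entry (marked "/ess" here) is moved to the
// end of its group.
def sortLocations(
    sameHost: Seq[String],
    sameRack: Seq[String],
    others: Seq[String]): Seq[String] = {
  def order(group: Seq[String]): Seq[String] = {
    val (shuffleServices, executors) = group.partition(_.endsWith("/ess"))
    Random.shuffle(executors) ++ shuffleServices
  }
  order(sameHost) ++ order(sameRack) ++ order(others)
}

// e.g. sortLocations(Seq("exec-1", "host-a/ess"), Seq("exec-2"), Nil)
// always keeps the same-host group first, ending with "host-a/ess".
~~~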
The Spark app used for the manual test on YARN was:
~~~scala
package com.mycompany
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
object TestAppDiskOnlyLevel {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("test-app")
println("Attila: START")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(0 until 100, 10)
.map { i =>
println(s"Attila: calculate first rdd i=$i")
Thread.sleep(1000)
i
}
rdd.persist(StorageLevel.DISK_ONLY)
rdd.count()
println("Attila: First RDD is processed, waiting for 60 sec")
Thread.sleep(60 * 1000)
println("Attila: Num executors must be 0 as executorIdleTimeout is way over")
val rdd2 = sc.parallelize(0 until 10, 1)
.map(i => (i, 1))
.persist(StorageLevel.DISK_ONLY)
rdd2.count()
println("Attila: Second RDD with one partition (only one executors must be alive)")
// reduce runs as user code to detect the empty seq (empty blocks)
println("Calling collect on the first RDD: " + rdd.collect().reduce(_ + _))
println("Attila: STOP")
}
}
~~~
I submitted it with the following configuration:
~~~bash
spark-submit --master yarn \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.executorIdleTimeout=30 \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90 \
--class com.mycompany.TestAppDiskOnlyLevel dyn_alloc_demo-core_2.11-0.1.0-SNAPSHOT-jar-with-dependencies.jar
~~~
Checked the result by filtering for the side effect of the task calculations:
~~~bash
[userserver ~]$ yarn logs -applicationId application_1556299359453_0001 | grep "Attila: calculate" | wc -l
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
19/04/26 10:31:59 INFO client.RMProxy: Connecting to ResourceManager at apiros-1.gce.company.com/172.31.115.165:8032
100
~~~
So there were only 100 task executions and not 200 (which would be the case if the blocks of the first RDD had been recomputed for the final `collect()`).
Moreover, from the submit/launcher log we can see the executors were really stopped in the meantime (see "new total is 0" before the last line):
~~~
[userserver ~]$ grep "Attila: Num executors must be 0" -B 2 spark-submit.log
19/04/26 10:24:27 INFO cluster.YarnScheduler: Executor 9 on apiros-3.gce.company.com killed by driver.
19/04/26 10:24:27 INFO spark.ExecutorAllocationManager: Existing executor 9 has been removed (new total is 0)
Attila: Num executors must be 0 as executorIdleTimeout is way over
~~~
[Full spark submit log](https://github.com/attilapiros/spark/files/3122465/spark-submit.log)
I have also run a test after changing the `DISK_ONLY` storage level to `MEMORY_ONLY` for the first RDD. After this change, no executor was removed during the 60 seconds of waiting.
Closes apache#24499 from attilapiros/SPARK-25888-final.
Authored-by: “attilapiros” <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
… service after releasing executor by dynamic allocation
Closes apache#24499 from attilapiros/SPARK-25888-final.
Authored-by: “attilapiros” <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit e9f3f62)
What changes were proposed in this pull request?
Problem statement
An executor which has persisted blocks is not considered to be idle, and therefore is not released by dynamic allocation after the regular timeout `spark.dynamicAllocation.executorIdleTimeout`; a separate configuration, `spark.dynamicAllocation.cachedExecutorIdleTimeout`, applies instead and defaults to `Integer.MAX_VALUE`. This is because releasing the executor also means losing the persisted blocks (as the metadata for the individual blocks, called `BlockInfo`, is kept in memory), and when the RDD is referenced later on these lost blocks will have to be recomputed.
On the other hand, keeping executors around for too long without any task to work on is also a waste of resources (as executors are reserved for the application by the resource manager).
Solution
This PR focuses on the first part of SPARK-25888: it extends the external shuffle service with the capability to serve RDD blocks which are persisted to the local disk store by the executors. Moreover, when this feature is enabled by setting the `spark.shuffle.service.fetch.rdd.enabled` config to true and a block is reported to be persisted to disk, the external shuffle service instance running on the same host as the executor is also registered (along with the reporting block manager) as a possible location for fetching it.
Some implementation details
Some explanation of the decisions made during development:
- The location list for fetching a block was already randomized, but the order of the groups (same host, same rack, others) was kept. In this PR the order of the groups is still kept and the external shuffle service is added to the end of each group.
- No `BlockManagerInfo` is introduced for the external shuffle service; only a lightweight solution is taken: a hash map from `BlockId` to `BlockStatus`. A type alias would make the source more readable, but I know it is discouraged. On the other hand, a new class wrapping this hash map would introduce unnecessary indirection.
- When this feature is on, the cleanup triggered during the removal of executors (which is handled in `ExternalShuffleBlockResolver`) is modified to keep the disk-persisted RDD blocks. This cleanup is triggered in standalone mode when the `spark.storage.cleanupFilesAfterExecutorExit` config is set.
- The unpersisting of an RDD is extended to use the external shuffle service for disk-persisted RDD blocks when the original executor which created the blocks has already been released. New block transport messages are introduced to support this: `RemoveBlocks` and `BlocksRemoved`.
How was this patch tested?
Unit tests
ExternalShuffleServiceSuite
Here the complete use case is tested by the test "SPARK-25888: using external shuffle service fetching disk persisted blocks", with a tiny difference: here the executor is killed manually, which makes the test a bit faster than waiting for the idle timeout.
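(A hedged outline of that manual-kill flow, not the suite's actual code; the executor id below is a placeholder, a real test would look it up.)
~~~scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Hypothetical outline: persist to disk, drop the executor manually instead of
// waiting for the idle timeout, then read the blocks back through the external
// shuffle service rather than recomputing them.
def diskBlocksSurviveExecutorLoss(sc: SparkContext): Unit = {
  val rdd = sc.parallelize(1 to 100, 4).persist(StorageLevel.DISK_ONLY)
  assert(rdd.count() == 100)      // materialize the disk-persisted blocks

  sc.killExecutors(Seq("1"))      // "1" is a placeholder executor id

  assert(rdd.count() == 100)      // now served via the external shuffle service
}
~~~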
ExternalShuffleBlockHandlerSuite
Tests the fetching of the RDD blocks via the external shuffle service.
BlockManagerInfoSuite
This is a new suite. As the `BlockManagerInfo` behaviour depends very much on whether the external shuffle service is enabled or not, all the tests are executed both with and without it.
BlockManagerSuite
Tests the sorting of the block locations.
Manually on YARN
Spark App was:
I submitted it with the following configuration:
Checked the result by filtering for the side effect of the task calculations:
So there were only 100 task executions and not 200 (which would be the case if the blocks of the first RDD had been recomputed for the final `collect()`).
Moreover, from the submit/launcher log we can see the executors were really stopped in the meantime (see "new total is 0" before the last line):
[Full spark submit log](https://github.com/attilapiros/spark/files/3122465/spark-submit.log)
I have also run a test after changing the `DISK_ONLY` storage level to `MEMORY_ONLY` for the first RDD. After this change, no executor was removed during the 60 seconds of waiting.