
Conversation

biaoma-ty

What changes were proposed in this pull request?

Added a pre-fetch mechanism for the shuffle stage.
The map side loads shuffle blocks before the openBlock message arrives, so that while previously requested block data is being transferred over the network the disks no longer sit idle: they keep providing data for the server to load into memory.
This shortens the time between the server receiving an openBlock message and being ready to transfer the data.

How was this patch tested?

The mechanism takes effect after setting spark.shuffle.prepare.count > 0 and spark.shuffle.prepare.open to true.
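
For example, the feature could be enabled from application code like this (a minimal sketch; both keys are introduced by this patch and are not standard Spark settings):

```scala
import org.apache.spark.SparkConf

// Hypothetical usage: the two keys below come from this patch, not upstream Spark.
val conf = new SparkConf()
  .set("spark.shuffle.prepare.open", "true") // switch the pre-fetch mechanism on
  .set("spark.shuffle.prepare.count", "4")   // number of blocks to pre-load per fetch round
```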

@vanzin
Contributor

vanzin commented Aug 4, 2016

ok to test

@SparkQA

SparkQA commented Aug 4, 2016

Test build #63235 has finished for PR 14239 at commit cb5e907.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2016

Test build #64207 has finished for PR 14239 at commit 4d90661.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@biaoma-ty biaoma-ty changed the title [SPARK-16593] [CORE] Provide a pre-fetch mechanism to accelerate shuffle stage. [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage. Aug 22, 2016
@SparkQA

SparkQA commented Aug 22, 2016

Test build #64210 has finished for PR 14239 at commit 5e93297.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2016

Test build #64212 has finished for PR 14239 at commit b49be73.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2016

Test build #64214 has finished for PR 14239 at commit c97c12f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 23, 2016

Test build #64258 has finished for PR 14239 at commit 235b08a.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 23, 2016

Test build #64275 has finished for PR 14239 at commit 09ab278.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 23, 2016

Test build #64277 has finished for PR 14239 at commit 32c63be.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

This sounds interesting. I haven't looked at the code, but I have some questions/concerns. Could you perhaps clarify the description a bit more?

Are you saying that you are loading all the data for all the maps from disk into memory and caching it while waiting for the reducer to fetch it? If so, this may work OK for small data, but you would very quickly run out of memory for large data, especially if, say, the YARN NodeManager is running the shuffle handler; many people run NodeManagers with only 1-2 GB of memory.

Does it do this conditionally or always?

How exactly does the timing work on this? Aren't you going to send the prepare immediately before sending the fetch? Does the fetch block while waiting for the prepare to cache the data?

What testing have you done with this, and with what size of data? What type of load was on the nodes when testing, etc.?

Note that in many cases, if you have enough free memory for the OS (at least with Linux), the data won't be read from disk anyway; it will still be in the page cache. Now, if the box doesn't have enough free memory (data too large or other apps using it), that would be pushed out and you would have to read from disk. Here you have the same problem, though: if you are going to read it back into memory, you have to make sure your process has enough memory to store it, which can be huge.

I have been looking at adding readahead to the shuffle handler that uses the OS fadvise call on the file; this lets the OS read the data before it's needed and increases disk throughput. MapReduce has this functionality and it helps a lot there. I have a test branch on Spark but still need to finish evaluating its performance. The other part of this is exactly how to integrate it, because it uses native code from Hadoop.
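
For reference, a rough sketch of that kind of fadvise-based readahead using Hadoop's native IO wrapper (the helper below is illustrative, not part of this PR or my test branch):

```scala
import java.io.RandomAccessFile
import org.apache.hadoop.io.nativeio.NativeIO

// Illustrative sketch only: hint to the OS that a region of a shuffle file will be
// needed soon, so it can be read into the page cache ahead of time (similar to
// MapReduce's FadvisedFileRegion). Requires the Hadoop native library to be loaded.
def readAhead(path: String, offset: Long, length: Long): Unit = {
  val file = new RandomAccessFile(path, "r")
  try {
    NativeIO.POSIX.getCacheManipulator.posixFadviseIfPossible(
      path, file.getFD, offset, length, NativeIO.POSIX.POSIX_FADV_WILLNEED)
  } finally {
    file.close()
  }
}
```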

@SparkQA

SparkQA commented Aug 25, 2016

Test build #64427 has finished for PR 14239 at commit 190d7fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@biaoma-ty
Author

biaoma-ty commented Aug 26, 2016

@tgravescs Thanks a lot.
1. Are you saying that you are loading all the data for all the maps from disk into memory and caching it waiting for the reducer to fetch it?
2. does it conditionally do this or always do it?

I use the parameter spark.shuffle.prepare.open to switch this mechanism on/off and spark.shuffle.prepare.count to control the number of blocks to cache. This gives users control over the memory used for the pre-fetched blocks based on their machine conditions.

3. How exactly does the timing work on this, aren't you going to send the prepare immediately before sending the fetch? does the fetch block on waiting on the prepare to cache the data?

I changed the logic of the shuffle message transfer process: each time I send a FetchRequest, I also send the next one, so the server side knows exactly which blockIds the next fetch loop will ask for and can cache them. In the FetchRequest succeeded callback, the cache is released, since all of that data has already been sent out and is no longer needed. When the PrepareRequest arrives, the server takes a thread from the thread pool to run the read request (in fact, I use a FutureTask for this). If the FetchRequest arrives before the data has been fully cached, the request blocks as before, but it is still more efficient than before, because the data started loading into memory before the request actually arrived.
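
A much-simplified sketch of that server-side flow (the class and helper names below are illustrative, not the actual code in this patch): the prepare step reads the announced blocks on a pool thread and caches the FutureTask, the fetch step either hits the cache or waits for the in-flight read, and the fetch-succeeded callback releases the cached data.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, FutureTask}

// Illustrative sketch of the prepare-then-fetch idea; loadBlockFromDisk stands in
// for the real shuffle block resolver.
class PrepareCache(loadBlockFromDisk: String => ByteBuffer) {
  private val pool = Executors.newFixedThreadPool(4)
  private val cache = new ConcurrentHashMap[String, FutureTask[ByteBuffer]]()

  // Called when the PrepareRequest announcing the next fetch round arrives.
  def prepare(blockIds: Seq[String]): Unit = blockIds.foreach { id =>
    val task = new FutureTask[ByteBuffer](new Callable[ByteBuffer] {
      override def call(): ByteBuffer = loadBlockFromDisk(id)
    })
    if (cache.putIfAbsent(id, task) == null) pool.execute(task)
  }

  // Called when the actual FetchRequest arrives: the block is either already in
  // memory or still loading, in which case we block just as we would have before.
  def fetch(blockId: String): ByteBuffer = {
    val task = cache.get(blockId)
    if (task != null) task.get() else loadBlockFromDisk(blockId)
  }

  // Called from the fetch-succeeded callback: drop the cached bytes once sent.
  def release(blockId: String): Unit = cache.remove(blockId)
}
```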

4. what testing have you done with this and what size of data? What type of load was on the nodes when testing, etc?

I implemented and tested this based on branches 1.4 and 1.6, using Intel HiBench 4.0 TeraSort with a 1 TB data size, and got about a 30% performance improvement on a 5-node cluster; each node has 96 GB of memory, a Xeon E5 v3 CPU, and 7200 RPM disks.

Note, though, that a benchmark like TeraSort shuffles all of the data that has been read, so in other cases it may not work as well as that.

Until recently I had 5 nodes to test it on, but right now I don't have the physical machines, which makes it not very convenient for me.

5. Some concerns about the OS cache

The OS cache may not have much impact here (if my understanding is wrong, please correct me, thanks), since a shuffle block produced by the map side is read no more than once in a normal job. Once a shuffle block has been consumed by the reduce side it is of no further use, so when the shuffle blocks are too large for the memory buffer they are flushed to disk and not reloaded into memory before the DiskManager reads them. If there is enough memory, this does not slow down the reading process, and if not, we can use the limited memory to pre-load the data. When a transfer succeeds, the memory buffer is released and reused to load the data the next FetchRequest asks for, until all the data has been sent to the reduce side.

6. Some ways to make it more efficient

We could look into some papers and refer to them to make this more complete, e.g. 'HPMR: Prefetching and pre-shuffling in shared MapReduce computation environment'.

Thanks for your feedback; I'd be glad to cooperate on any work you'd like, I love Spark so much.

@tgravescs
Contributor

Thanks for the explanation; this makes much more sense now. I'm still a bit concerned about the memory usage of this, though, especially with the external shuffle service on the NodeManager.

Were you using the external shuffle to test this or just the shuffle built into the executors? How much memory did you give whatever was shuffling and how big were the blocks being fetched?

Does this look at all at the size it's trying to cache vs. the size available to the shuffle handler?

@zdove

zdove commented Mar 6, 2017

Hi, I am confused about the code. I don't understand where and how Spark reads your spark.shuffle.prepare.open parameter. If you set it in spark-defaults.conf, which part of the program specifically reads the parameter to help cache the blocks?
Hope to see your reply.

@biaoma-ty
Author

@zdove Please refer to org.apache.spark.SparkConf: when the JVM process starts, the launch scripts load all the parameters you set in the spark-defaults file, filter those whose keys start with "spark", and add them to SparkConf.
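
For example, once those entries are in SparkConf, code can read them with the standard accessors (the key names are the ones proposed in this PR):

```scala
import org.apache.spark.SparkConf

// new SparkConf() picks up the spark.* system properties that spark-submit sets
// from spark-defaults.conf.
val conf = new SparkConf()
val prepareEnabled = conf.getBoolean("spark.shuffle.prepare.open", false)
val prepareCount = conf.getInt("spark.shuffle.prepare.count", 0)
```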

@biaoma-ty
Author

@tgravescs Sorry for the very late reply, I am really sorry about that.
I haven't used external shuffle; I need to rebase the code onto a recent release and retest it. I'll rebase it with executor shuffle first, and then, if it still shows some benefit, I'd like to add external shuffle service support.

@zdove

zdove commented Mar 9, 2017

Thank you for your reply.
I think the code you have changed doesn't involve SparkConf. If you don't change the default SparkConf handling, then Spark will not recognize the parameter spark.shuffle.prepare.open, am I right?

And even after Spark recognizes the spark.shuffle.prepare.open you set, where and when will it send this message to shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java PREPARE_BLOCKS(6)?

@jiangxb1987
Contributor

Is this still WIP? @f7753

@HyukjinKwon
Member

Is it @f7753?
