[SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage. #14239
Conversation
ok to test

Test build #63235 has finished for PR 14239 at commit
Test build #64207 has finished for PR 14239 at commit
Test build #64210 has finished for PR 14239 at commit
Test build #64212 has finished for PR 14239 at commit
Test build #64214 has finished for PR 14239 at commit
Test build #64258 has finished for PR 14239 at commit
Test build #64275 has finished for PR 14239 at commit
Test build #64277 has finished for PR 14239 at commit
This sounds interesting. I haven't looked at the code, but I have some questions/concerns; could you give some more clarification in the description?

1. Are you saying that you are loading all the data for all the maps from disk into memory and caching it while waiting for the reducer to fetch it? If so, this may work for small data, but you would very quickly run out of memory for large data, especially if, say, the YARN NodeManager is running the shuffle handler. Many people run NodeManagers with only 1-2 GB of memory.
2. Does it do this conditionally, or always?
3. How exactly does the timing work? Aren't you going to send the prepare immediately before sending the fetch? Does the fetch block waiting on the prepare to cache the data?
4. What testing have you done with this, and with what size of data? What type of load was on the nodes when testing?

Note that in many cases, if the OS has enough free memory (at least on Linux), the data won't be read from disk anyway; it will still be in the page cache. If the box doesn't have enough free memory (data too large, or other apps using it), those pages get pushed out and you have to read from disk. But then you have the same problem: if you are going to read the data back into memory, you have to make sure your process has enough memory to store it, which can be huge.

I have been looking at adding readahead to the shuffle handler using the OS fadvise call on the file; this lets the OS read the data before it is needed and increases disk throughput. MapReduce has this functionality and it helps a lot there. I have a test branch on Spark but still need to finish evaluating its performance. The other question is exactly how to integrate it, since it uses native code from Hadoop.
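For reference, a minimal sketch of what fadvise-based readahead could look like through Hadoop's NativeIO bindings, as the comment above suggests. The helper object and call site are hypothetical (this is not the mentioned test branch), and the NativeIO API has varied across Hadoop versions:

```scala
import java.io.{FileDescriptor, FileInputStream}
import org.apache.hadoop.io.nativeio.NativeIO

// Hypothetical helper: hint the OS to read a shuffle file region ahead of use.
// POSIX_FADV_WILLNEED asks the kernel to populate the page cache for the range,
// so the later read is served from memory instead of waiting on the disk.
object ReadaheadSketch {
  def adviseWillNeed(path: String, offset: Long, length: Long): Unit = {
    val in = new FileInputStream(path)
    try {
      val fd: FileDescriptor = in.getFD
      // Falls back to a no-op when the Hadoop native library is not loaded.
      NativeIO.POSIX.getCacheManipulator.posixFadviseIfPossible(
        path, fd, offset, length, NativeIO.POSIX.POSIX_FADV_WILLNEED)
    } finally {
      in.close()
    }
  }
}
```

The key property is that the hint is advisory: the kernel does the reading, so the JVM never has to buffer the data itself, which sidesteps the memory concerns raised above.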
Test build #64427 has finished for PR 14239 at commit
@tgravescs Thanks a lot.

Whether it runs is controlled by configuration parameters, so it is conditional rather than always on.

> 3. How exactly does the timing work? Aren't you going to send the prepare immediately before sending the fetch? Does the fetch block waiting on the prepare to cache the data?

I changed the logic of the shuffle message transfer: each time I send a FetchRequest, I also send the blockIds for the next one, so the server side knows exactly which blocks the next fetch loop will ask for and can cache them. In the FetchRequest success callback the cache is released, since all of those blocks have been sent to the reduce side and will not be used again.

> 4. What testing have you done with this, and with what size of data? What type of load was on the nodes when testing?

I implemented and tested this on branches 1.4 and 1.6, using Intel HiBench 4.0 terasort with a 1 TB data size, and got about a 30% performance improvement on a 5-node cluster (each node: 96 GB memory, Xeon E5 v3 CPU, 7200 RPM disks). Note that benchmarks like terasort shuffle all the data they read, so in other cases it may not work as well as that. I had 5 nodes to test on until recently, but for now I don't have the physical machines, which makes further testing inconvenient.

5. Some concerns about the OS cache: the OS cache may not have much impact here (if my understanding is wrong, please correct me), since a shuffle block produced by the map side is normally read only once in a job. Once the reduce side has consumed it, it is of no further use, so when the shuffle blocks are too large for the memory buffer they are flushed to disk and not reloaded into memory before the DiskManager reads them. If there is enough memory, pre-fetching does not make reading slower; if not, we can use the limited memory to pre-load the data, and once a transfer succeeds, release the buffer to load data for the next round.

6. Some ways to make it more efficient: we could survey related papers and draw on them to make this more complete, e.g. 'HPMR: Prefetching and pre-shuffling in shared MapReduce computation environment'.

Thanks for your feedback; I would be glad to cooperate on any work you'd like, I love Spark so much.
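A minimal sketch of the prepare-ahead handshake described above. The class and message names are illustrative, not the PR's actual ones; it only shows the idea of piggybacking the next round's blockIds on each fetch so the server can stage them off disk in the meantime:

```scala
import scala.collection.mutable

// Illustrative message: the current blocks to fetch, plus the blocks the
// reducer intends to fetch in the *next* round (the "prepare" hint).
case class FetchRequest(blockIds: Seq[String], nextBlockIds: Seq[String])

// Illustrative server; readBlock stands in for the disk read path.
class PrefetchingShuffleServer(readBlock: String => Array[Byte]) {
  private val prepared = mutable.Map.empty[String, Array[Byte]]

  def handleFetch(req: FetchRequest): Seq[Array[Byte]] = {
    // Serve the current round, preferring blocks already staged in memory;
    // removing them here plays the role of the success-callback release.
    val data = req.blockIds.map(id => prepared.remove(id).getOrElse(readBlock(id)))
    // Stage the next round's blocks while the current data is on the wire.
    req.nextBlockIds.foreach(id => prepared.getOrElseUpdate(id, readBlock(id)))
    data
  }
}
```

In the real patch the staging would overlap with the network transfer (e.g. on a separate thread); the sketch runs it inline only to keep the control flow visible.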
Thanks for the explanation, this makes much more sense now. I'm still a bit concerned about the memory usage, though, especially with the external shuffle service on the NodeManager. Were you using the external shuffle service to test this, or just the shuffle built into the executors? How much memory did you give whatever was shuffling, and how big were the blocks being fetched? Does this check at all the size it is trying to cache against the size available to the shuffle handler?
Hi, I am confused about the code. I don't understand where and how Spark reads your spark.shuffle.prepare.open parameter. If you set it in spark-defaults.conf, which program specifically reads the parameter to help cache the blocks?
@zdove please refer to
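For context on the question above: values in spark-defaults.conf are loaded into SparkConf, so a component can gate the behavior on them. A minimal sketch, assuming the parameter names from this PR; the reading code is illustrative, not the patch's actual call site:

```scala
import org.apache.spark.SparkConf

// Illustrative: how a shuffle component could read the PR's parameters.
// spark-defaults.conf entries are picked up when loadDefaults = true.
val conf = new SparkConf(true)
val prepareOpen  = conf.getBoolean("spark.shuffle.prepare.open", false)
val prepareCount = conf.getInt("spark.shuffle.prepare.count", 0)

if (prepareOpen && prepareCount > 0) {
  // take the prepare/pre-fetch path when serving shuffle fetches
}
```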
@tgravescs Sorry for replying so late, I am really sorry about that.
Thank you for your reply. And even after Spark recognizes the spark.shuffle.prepare.open you set, where and when will it send the PREPARE_BLOCKS(6) message defined in shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java?
Is this still WIP? @f7753 |
Is it, @f7753?
What changes were proposed in this pull request?
Added a pre-fetch mechanism for the shuffle stage.
The map side loads blocks before the openBlock message arrives. Then, while block data is being transferred over the network, the disks no longer sit idle: they keep reading ahead so the server can load the next blocks into memory.
This shortens the time between the server receiving an openBlock message and being ready to transfer the data.
How was this patch tested?
After setting the params `spark.shuffle.prepare.count` > 0 and `spark.shuffle.prepare.open` to `true`, this mechanism will work.
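For example, a spark-defaults.conf snippet that would enable it (the count value here is an arbitrary illustration; per the above, it only needs to be greater than 0):

```
spark.shuffle.prepare.open   true
spark.shuffle.prepare.count  2
```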