[SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads #17694
Conversation
Please remove the boilerplate message at the end of your PR description. ok to test
I tested your patch in our environment. Problem still exists.
The funny thing is that this code works for me on 4 threads and throws an exception on 10 threads.
Hi @maver1ck, thanks for taking the time to test the patch. I ran the patch with 1000 threads and it works fine; please check the JIRA for the log file. Can you check /grid/3/hadoop/yarn/log/usercache/bi/appcache/application_1492634694033_0092/container_e538_1492634694033_0092_01_000003/pyspark.zip/pyspark/rdd.py to confirm the changes are present? My guess is that you might be missing the changes in pyspark.zip. Can you also let me know the steps you followed to test the patch?
I checked pyspark.zip in the running container and everything is in its place. I'll try to prepare an example of the problem.
OK. I did additional tests. |
Thanks for testing @maver1ck. I will look into 1.6 and 2.0.2. |
@vundela |
Filed a PR to fix the issue in the Spark 1.6 branch.
This seems reasonable to me. There are some typos in the PR description. I think you meant "pickled" instead of "picked" in a few places. Using threading.Lock seems okay here from my admittedly limited understanding of the deep details of Python, and my reading of https://docs.python.org/2/library/threading.html#lock-objects.
@vundela and I chatted off thread about this some. The precise race is this: the call to _wrap_function will define a number of broadcast variables. In the time between when the _wrap_function call finishes and self.ctx._jvm.PythonRDD executes, the RDD itself can be modified, perhaps changing broadcast variables and introducing the "Broadcast variable '%s' not loaded!" exception. My understanding is that, due to the Global Interpreter Lock, this lock will cause all other execution to cease while this block of code runs, implicitly preventing any races. This is a very coarse-grained lock for this action, but it is as good as we can get. (Someone please correct me if I'm wrong here.)
It would be good if the PR description captured some of the above discussion.
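For illustration, a rough sketch of the coarse-grained locking described above; the lock name and the exact placement inside rdd.py's _jrdd construction are assumptions on my part, not the literal patch:

import threading

# Hypothetical module-level lock (illustrative name only).
_pickle_lock = threading.Lock()

def _jrdd_sketch(self):
    # Hold the lock from the moment _wrap_function pickles the command (and
    # captures the broadcast variables) until the JVM-side PythonRDD exists,
    # so another thread cannot re-pickle and change the broadcast set between
    # the two calls.
    with _pickle_lock:
        wrapped_func = _wrap_function(self.ctx, self.func,
                                      self._prev_jrdd_deserializer,
                                      self._jrdd_deserializer)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
                                             wrapped_func,
                                             self.preservesPartitioning)
    return python_rdd.asJavaRDD()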
cc @holdenk Can you please let me know your comments?
Interesting, I don't think I have all of the required context to review, but I'll try and take a look this weekend (I've got some other things happening this week already).
Thanks for your time @holdenk.
ping @holdenk, checking if you have some time to review this.
ok to test |
Test build #77800 has finished for PR 17694 at commit
Hi @vundela, thanks for your PR. I was able to reproduce your issue, but I think your fix here uses too broad a lock, since the shared resource is the SparkContext pickle registry and it needs to be locked only while the Python command is being pickled. Please see #18695 and, if you wouldn't mind, check whether that solves your issue as well. Thanks!
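A sketch of the narrower scope suggested here, assuming the _wrap_function/_prepare_for_python_RDD split in rdd.py at the time; this is a guess at the idea, not the actual change in #18695, and exact signatures may differ across versions:

import threading

# Illustrative lock (assumed name) protecting only the SparkContext pickle
# registry while the Python command is serialized.
_registry_lock = threading.Lock()

def _wrap_function_sketch(sc, func, deserializer, serializer, profiler=None):
    command = (func, profiler, deserializer, serializer)
    with _registry_lock:
        # Only this pickling step reads and clears the shared registry of
        # broadcast variables on the SparkContext, so only it is serialized
        # across threads.
        pickled_command, broadcast_vars, env, includes = \
            _prepare_for_python_RDD(sc, command)
    # Building the JVM-side wrapper does not touch the registry, so it can
    # happen outside the lock.
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes,
                                  sc.pythonExec, sc.pythonVer,
                                  broadcast_vars, sc._javaAccumulator)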
What changes were proposed in this pull request?
In PySpark, when multiple threads are used, broadcast variables can be pickled with the wrong PythonRDD wrap function, which leads to the following exception (because of a race condition between the threads on the Java side via py4j).
16/01/08 17:10:20 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 9)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id
raise Exception("Broadcast variable '%s' not loaded!" % bid)
Exception: (Exception("Broadcast variable '6' not loaded!",), <function _from_id at 0xce7a28>, (6L,))
This change fixes the race condition by making sure that broadcast variables are pickled with the same PythonRDD wrap function.
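For context, a minimal multi-threaded sketch of the kind of workload that hits this race; the thread count, data sizes, and names are arbitrary:

import threading
from pyspark import SparkContext

sc = SparkContext(appName="broadcast-race-repro")

def run_job(n):
    # Each thread creates its own broadcast variable and uses it in a job.
    bc = sc.broadcast(set(range(n)))
    rdd = sc.parallelize(range(100), 4)
    # If another thread pickles its command while this one is being wrapped,
    # the workers can be told to load a broadcast id they never received,
    # producing "Broadcast variable '%s' not loaded!".
    print(n, rdd.map(lambda x: x in bc.value).count())

threads = [threading.Thread(target=run_job, args=(i + 1,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()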
How was this patch tested?
Please review http://spark.apache.org/contributing.html before opening a pull request.