[SPARK-25921][PySpark] Fix barrier task run without BarrierTaskContext while python worker reuse #22962
Changes from all commits:

```diff
@@ -614,6 +614,21 @@ def context_barrier(x):
         times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
         self.assertTrue(max(times) - min(times) < 1)
 
+    def test_barrier_with_python_worker_reuse(self):
+        """
+        Verify that BarrierTaskContext.barrier() with reused python worker.
+        """
+        self.sc._conf.set("spark.python.work.reuse", "true")
```
Review thread on the `self.sc._conf.set("spark.python.work.reuse", "true")` line:

@xuanyuanking, this will probably need a separate test class, since it's also related to how we start the worker or not. You can make a new class, run a simple job to make sure workers are created and being reused, test it, and stop.

I do these two checks, as shown in the test above: start a normal job first so that all workers are started, and assert that `spark.python.work.reuse` is set to `true`. Maybe this can prove the UT covers the issue and also lets it reuse the original barrier test code, WDYT?

@HyukjinKwon Hi Hyukjin, if you still think this needs a separate class, I'll think about a method for checking worker reuse and give a follow-up PR.

Yup, sorry for the late response.
```diff
+        rdd = self.sc.parallelize(range(4), 4)
+        # start a normal job first to start all worker
+        result = rdd.map(lambda x: x ** 2).collect()
+        self.assertEqual([0, 1, 4, 9], result)
+        # make sure `spark.python.work.reuse=true`
+        self.assertEqual(self.sc._conf.get("spark.python.work.reuse"), "true")
+
+        # worker will be reused in this barrier job
+        self.test_barrier()
+
     def test_barrier_infos(self):
         """
         Verify that BarrierTaskContext.getTaskInfos() returns a list of all task infos in the
```
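As discussed in the review thread above, the test asserts the conf value rather than directly verifying that workers are reused. A hypothetical, minimal sketch of a more direct check (not part of this PR) could compare the PIDs of the Python workers that serve two consecutive jobs; the name `worker_pids` and the driver-side `sc` SparkContext with worker reuse enabled are assumptions for illustration:

```python
import os

# Hypothetical sketch (not part of this PR): check worker reuse by comparing
# the PIDs of the Python worker processes that run two consecutive jobs.
def worker_pids(rdd):
    # Each partition reports the PID of the Python worker that executed it.
    return set(rdd.mapPartitions(lambda _: [os.getpid()]).collect())

first = worker_pids(sc.parallelize(range(4), 4))
second = worker_pids(sc.parallelize(range(4), 4))

# With worker reuse enabled, the second job should run in (some of) the same
# worker processes as the first one.
assert first & second, "expected at least one python worker to be reused"
```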
BTW, I think we should just call `BarrierTaskContext()`. Let's not make it complicated next time.
Ah, good point! @xuanyuanking can you send a small follow-up?
I think the answer is no; maybe I was not clear enough in my previous explanation (#22962 (comment)). Using `BarrierTaskContext()` here was my first commit 0cb2cf6, and it would also require rewriting `__new__` for `BarrierTaskContext`; otherwise the bug still exists, because its parent class `TaskContext` rewrites `__new__()`. When we call `BarrierTaskContext()` here in a reused worker, a `TaskContext` instance will be returned by `TaskContext.__new__()`: spark/python/pyspark/taskcontext.py, line 47 at c00e72f.
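For readers following along, here is a simplified sketch of the singleton behavior being described; the class names mirror the real ones, but this is an illustration, not the actual `pyspark/taskcontext.py` source:

```python
# Simplified illustration (not the actual pyspark source): TaskContext caches
# its instance in __new__, so in a reused worker a bare BarrierTaskContext()
# call without its own __new__ gets back the previously cached instance.
class TaskContext(object):
    _taskContext = None

    def __new__(cls):
        # Return the cached instance if a previous task in this worker
        # already created one; otherwise create and cache a new instance.
        if cls._taskContext is not None:
            return cls._taskContext
        cls._taskContext = object.__new__(cls)
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    # Without rewriting __new__ here, BarrierTaskContext() returns whatever
    # TaskContext._taskContext already holds, which may be a plain
    # TaskContext created by an earlier, non-barrier task.
    pass


TaskContext()                       # a normal task populates the cache
print(type(BarrierTaskContext()))   # -> TaskContext, not BarrierTaskContext
```

This is the situation the explanation above points at: rewriting `__new__` on `BarrierTaskContext` itself is what prevents the cached plain `TaskContext` from being handed back in a reused worker.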
Could you add some comments to explain it, so that people won't get confused again?
Can we get rid of the rewrite at all? It's never a good idea to overwrite them unless it's absolutely required.
Also, we should remove `__init__` too. That's what Python will implicitly insert for both.
Also, next time please fully describe what's going on in the PR description. Even I was confused about it and misread that `__new__` is actually being inherited.
@cloud-fan Sorry for the thin explanation; more comments should have been added from the start. This will be done in a follow-up PR.
@HyukjinKwon Sorry for the thin explanation; all of these will be addressed in the next follow-up PR, together with the new UT.