4 changes: 2 additions & 2 deletions python/pyspark/taskcontext.py
@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
Member:
BTW, I think we should just call BarrierTaskContext(). Let's not make it complicated next time.

Contributor:
Ah, good point! @xuanyuanking can you send a small follow-up?

Member Author:
I think the answer is no; maybe I was not clear enough in my previous explanation #22962 (comment). Using BarrierTaskContext() here was my first commit 0cb2cf6, but it would also require overriding __new__ for BarrierTaskContext. Otherwise the bug still exists, because the parent class TaskContext overrides __new__(): when we call BarrierTaskContext() here in a reused worker, a TaskContext instance is returned from TaskContext.__new__():

return taskContext
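
To make the inheritance trap concrete, here is a minimal, self-contained sketch of the behavior described above (the classes below are simplified stand-ins for the real pyspark ones, not the actual source):

```python
class TaskContext(object):
    _taskContext = None  # per-worker cached singleton

    def __new__(cls):
        # The parent caches an instance and returns it on every later call,
        # whatever type it happens to be.
        if cls._taskContext is not None:
            return cls._taskContext
        cls._taskContext = super(TaskContext, cls).__new__(cls)
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    pass  # inherits TaskContext.__new__


# Simulate a reused worker that already ran a normal (non-barrier) task:
TaskContext()                 # caches a plain TaskContext instance
ctx = BarrierTaskContext()    # inherited __new__ returns the cached instance
print(type(ctx).__name__)     # -> 'TaskContext', not 'BarrierTaskContext'

# object.__new__(cls) bypasses the overridden __new__ entirely, which is why
# the patch constructs the barrier context that way:
print(type(object.__new__(BarrierTaskContext)).__name__)  # -> 'BarrierTaskContext'
```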

Contributor:
Could you add some comments to explain it, so that people won't get confused again?

Member:
Can we get rid of the overrides altogether? It's never a good idea to override these methods unless it's absolutely required.

Member (@HyukjinKwon, Nov 13, 2018):
Also, we should remove __init__ too; if we define neither __new__ nor __init__, Python implicitly inherits both from the parent.
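
A small illustration of that point (toy classes, not pyspark): a subclass that defines neither method simply resolves both on its parent.

```python
class Parent(object):
    def __init__(self):
        self.tag = "parent-init"


class Child(Parent):
    pass  # defines neither __new__ nor __init__


c = Child()                                # Parent's __init__ runs implicitly
print(c.tag)                               # -> 'parent-init'
print(Child.__init__ is Parent.__init__)   # -> True on Python 3: same function,
                                           #    found via normal attribute lookup
```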

Member:
Also, next time please fully describe what's going on in the PR description. Even I was confused about it and misread __new__ as actually being inherited.

Member Author:
> Could you add some comments to explain it?

@cloud-fan Sorry for the thin explanation; more comments should have been added from the start. Will do in a follow-up PR.

> Can we get rid of the overrides altogether?
> we should remove __init__ too
> next time please fully describe what's going on in the PR description

@HyukjinKwon Sorry for the thin explanation; all of these will be addressed in the next follow-up PR, along with the new UT.

         return cls._taskContext

     @classmethod
15 changes: 15 additions & 0 deletions python/pyspark/tests.py
@@ -614,6 +614,21 @@ def context_barrier(x):
         times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
         self.assertTrue(max(times) - min(times) < 1)

+    def test_barrier_with_python_worker_reuse(self):
+        """
+        Verify that BarrierTaskContext.barrier() works with a reused Python worker.
+        """
+        self.sc._conf.set("spark.python.worker.reuse", "true")
Member (@HyukjinKwon, Nov 12, 2018):
@xuanyuanking, this will probably need a separate test class, since it's also related to how we start the worker. You can make a new class, run a simple job to make sure workers are created and being reused, test it, and stop.
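
A minimal sketch of what such a separate test class might look like (the class and method names, and the PID-based reuse check, are illustrative assumptions rather than code from this PR):

```python
import os
import unittest

from pyspark import SparkConf, SparkContext
from pyspark.taskcontext import BarrierTaskContext


class WorkerReuseTest(unittest.TestCase):

    def setUp(self):
        conf = SparkConf().set("spark.python.worker.reuse", "true")
        self.sc = SparkContext("local[4]", "worker-reuse-test", conf=conf)

    def tearDown(self):
        self.sc.stop()

    def test_barrier_with_reused_workers(self):
        rdd = self.sc.parallelize(range(8), 4)
        # Run a simple job twice; with reuse enabled the second run should
        # land on the same worker processes, which we confirm via PIDs.
        first_pids = set(rdd.map(lambda x: os.getpid()).collect())
        second_pids = set(rdd.map(lambda x: os.getpid()).collect())
        self.assertTrue(second_pids <= first_pids)

        def barrier_stage(iterator):
            # Before the fix, get() in a reused worker returned a plain
            # TaskContext, which has no barrier() method.
            ctx = BarrierTaskContext.get()
            ctx.barrier()
            yield ctx.partitionId()

        # Now run a barrier job on the reused workers.
        result = self.sc.parallelize(range(4), 4) \
            .barrier().mapPartitions(barrier_stage).collect()
        self.assertEqual(sorted(result), [0, 1, 2, 3])
```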

Member Author (@xuanyuanking, Nov 12, 2018):
I did these two checks, as below:

  1. Running this test case without the fix in BarrierTaskContext._getOrCreate, the bug can be reproduced.
  2. Running the same code in the pyspark shell with spark.python.worker.reuse=false, it returns successfully.

Maybe this proves the UT covers the issue while also reusing the original barrier test code. WDYT?

Member Author:
@HyukjinKwon Hi Hyukjin, if you still think this needs a separate class, I'll think about how to verify worker reuse and open a follow-up PR.

Member:
Yup, sorry for the late response.

+        rdd = self.sc.parallelize(range(4), 4)
+        # start a normal job first to start all workers
+        result = rdd.map(lambda x: x ** 2).collect()
+        self.assertEqual([0, 1, 4, 9], result)
+        # make sure `spark.python.worker.reuse=true` is still set
+        self.assertEqual(self.sc._conf.get("spark.python.worker.reuse"), "true")
+
+        # the workers will be reused in this barrier job
+        self.test_barrier()

     def test_barrier_infos(self):
         """
         Verify that BarrierTaskContext.getTaskInfos() returns a list of all task infos in the