Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 24 additions & 25 deletions lib/ClusterShell/Worker/Worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,22 @@ def _eh_sigspec_invoke_compat(method, argc_legacy, *args):
"""
argc_actual = len(getfullargspec(method)[0])
if argc_actual == argc_legacy:
# Use legacy signature (1.x)
# Use legacy signature (1.x) deprecated as of 1.9
warnings.warn("%s should use new %s() signature" % (method.__self__,
method.__name__),
DeprecationWarning)
return method(*args[0:argc_legacy - 1])
else:
# Assume new signature (2.x)
return method(*args)

def _eh_sigspec_ev_read_17(ev_read):
"""Helper function to check whether ev_read has the old 1.7 signature."""
return len(getfullargspec(ev_read)[0]) == 2
if len(getfullargspec(ev_read)[0]) == 2:
warnings.warn("%s should use new ev_read() signature" % \
ev_read.__self__, DeprecationWarning)
return True
return False


class WorkerException(Exception):
Expand Down Expand Up @@ -121,11 +128,17 @@ def __init__(self, handler):
self.metarefcnt = 0

# current_x public variables (updated at each event accordingly)
self.current_node = None #: set to node in event handler
self.current_msg = None #: set to stdout message in event handler
self.current_errmsg = None #: set to stderr message in event handler
self.current_rc = 0 #: set to return code in event handler
self.current_sname = None #: set to stream name in event handler

#: set to node in event handler; DEPRECATED: use :class:`.EventHandler` method argument **node**
self.current_node = None
#: set to stdout in event handler; DEPRECATED: use :class:`.EventHandler` method argument **msg** if ``sname==SNAME_STDOUT``
self.current_msg = None
#: set to stderr message in event handler; DEPRECATED: use :class:`.EventHandler` method argument **msg** if ``sname==SNAME_STDERR``
self.current_errmsg = None
#: set to return code in event handler; DEPRECATED: use :class:`.EventHandler` method argument **rc**
self.current_rc = 0
#: set to stream name in event handler; DEPRECATED: use :class:`.EventHandler` method argument **sname**
self.current_sname = None

def _set_task(self, task):
"""Bind worker to task. Called by task.schedule()."""
Expand Down Expand Up @@ -178,25 +191,10 @@ def _on_written(self, key, bytes_count, sname):
self.current_sname = sname

if self.eh is not None:
_eh_sigspec_invoke_compat(self.eh.ev_written, 5, self, key, sname,
bytes_count)
self.eh.ev_written(self, key, sname, bytes_count)

# Base getters

def last_read(self):
"""
Get last read message from event handler.
[DEPRECATED] use current_msg
"""
raise NotImplementedError("Derived classes must implement.")

def last_error(self):
"""
Get last error message from event handler.
[DEPRECATED] use current_errmsg
"""
raise NotImplementedError("Derived classes must implement.")

def did_timeout(self):
"""Return whether this worker has aborted due to timeout."""
self._task_bound_check()
Expand Down Expand Up @@ -570,6 +568,8 @@ def _on_timeout(self, key):
# trigger timeout event (deprecated in 1.8+)
# also use hasattr check because ev_timeout was missing in 1.8.0
if self.eh and hasattr(self.eh, 'ev_timeout'):
warnings.warn("%s should use new ev_close() instead of " \
"ev_timeout()" % self.eh, DeprecationWarning)
self.eh.ev_timeout(self)

def abort(self):
Expand Down Expand Up @@ -680,5 +680,4 @@ def _on_written(self, key, bytes_count, sname):

if self.eh is not None:
# generate ev_written
_eh_sigspec_invoke_compat(self.eh.ev_written, 5, self, key, sname,
bytes_count)
self.eh.ev_written(self, key, sname, bytes_count)
44 changes: 21 additions & 23 deletions tests/TaskDistantMixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,30 +226,28 @@ def __init__(self, test):
def ev_start(self, worker):
self.test.assertEqual(self.flags, 0)
self.flags |= EV_START
def ev_pickup(self, worker):
def ev_pickup(self, worker, node):
self.test.assertTrue(self.flags & EV_START)
self.flags |= EV_PICKUP
self.last_node = worker.current_node
def ev_read(self, worker):
self.last_node = node
def ev_read(self, worker, node, sname, msg):
self.test.assertEqual(self.flags, EV_START | EV_PICKUP)
self.flags |= EV_READ
self.last_node = worker.current_node
self.last_read = worker.current_msg
def ev_written(self, worker):
self.last_node = node
self.last_read = msg
def ev_written(self, worker, node, sname, size):
self.test.assertTrue(self.flags & (EV_START | EV_PICKUP))
self.flags |= EV_WRITTEN
def ev_hup(self, worker):
def ev_hup(self, worker, node, rc):
self.test.assertTrue(self.flags & (EV_START | EV_PICKUP))
self.flags |= EV_HUP
self.last_node = worker.current_node
self.last_rc = worker.current_rc
def ev_timeout(self, worker):
self.test.assertTrue(self.flags & EV_START)
self.flags |= EV_TIMEOUT
self.last_node = worker.current_node
def ev_close(self, worker):
self.last_node = node
self.last_rc = rc
def ev_close(self, worker, timedout):
self.test.assertTrue(self.flags & EV_START)
self.test.assertTrue(self.flags & EV_CLOSE == 0)
if timedout:
self.flags |= EV_TIMEOUT
self.flags |= EV_CLOSE

def testShellEvents(self):
Expand Down Expand Up @@ -550,8 +548,9 @@ def testSshOptionsOptionForScp(self):

def testShellStderrWithHandler(self):
class StdErrHandler(EventHandler):
def ev_error(self, worker):
assert worker.current_errmsg == b"something wrong"
def ev_read(self, worker, node, sname, msg):
if sname == worker.SNAME_STDERR:
assert msg == b"something wrong"

worker = self._task.shell("echo something wrong 1>&2", nodes=HOSTNAME,
handler=StdErrHandler(), stderr=True)
Expand All @@ -572,17 +571,16 @@ def testShellWriteHandler(self):
class WriteOnReadHandler(EventHandler):
def __init__(self, target_worker):
self.target_worker = target_worker
def ev_read(self, worker):
self.target_worker.write(worker.current_node.encode('utf-8')
+ b':' + worker.current_msg + b'\n')
def ev_read(self, worker, node, sname, msg):
self.target_worker.write(node.encode() + b':' + msg + b'\n')
self.target_worker.set_write_eof()

reader = self._task.shell("cat", nodes=HOSTNAME)
worker = self._task.shell("sleep 1; echo foobar", nodes=HOSTNAME,
handler=WriteOnReadHandler(reader))
self._task.resume()
res = "%s:foobar" % HOSTNAME
self.assertEqual(reader.node_buffer(HOSTNAME), res.encode('utf-8'))
self.assertEqual(reader.node_buffer(HOSTNAME), res.encode())

def testSshBadArgumentOption(self):
# Check code < 1.4 compatibility
Expand Down Expand Up @@ -715,13 +713,13 @@ def __init__(self):
def ev_start(self, worker):
self.start_count += 1

def ev_pickup(self, worker):
def ev_pickup(self, worker, node):
self.pickup_count += 1

def ev_hup(self, worker):
def ev_hup(self, worker, node, rc):
self.hup_count += 1

def ev_close(self, worker):
def ev_close(self, worker, timedout):
self.close_count += 1

def testWorkerEventCount(self):
Expand Down
32 changes: 15 additions & 17 deletions tests/TaskDistantPdshMixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,30 +172,28 @@ def __init__(self, test):
def ev_start(self, worker):
self.test.assertEqual(self.flags, 0)
self.flags |= EV_START
def ev_pickup(self, worker):
def ev_pickup(self, worker, node):
self.test.assertTrue(self.flags & EV_START)
self.flags |= EV_PICKUP
self.last_node = worker.current_node
def ev_read(self, worker):
self.last_node = node
def ev_read(self, worker, node, sname, msg):
self.test.assertEqual(self.flags, EV_START | EV_PICKUP)
self.flags |= EV_READ
self.last_node = worker.current_node
self.last_read = worker.current_msg
def ev_written(self, worker):
self.last_node = node
self.last_read = msg
def ev_written(self, worker, node, sname, size):
self.test.assertTrue(self.flags & (EV_START | EV_PICKUP))
self.flags |= EV_WRITTEN
def ev_hup(self, worker):
def ev_hup(self, worker, node, rc):
self.test.assertTrue(self.flags & (EV_START | EV_PICKUP))
self.flags |= EV_HUP
self.last_node = worker.current_node
self.last_rc = worker.current_rc
def ev_timeout(self, worker):
self.test.assertTrue(self.flags & EV_START)
self.flags |= EV_TIMEOUT
self.last_node = worker.current_node
def ev_close(self, worker):
self.last_node = node
self.last_rc = rc
def ev_close(self, worker, timedout):
self.test.assertTrue(self.flags & EV_START)
self.test.assertTrue(self.flags & EV_CLOSE == 0)
if timedout:
self.flags |= EV_TIMEOUT
self.flags |= EV_CLOSE

def testExplicitWorkerPdshShellEvents(self):
Expand Down Expand Up @@ -500,13 +498,13 @@ def __init__(self):
def ev_start(self, worker):
self.start_count += 1

def ev_pickup(self, worker):
def ev_pickup(self, worker, node):
self.pickup_count += 1

def ev_hup(self, worker):
def ev_hup(self, worker, node, rc):
self.hup_count += 1

def ev_close(self, worker):
def ev_close(self, worker, timedout):
self.close_count += 1

def testWorkerEventCount(self):
Expand Down
66 changes: 34 additions & 32 deletions tests/TaskEventTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,15 @@ def test_simple_event_handler_legacy(self):
eh = LegacyTestHandler()
# init worker
worker = task.shell("echo abcdefghijklmnopqrstuvwxyz", handler=eh)
# future warnings: pickup + read + hup + close
#self.run_task_and_catch_warnings(task, 4)
self.run_task_and_catch_warnings(task, 0)
# warnings: pickup + read + hup + close
self.run_task_and_catch_warnings(task, 4)
eh.do_asserts_read_notimeout()
eh.reset_asserts()

# test again
worker = task.shell("echo abcdefghijklmnopqrstuvwxyz", handler=eh)
# future warnings: pickup + read + hup + close
#self.run_task_and_catch_warnings(task, 4)
self.run_task_and_catch_warnings(task, 0)
# warnings: pickup + read + hup + close
self.run_task_and_catch_warnings(task, 4)
eh.do_asserts_read_notimeout()

def test_simple_event_handler(self):
Expand All @@ -191,18 +189,30 @@ def test_simple_event_handler(self):
self.run_task_and_catch_warnings(task)
eh.do_asserts_read_notimeout()

def test_simple_event_handler_with_task_timeout_legacy(self):
def test_simple_event_handler_with_timeout_legacy(self):
"""test simple event handler with timeout (legacy)"""
task = task_self()

eh = LegacyTestHandler()

task.shell("/bin/sleep 3", handler=eh, timeout=2)

# warnings: pickup + timeout + close
self.run_task_and_catch_warnings(task, 3)

eh.do_asserts_timeout()

def test_simple_event_handler_with_task_timeout_legacy(self):
"""test simple event handler with task timeout (legacy)"""
task = task_self()

eh = LegacyTestHandler()

task.shell("/bin/sleep 3", handler=eh)

try:
# future warnings: pickup + close
#self.run_task_and_catch_warnings(task, 2, task_timeout=2)
self.run_task_and_catch_warnings(task, 0, task_timeout=2)
# warnings: pickup + timeout + close
self.run_task_and_catch_warnings(task, 3, task_timeout=2)
except TimeoutError:
pass
else:
Expand All @@ -211,7 +221,7 @@ def test_simple_event_handler_with_task_timeout_legacy(self):
eh.do_asserts_timeout()

def test_simple_event_handler_with_task_timeout(self):
"""test simple event handler with timeout (1.8+)"""
"""test simple event handler with task timeout (1.8+)"""
task = task_self()

eh = TestHandler()
Expand Down Expand Up @@ -271,9 +281,8 @@ def ev_close(self, worker):
worker.write(content)
worker.set_write_eof()

# future warnings: 1 x pickup + 1 x read + 1 x hup + 1 x close
#self.run_task_and_catch_warnings(task, 4)
self.run_task_and_catch_warnings(task, 0)
# warnings: 1 x pickup + 1 x read + 1 x hup + 1 x close
self.run_task_and_catch_warnings(task, 4)
eh.do_asserts_read_write_notimeout()

def test_popen_specific_behaviour(self):
Expand Down Expand Up @@ -348,10 +357,8 @@ def test_engine_on_the_fly_launch_legacy(self):
worker = task.shell("/bin/uname", handler=eh)
self.assertNotEqual(worker, None)

# future warnings: 1 x pickup + 1 x read + 2 x pickup + 3 x hup +
# 3 x close
#self.run_task_and_catch_warnings(task, 10)
self.run_task_and_catch_warnings(task, 0)
# warnings: 1 x pickup + 1 x read + 2 x pickup + 3 x hup + 3 x close
self.run_task_and_catch_warnings(task, 10)

class TOnTheFlyLauncher(EventHandler):
"""CS v1.8 Test Event handler to shedules commands on the fly"""
Expand Down Expand Up @@ -384,8 +391,7 @@ def test_write_on_ev_start_legacy(self):
"""test write on ev_start (legacy)"""
task = task_self()
task.shell("cat", handler=self.__class__.LegacyTWriteOnStart())
#self.run_task_and_catch_warnings(task, 1) # future: read
self.run_task_and_catch_warnings(task, 0)
self.run_task_and_catch_warnings(task, 1) # ev_read

class TWriteOnStart(EventHandler):
def ev_start(self, worker):
Expand Down Expand Up @@ -416,9 +422,8 @@ def test_engine_may_reuse_fd_legacy(self):
worker = task.shell("echo ok; sleep 1", handler=eh)
self.assertTrue(worker is not None)
worker.write(b"OK\n")
# future warnings: 10 x read
#self.run_task_and_catch_warnings(task, 10)
self.run_task_and_catch_warnings(task, 0)
# warnings: 10 x read
self.run_task_and_catch_warnings(task, 10)
finally:
task.set_info("fanout", fanout)

Expand Down Expand Up @@ -451,9 +456,8 @@ def test_ev_pickup_legacy(self):
task.shell("/bin/sleep 0.5", handler=eh)
task.shell("/bin/sleep 0.5", handler=eh)

# future warnings: 3 x pickup + 3 x hup + 3 x close
#self.run_task_and_catch_warnings(task, 9)
self.run_task_and_catch_warnings(task, 0)
# warnings: 3 x pickup + 3 x hup + 3 x close
self.run_task_and_catch_warnings(task, 9)

eh.do_asserts_noread_notimeout()
self.assertEqual(eh.cnt_pickup, 3)
Expand Down Expand Up @@ -488,9 +492,8 @@ def test_ev_pickup_fanout_legacy(self):
task.shell("/bin/sleep 0.5", handler=eh, key="n2")
task.shell("/bin/sleep 0.5", handler=eh, key="n3")

# future warnings: 3 x pickup + 3 x hup + 3 x close
#self.run_task_and_catch_warnings(task, 9)
self.run_task_and_catch_warnings(task, 0)
# warnings: 3 x pickup + 3 x hup + 3 x close
self.run_task_and_catch_warnings(task, 9)

eh.do_asserts_noread_notimeout()
self.assertEqual(eh.cnt_pickup, 3)
Expand Down Expand Up @@ -530,9 +533,8 @@ def test_ev_written_legacy(self):
worker.write(content)
worker.set_write_eof()

# future warnings: pickup + read + hup + close
#self.run_task_and_catch_warnings(task, 4)
self.run_task_and_catch_warnings(task, 0)
# warnings: pickup + read + hup + close
self.run_task_and_catch_warnings(task, 4)

eh.do_asserts_read_write_notimeout()
self.assertEqual(eh.cnt_written, 1)
Expand Down
Loading