
Commit bebf6a5: Implement Actors
1 parent 9e90c06

File tree: 10 files changed (+966, -56 lines)


distributed/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@
 
 from . import config
 from dask.config import config
+from .actor import Actor, ActorFuture
 from .core import connect, rpc
 from .deploy import LocalCluster, Adaptive
 from .diagnostics import progress
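With this re-export the new classes become available from the package namespace itself. Illustrative only, not part of the diff:

    >>> from distributed import Actor, ActorFuture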

distributed/actor.py

Lines changed: 213 additions & 0 deletions
@@ -0,0 +1,213 @@
+from tornado import gen
+import functools
+
+from .client import Future, default_client
+from .compatibility import get_thread_identity, Queue
+from .protocol import to_serialize
+from .utils import sync
+from .utils_comm import WrappedKey
+from .worker import get_worker
+
+
+class Actor(WrappedKey):
+    """ Controls an object on a remote worker
+
+    An actor allows remote control of a stateful object living on a remote
+    worker. Method calls on this object trigger operations on the remote
+    object and return ActorFutures on which we can block to get results.
+
+    Examples
+    --------
+    >>> class Counter:
+    ...    def __init__(self):
+    ...        self.n = 0
+    ...    def increment(self):
+    ...        self.n += 1
+    ...        return self.n
+
+    >>> from dask.distributed import Client
+    >>> client = Client()
+
+    You can create an actor by submitting a class with the keyword
+    ``actor=True``.
+
+    >>> future = client.submit(Counter, actor=True)
+    >>> counter = future.result()
+    >>> counter
+    <Actor: Counter, key=Counter-1234abcd>
+
+    Calling methods on this object immediately returns deferred ``ActorFuture``
+    objects. You can call ``.result()`` on these objects to block and get the
+    result of the function call.
+
+    >>> future = counter.increment()
+    >>> future.result()
+    1
+    >>> future = counter.increment()
+    >>> future.result()
+    2
+    """
+    def __init__(self, cls, address, key, worker=None):
+        self._cls = cls
+        self._address = address
+        self.key = key
+        self._future = None
+        if worker:
+            self._worker = worker
+            self._client = None
+        else:
+            try:
+                self._worker = get_worker()
+            except ValueError:
+                self._worker = None
+            try:
+                self._client = default_client()
+                self._future = Future(key)
+            except ValueError:
+                self._client = None
+
+    def __repr__(self):
+        return '<Actor: %s, key=%s>' % (self._cls.__name__, self.key)
+
+    def __reduce__(self):
+        return (Actor, (self._cls, self._address, self.key))
+
+    @property
+    def _io_loop(self):
+        if self._worker:
+            return self._worker.io_loop
+        else:
+            return self._client.io_loop
+
+    @property
+    def _scheduler_rpc(self):
+        if self._worker:
+            return self._worker.scheduler
+        else:
+            return self._client.scheduler
+
+    @property
+    def _worker_rpc(self):
+        if self._worker:
+            return self._worker.rpc(self._address)
+        else:
+            if self._client.direct_to_workers:
+                return self._client.rpc(self._address)
+            else:
+                return ProxyRPC(self._client.scheduler, self._address)
+
+    @property
+    def _asynchronous(self):
+        if self._client:
+            return self._client.asynchronous
+        else:
+            return get_thread_identity() == self._worker.thread_id
+
+    def _sync(self, func, *args, **kwargs):
+        if self._client:
+            return self._client.sync(func, *args, **kwargs)
+        else:
+            # TODO support sync operation by checking against thread ident of loop
+            return sync(self._worker.loop, func, *args, **kwargs)
+
+    def __dir__(self):
+        o = set(dir(type(self)))
+        o.update({attr for attr in dir(self._cls) if not attr.startswith('_')})
+        return sorted(o)
+
+    def __getattr__(self, key):
+        if not hasattr(self._cls, key):
+            raise AttributeError("%s does not have attribute %s" %
+                                 (type(self).__name__, key))
+
+        attr = getattr(self._cls, key)
+
+        if self._future and not self._future.status == 'finished':
+            raise ValueError("Worker holding Actor was lost")
+
+        if callable(attr):
+            @functools.wraps(attr)
+            def func(*args, **kwargs):
+                @gen.coroutine
+                def run_actor_function_on_worker():
+                    try:
+                        result = yield self._worker_rpc.actor_execute(
+                            function=key,
+                            actor=self.key,
+                            args=[to_serialize(arg) for arg in args],
+                            kwargs={k: to_serialize(v) for k, v in kwargs.items()},
+                        )
+                    except OSError:
+                        if self._future:
+                            yield self._future
+                        else:
+                            raise OSError("Unable to contact Actor's worker")
+                    raise gen.Return(result['result'])
+
+                if self._asynchronous:
+                    return run_actor_function_on_worker()
+                else:
+                    # TODO: this mechanism is error prone
+                    # we should endeavor to make dask's standard code work here
+                    q = Queue()
+
+                    @gen.coroutine
+                    def wait_then_add_to_queue():
+                        x = yield run_actor_function_on_worker()
+                        q.put(x)
+                    self._io_loop.add_callback(wait_then_add_to_queue)
+
+                    return ActorFuture(q, self._io_loop)
+            return func
+
+        else:
+            @gen.coroutine
+            def get_actor_attribute_from_worker():
+                x = yield self._worker_rpc.actor_attribute(attribute=key, actor=self.key)
+                raise gen.Return(x['result'])
+
+            return self._sync(get_actor_attribute_from_worker)
+
+
+class ProxyRPC(object):
+    """
+    An rpc-like object that uses the scheduler's rpc to connect to a worker
+    """
+    def __init__(self, rpc, address):
+        self.rpc = rpc
+        self._address = address
+
+    def __getattr__(self, key):
+        @gen.coroutine
+        def func(**msg):
+            msg['op'] = key
+            result = yield self.rpc.proxy(worker=self._address, msg=msg)
+            raise gen.Return(result)
+
+        return func
+
+
+class ActorFuture(object):
+    """ Future to an actor's method call
+
+    Whenever you call a method on an Actor you get an ActorFuture immediately
+    while the computation happens in the background. You can call ``.result``
+    to block and collect the full result
+
+    See Also
+    --------
+    Actor
+    """
+    def __init__(self, q, io_loop):
+        self.q = q
+        self.io_loop = io_loop

+    def result(self, timeout=None):
+        try:
+            return self._cached_result
+        except AttributeError:
+            self._cached_result = self.q.get(timeout=timeout)
+            return self._cached_result
+
+    def __repr__(self):
+        return '<ActorFuture>'
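Taken together, the classes above give the following end-to-end behaviour. The snippet below is a minimal usage sketch based on the docstring examples, not part of the commit; it assumes a running local ``Client`` and the ``Counter`` class defined in the ``Actor`` docstring:

    >>> from dask.distributed import Client
    >>> client = Client(processes=False)                         # any running cluster works
    >>> counter = client.submit(Counter, actor=True).result()    # Actor proxy to a remote Counter
    >>> fut = counter.increment()                                # method call returns an ActorFuture at once
    >>> fut.result()                                             # blocks until the worker replies
    1
    >>> counter.n                                                # plain attribute access blocks and returns the value
    1

Method calls route through ``Actor._worker_rpc``: directly to the owning worker when possible, or through ``ProxyRPC`` (which forwards the message via the scheduler's ``proxy`` operation) when the client cannot reach workers directly.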

distributed/client.py

Lines changed: 16 additions & 19 deletions
@@ -401,19 +401,7 @@ def lose(self):
         self._get_event().clear()
 
     def set_error(self, exception, traceback):
-        if isinstance(exception, bytes):
-            try:
-                exception = loads(exception)
-            except TypeError:
-                exception = Exception("Undeserializable exception", exception)
-        if traceback:
-            if isinstance(traceback, bytes):
-                try:
-                    traceback = loads(traceback)
-                except (TypeError, AttributeError):
-                    traceback = None
-            else:
-                traceback = None
+        _, exception, traceback = clean_exception(exception, traceback)
 
         self.status = 'error'
         self.exception = exception
@@ -1195,13 +1183,16 @@ def submit(self, func, *args, **kwargs):
            raise TypeError("First input to submit must be a callable function")
 
        key = kwargs.pop('key', None)
-       pure = kwargs.pop('pure', True)
        workers = kwargs.pop('workers', None)
        resources = kwargs.pop('resources', None)
        retries = kwargs.pop('retries', None)
        priority = kwargs.pop('priority', 0)
        fifo_timeout = kwargs.pop('fifo_timeout', '100ms')
        allow_other_workers = kwargs.pop('allow_other_workers', False)
+       actor = kwargs.pop('actor', False)
+       actors = kwargs.pop('actors', False)
+       actor = actor or actors
+       pure = kwargs.pop('pure', not actor)
 
        if allow_other_workers not in (True, False, None):
            raise TypeError("allow_other_workers= must be True or False")
@@ -1240,7 +1231,8 @@ def submit(self, func, *args, **kwargs):
                                         user_priority=priority,
                                         resources={skey: resources} if resources else None,
                                         retries=retries,
-                                        fifo_timeout=fifo_timeout)
+                                        fifo_timeout=fifo_timeout,
+                                        actors=actor)
 
        logger.debug("Submit %s(...), %s", funcname(func), key)
 
@@ -1322,13 +1314,16 @@ def map(self, func, *iterables, **kwargs):
 
        key = kwargs.pop('key', None)
        key = key or funcname(func)
-       pure = kwargs.pop('pure', True)
        workers = kwargs.pop('workers', None)
        retries = kwargs.pop('retries', None)
        resources = kwargs.pop('resources', None)
        user_priority = kwargs.pop('priority', 0)
        allow_other_workers = kwargs.pop('allow_other_workers', False)
        fifo_timeout = kwargs.pop('fifo_timeout', '100ms')
+       actor = kwargs.pop('actor', False)
+       actors = kwargs.pop('actors', False)
+       actor = actor or actors
+       pure = kwargs.pop('pure', not actor)
 
        if allow_other_workers and workers is None:
            raise ValueError("Only use allow_other_workers= if using workers=")
@@ -1386,7 +1381,8 @@ def map(self, func, *iterables, **kwargs):
                                         resources=resources,
                                         retries=retries,
                                         user_priority=user_priority,
-                                        fifo_timeout=fifo_timeout)
+                                        fifo_timeout=fifo_timeout,
+                                        actors=actor)
        logger.debug("map(%s, ...)", funcname(func))
 
        return [futures[tokey(k)] for k in keys]
@@ -2075,7 +2071,7 @@ def run_coroutine(self, function, *args, **kwargs):
     def _graph_to_futures(self, dsk, keys, restrictions=None,
                           loose_restrictions=None, priority=None,
                           user_priority=0, resources=None, retries=None,
-                          fifo_timeout=0):
+                          fifo_timeout=0, actors=None):
         with self._lock:
             if resources:
                 resources = self._expand_resources(resources,
@@ -2139,7 +2135,8 @@ def _graph_to_futures(self, dsk, keys, restrictions=None,
                                     'resources': resources,
                                     'submitting_task': getattr(thread_state, 'key', None),
                                     'retries': retries,
-                                    'fifo_timeout': fifo_timeout})
+                                    'fifo_timeout': fifo_timeout,
+                                    'actors': actors})
         return futures
 
     def get(self, dsk, keys, restrictions=None, loose_restrictions=None,
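On the client side the new keyword is deliberately forgiving: ``submit`` and ``map`` accept both ``actor=`` and ``actors=``, fold them into one flag, and default ``pure`` to ``not actor`` so that submitting the same class twice creates two distinct actors. A rough sketch of the resulting behaviour (illustrative only, reusing the ``Counter`` example from ``distributed/actor.py``):

    >>> a = client.submit(Counter, actor=True)      # singular spelling
    >>> b = client.submit(Counter, actors=True)     # plural spelling, folded into the same flag
    >>> a.key == b.key                              # impure by default, so the two keys differ
    False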

distributed/core.py

Lines changed: 10 additions & 2 deletions
@@ -851,9 +851,17 @@ def clean_exception(exception, traceback, **kwargs):
     error_message: create and serialize errors into message
     """
     if isinstance(exception, bytes):
-        exception = protocol.pickle.loads(exception)
+        try:
+            exception = protocol.pickle.loads(exception)
+        except Exception:
+            exception = Exception(exception)
+    elif isinstance(exception, str):
+        exception = Exception(exception)
     if isinstance(traceback, bytes):
-        traceback = protocol.pickle.loads(traceback)
+        try:
+            traceback = protocol.pickle.loads(traceback)
+        except (TypeError, AttributeError):
+            traceback = None
     elif isinstance(traceback, string_types):
         traceback = None  # happens if the traceback failed serializing
     return type(exception), exception, traceback
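The hardened ``clean_exception`` now degrades gracefully instead of raising when a remote exception cannot be deserialized, which is what lets the ``set_error`` change in ``distributed/client.py`` above delegate to it. A quick sketch of the fallback behaviour (illustrative only, not part of the diff):

    >>> from distributed.core import clean_exception
    >>> typ, exc, tb = clean_exception(b'\x80', None)   # bytes that fail to unpickle
    >>> typ is Exception                                # wrapped in a plain Exception instead of raising
    True
    >>> isinstance(clean_exception('boom', None)[1], Exception)   # bare strings are wrapped too
    True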
