Skip to content

Commit c69a532

Browse files
author
Chris Rossi
committed
First pass at ndb.eventloop.EvenLoop implementation. Does not implement RPC
integration.
1 parent 1ac2b0d commit c69a532

File tree

2 files changed

+530
-10
lines changed

2 files changed

+530
-10
lines changed

ndb/src/google/cloud/ndb/eventloop.py

Lines changed: 218 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
1717
This should handle both asynchronous ``ndb`` objects and arbitrary callbacks.
1818
"""
19-
19+
import collections
20+
import time
2021

2122
__all__ = [
2223
"add_idle",
@@ -30,16 +31,228 @@
3031
]
3132

3233

33-
def add_idle(*args, **kwargs):
34-
raise NotImplementedError
34+
def _noop(*args, **kw):
35+
"""Do nothing."""
36+
37+
# TODO: Use utils.logging_debug when implemented
38+
_logging_debug = _noop
39+
40+
41+
_Event = collections.namedtuple('_Event', (
42+
'when', 'callback', 'args', 'kwargs'))
43+
44+
45+
class _Clock(object):
46+
"""A clock to determine the current time, in seconds."""
47+
48+
def now(self):
49+
"""Returns the number of seconds since epoch."""
50+
return time.time()
51+
52+
def sleep(self, seconds):
53+
"""Sleeps for the desired number of seconds."""
54+
time.sleep(seconds)
3555

3656

3757
class EventLoop:
38-
__slots__ = ()
58+
"""Constructor.
59+
60+
Args:
61+
clock: an eventloop._Clock object. Defaults to a time-based clock.
3962
40-
def __init__(self, *args, **kwargs):
63+
Fields:
64+
current: a FIFO list of (callback, args, kwds). These callbacks
65+
run immediately when the eventloop runs.
66+
idlers: a FIFO list of (callback, args, kwds). Thes callbacks
67+
run only when no other RPCs need to be fired first.
68+
For example, AutoBatcher uses idler to fire a batch RPC even before
69+
the batch is full.
70+
queue: a sorted list of (absolute time in sec, callback, args, kwds),
71+
sorted by time. These callbacks run only after the said time.
72+
rpcs: a map from rpc to (callback, args, kwds). Callback is called
73+
when the rpc finishes.
74+
"""
75+
__slots__ = ('clock', 'current', 'idlers', 'inactive', 'queue', 'rpcs')
76+
77+
def __init__(self, clock=None):
78+
self.clock = clock if clock else _Clock()
79+
self.current = collections.deque()
80+
self.idlers = collections.deque()
81+
self.inactive = 0
82+
self.queue = []
83+
self.rpcs = {}
84+
85+
def clear(self):
86+
"""Remove all pending events without running any."""
87+
while self.current or self.idlers or self.queue or self.rpcs:
88+
current = self.current
89+
idlers = self.idlers
90+
queue = self.queue
91+
rpcs = self.rpcs
92+
_logging_debug('Clearing stale EventLoop instance...')
93+
if current:
94+
_logging_debug(' current = %s', current)
95+
if idlers:
96+
_logging_debug(' idlers = %s', idlers)
97+
if queue:
98+
_logging_debug(' queue = %s', queue)
99+
if rpcs:
100+
_logging_debug(' rpcs = %s', rpcs)
101+
self.__init__(self.clock)
102+
current.clear()
103+
idlers.clear()
104+
queue[:] = []
105+
rpcs.clear()
106+
_logging_debug('Cleared')
107+
108+
def insort_event_right(self, event):
109+
"""Insert event in queue, and keep it sorted by `event.when` assuming
110+
queue is sorted.
111+
112+
For events with same `event.when`, new events are inserted to the
113+
right, to keep FIFO order.
114+
115+
Args:
116+
event: a (time in sec since unix epoch, callback, args, kwargs)
117+
tuple.
118+
"""
119+
queue = self.queue
120+
lo = 0
121+
hi = len(queue)
122+
while lo < hi:
123+
mid = (lo + hi) // 2
124+
if event.when < queue[mid].when:
125+
hi = mid
126+
else:
127+
lo = mid + 1
128+
queue.insert(lo, event)
129+
130+
def queue_call(self, delay, callback, *args, **kwargs):
131+
"""Schedule a function call at a specific time in the future."""
132+
if delay is None:
133+
self.current.append((callback, args, kwargs))
134+
return
135+
136+
# Times over a billion seconds are assumed to be absolute
137+
when = self.clock.now() + delay if delay < 1e9 else delay
138+
event = _Event(when, callback, args, kwargs)
139+
self.insort_event_right(event)
140+
141+
def queue_rpc(self, rpc, callback=None, *args, **kwds):
142+
"""Schedule an RPC with an optional callback.
143+
144+
The caller must have previously sent the call to the service.
145+
The optional callback is called with the remaining arguments.
146+
147+
NOTE: If the rpc is a MultiRpc, the callback will be called once
148+
for each sub-RPC. TODO: Is this a good idea?
149+
"""
150+
# TODO Integrate with gRPC
41151
raise NotImplementedError
42152

153+
def add_idle(self, callback, *args, **kwargs):
154+
"""Add an idle callback.
155+
156+
An idle callback can return True, False or None. These mean:
157+
158+
- None: remove the callback (don't reschedule)
159+
- False: the callback did no work; reschedule later
160+
- True: the callback did some work; reschedule soon
161+
162+
If the callback raises an exception, the traceback is logged and
163+
the callback is removed.
164+
"""
165+
self.idlers.append((callback, args, kwargs))
166+
167+
def run_idle(self):
168+
"""Run one of the idle callbacks.
169+
170+
Returns:
171+
True if one was called, False if no idle callback was called.
172+
"""
173+
if not self.idlers or self.inactive >= len(self.idlers):
174+
return False
175+
idler = self.idlers.popleft()
176+
callback, args, kwargs = idler
177+
_logging_debug('idler: %s', callback.__name__)
178+
result = callback(*args, **kwargs)
179+
180+
# See add_idle() for meaning of callback return value.
181+
if result is None:
182+
_logging_debug('idler %s removed', callback.__name__)
183+
else:
184+
if result:
185+
self.inactive = 0
186+
else:
187+
self.inactive += 1
188+
self.idlers.append(idler)
189+
return True
190+
191+
def _run_current(self):
192+
"""Run one current item.
193+
194+
Returns:
195+
True if one was called, False if no callback was called.
196+
"""
197+
if not self.current:
198+
return False
199+
200+
self.inactive = 0
201+
callback, args, kwargs = self.current.popleft()
202+
_logging_debug('nowevent: %s', callback.__name__)
203+
callback(*args, **kwargs)
204+
return True
205+
206+
def run0(self):
207+
"""Run one item (a callback or an RPC wait_any).
208+
209+
Returns:
210+
A time to sleep if something happened (may be 0);
211+
None if all queues are empty.
212+
"""
213+
if self._run_current() or self.run_idle():
214+
return 0
215+
216+
delay = None
217+
if self.queue:
218+
delay = self.queue[0][0] - self.clock.now()
219+
if delay <= 0:
220+
self.inactive = 0
221+
_, callback, args, kwargs = self.queue.pop(0)
222+
_logging_debug('event: %s', callback.__name__)
223+
callback(*args, **kwargs)
224+
# TODO: What if it raises an exception?
225+
return 0
226+
227+
if self.rpcs:
228+
raise NotImplementedError
229+
230+
return delay
231+
232+
def run1(self):
233+
"""Run one item (a callback or an RPC wait_any) or sleep.
234+
235+
Returns:
236+
True if something happened; False if all queues are empty.
237+
"""
238+
delay = self.run0()
239+
if delay is None:
240+
return False
241+
if delay > 0:
242+
self.clock.sleep(delay)
243+
return True
244+
245+
def run(self):
246+
"""Run until there's nothing left to do."""
247+
self.inactive = 0
248+
while True:
249+
if not self.run1():
250+
break
251+
252+
253+
def add_idle(*args, **kwargs):
254+
raise NotImplementedError
255+
43256

44257
def get_event_loop(*args, **kwargs):
45258
raise NotImplementedError

0 commit comments

Comments
 (0)