@@ -167,15 +167,8 @@ def __init__(self, **kwargs):
167167 self .control_handlers [msg_type ] = getattr (self , msg_type )
168168
169169 @gen .coroutine
170- def dispatch_control (self , msg ):
170+ def dispatch_control (self , msg , ident , stream = None ):
171171 """dispatch control requests"""
172- idents , msg = self .session .feed_identities (msg , copy = False )
173- try :
174- msg = self .session .deserialize (msg , content = True , copy = False )
175- except :
176- self .log .error ("Invalid Control Message" , exc_info = True )
177- return
178-
179172 self .log .debug ("Control received: %s" , msg )
180173
181174 # Set the parent message for side effects.
@@ -215,15 +208,8 @@ def should_handle(self, stream, msg, idents):
215208 return True
216209
217210 @gen .coroutine
218- def dispatch_shell (self , stream , msg ):
211+ def dispatch_shell (self , msg , ident , stream ):
219212 """dispatch shell requests"""
220- idents , msg = self .session .feed_identities (msg , copy = False )
221- try :
222- msg = self .session .deserialize (msg , content = True , copy = False )
223- except :
224- self .log .error ("Invalid Message" , exc_info = True )
225- return
226-
227213 # Set the parent message for side effects.
228214 self .set_parent (idents , msg )
229215 self ._publish_status ('busy' )
@@ -385,16 +371,43 @@ def dispatch_queue(self):
385371 def _message_counter_default (self ):
386372 return itertools .count ()
387373
388- def schedule_dispatch (self , priority , dispatch , * args ):
374+ def should_dispatch_immediately (
375+ self , msg , ident , stream , priority , dispatch
376+ ):
377+ """
378+ This provides a hook for dispatching incoming messages
379+ from the frontend immediately, and out of order.
380+
381+ It could be used to allow asynchronous messages from
382+ GUIs to be processed.
383+ """
384+ return False
385+
386+ def schedule_dispatch (self , msg , priority , dispatch , stream = None ):
389387 """schedule a message for dispatch"""
388+
389+ idents , msg = self .session .feed_identities (msg , copy = False )
390+ try :
391+ msg = self .session .deserialize (msg , content = True , copy = False )
392+ except :
393+ self .log .error ("Invalid Message" , exc_info = True )
394+ return
395+
396+ new_args = (msg , ident , stream )
397+
398+ if self .should_dispatch_immediately (
399+ msg , ident , stream , priority , dispatch , stream
400+ ):
401+ return self .io_loop .add_callback (dispatch , * new_args )
402+
390403 idx = next (self ._message_counter )
391404
392405 self .msg_queue .put_nowait (
393406 (
394407 priority ,
395408 idx ,
396409 dispatch ,
397- args ,
410+ new_args ,
398411 )
399412 )
400413 # ensure the eventloop wakes up
@@ -411,8 +424,8 @@ def start(self):
411424 self .control_stream .on_recv (
412425 partial (
413426 self .schedule_dispatch ,
414- CONTROL_PRIORITY ,
415- self .dispatch_control ,
427+ priority = CONTROL_PRIORITY ,
428+ dispatch = self .dispatch_control ,
416429 ),
417430 copy = False ,
418431 )
@@ -423,9 +436,9 @@ def start(self):
423436 s .on_recv (
424437 partial (
425438 self .schedule_dispatch ,
426- SHELL_PRIORITY ,
427- self .dispatch_shell ,
428- s ,
439+ priority = SHELL_PRIORITY ,
440+ dispatch = self .dispatch_shell ,
441+ stream = s ,
429442 ),
430443 copy = False ,
431444 )
0 commit comments