@@ -46,15 +46,17 @@ class Dispatcher(metaclass=DispatcherMeta):
4646
4747 _GRPC_STOP_RESPONSE = object ()
4848
49- def __init__ (self , loop , host , port , worker_id , request_id ,
50- grpc_connect_timeout , grpc_max_msg_len = - 1 ):
49+ def __init__ (self , loop , host , port : int , worker_id : str , request_id : str ,
50+ grpc_connect_timeout : float , grpc_max_msg_len : int = - 1 ):
5151 self ._loop = loop
5252 self ._host = host
5353 self ._port = port
5454 self ._request_id = request_id
5555 self ._worker_id = worker_id
5656 self ._functions = functions .Registry ()
5757
58+ self ._old_task_factory = None
59+
5860 # A thread-pool for synchronous function calls. We limit
5961 # the number of threads to 1 so that one Python worker can
6062 # only run one synchronous function in parallel. This is
@@ -75,7 +77,8 @@ def __init__(self, loop, host, port, worker_id, request_id,
7577 self ._grpc_thread = threading .Thread (
7678 name = 'grpc-thread' , target = self .__poll_grpc )
7779
78- def load_bindings (self ):
80+ @staticmethod
81+ def load_bindings ():
7982 """Load out-of-tree binding implementations."""
8083 services = {}
8184
@@ -88,18 +91,17 @@ def load_bindings(self):
8891 @classmethod
8992 async def connect (cls , host , port , worker_id , request_id ,
9093 connect_timeout ):
91- loop = asyncio ._get_running_loop ()
92- disp = cls (loop , host , port , worker_id , request_id ,
93- connect_timeout )
94+ loop = asyncio .events .get_event_loop ()
95+ disp = cls (loop , host , port , worker_id , request_id , connect_timeout )
9496 disp ._grpc_thread .start ()
9597 await disp ._grpc_connected_fut
9698 logger .info ('Successfully opened gRPC channel to %s:%s' , host , port )
9799 return disp
98100
99101 async def dispatch_forever (self ):
100102 if DispatcherMeta .__current_dispatcher__ is not None :
101- raise RuntimeError (
102- 'there can be only one running dispatcher per process' )
103+ raise RuntimeError ('there can be only one running dispatcher per '
104+ ' process' )
103105
104106 self ._old_task_factory = self ._loop .get_task_factory ()
105107
@@ -131,7 +133,7 @@ async def dispatch_forever(self):
131133 try :
132134 await forever
133135 finally :
134- # Reenable console logging when there's an exception
136+ # Re-enable console logging when there's an exception
135137 enable_console_logging ()
136138 root_logger .removeHandler (logging_handler )
137139 finally :
@@ -152,7 +154,7 @@ def stop(self):
152154 self ._sync_call_tp .shutdown ()
153155 self ._sync_call_tp = None
154156
155- def _on_logging (self , record : logging .LogRecord , formatted_msg : str ):
157+ def on_logging (self , record : logging .LogRecord , formatted_msg : str ):
156158 if record .levelno >= logging .CRITICAL :
157159 log_level = protos .RpcLog .Critical
158160 elif record .levelno >= logging .ERROR :
@@ -201,7 +203,9 @@ def request_id(self):
201203 def worker_id (self ):
202204 return self ._worker_id
203205
204- def _serialize_exception (self , exc ):
206+ # noinspection PyBroadException
207+ @staticmethod
208+ def _serialize_exception (exc : Exception ):
205209 try :
206210 message = f'{ type (exc ).__name__ } : { exc } '
207211 except Exception :
@@ -222,8 +226,8 @@ async def _dispatch_grpc_request(self, request):
222226 # Don't crash on unknown messages. Some of them can be ignored;
223227 # and if something goes really wrong the host can always just
224228 # kill the worker's process.
225- logger .error (
226- f'unknown StreamingMessage content type { content_type } ' )
229+ logger .error (f'unknown StreamingMessage content type '
230+ f' { content_type } ' )
227231 return
228232
229233 resp = await request_handler (request )
@@ -524,7 +528,7 @@ def emit(self, record):
524528 # Since we disable console log after gRPC channel is initiated
525529 # We should redirect all the messages into dispatcher
526530 msg = self .format (record )
527- Dispatcher .current ._on_logging (record , msg )
531+ Dispatcher .current .on_logging (record , msg )
528532
529533
530534class ContextEnabledTask (asyncio .Task ):
0 commit comments