File tree Expand file tree Collapse file tree 2 files changed +34
-1
lines changed Expand file tree Collapse file tree 2 files changed +34
-1
lines changed Original file line number Diff line number Diff line change @@ -561,6 +561,10 @@ def _recv(self):
561561 def recv (self ):
562562 return self ._recoverable (self ._recv )
563563
564+ def close (self ):
565+ self ._finalize (None )
566+ super (ResumableBidiRpc , self ).close ()
567+
564568 @property
565569 def is_active (self ):
566570 """bool: True if this stream is currently open and active."""
@@ -698,7 +702,11 @@ def stop(self):
698702 if self ._thread is not None :
699703 # Resume the thread to wake it up in case it is sleeping.
700704 self .resume ()
701- self ._thread .join ()
705+ # The daemonized thread may itself block, so don't wait
706+ # for it longer than a second.
707+ self ._thread .join (1.0 )
708+ if self ._thread .is_alive (): # pragma: NO COVER
709+ _LOGGER .warning ("Background thread did not exit." )
702710
703711 self ._thread = None
704712
Original file line number Diff line number Diff line change @@ -597,6 +597,31 @@ def test_recv_failure(self):
597597 assert bidi_rpc .is_active is False
598598 assert call .cancelled is True
599599
600+ def test_close (self ):
601+ call = mock .create_autospec (_CallAndFuture , instance = True )
602+
603+ def cancel_side_effect ():
604+ call .is_active .return_value = False
605+
606+ call .cancel .side_effect = cancel_side_effect
607+ start_rpc = mock .create_autospec (
608+ grpc .StreamStreamMultiCallable , instance = True , return_value = call
609+ )
610+ should_recover = mock .Mock (spec = ["__call__" ], return_value = False )
611+ bidi_rpc = bidi .ResumableBidiRpc (start_rpc , should_recover )
612+ bidi_rpc .open ()
613+
614+ bidi_rpc .close ()
615+
616+ should_recover .assert_not_called ()
617+ call .cancel .assert_called_once ()
618+ assert bidi_rpc .call == call
619+ assert bidi_rpc .is_active is False
620+ # ensure the request queue was signaled to stop.
621+ assert bidi_rpc .pending_requests == 1
622+ assert bidi_rpc ._request_queue .get () is None
623+ assert bidi_rpc ._finalized
624+
600625 def test_reopen_failure_on_rpc_restart (self ):
601626 error1 = ValueError ("1" )
602627 error2 = ValueError ("2" )
You can’t perform that action at this time.
0 commit comments