@@ -273,3 +273,48 @@ async def open_connection(*args, **kwargs):
273273
274274 vals = await asyncio .gather (do_read (), do_close ())
275275 assert vals == [b"Hello, World!" , None ]
276+
277+
278+ @pytest .mark .onlynoncluster
279+ async def test_client_handle_canceled_error (create_redis ):
280+ """
281+ This test reproduces the case in issue #2539
282+ where asyncio.CancelledError is raised when the parser is reading to feed the
283+ internal buffer. The stream `readline()` will be interrupted by the CancelledError,
284+ which will result in not reading the response after executing the command. This will
285+ cause responses to be mixed up between commands. In this test, we execute a command
286+ after the CancelledError is raised, and verify that the response is correct.
287+ """
288+ r = await create_redis (single_connection_client = True )
289+
290+ async def do_pings ():
291+ while True :
292+ await r .ping ()
293+
294+ future = asyncio .ensure_future (do_pings ())
295+ await asyncio .sleep (0.01 )
296+ future .cancel ()
297+ with pytest .raises (asyncio .CancelledError ):
298+ await future
299+ # To reproduce the issue, we need to execute a command which returns a different
300+ # response type than PING. In this case, we use EXISTS because it should return an
301+ # integer.
302+ assert await r .exists ("foo" ) == 0
303+
304+ await r .sadd ("set" , "one" )
305+ await r .sadd ("set" , "two" )
306+ await r .sadd ("set" , "three" )
307+
308+ async def do_smembers ():
309+ while True :
310+ await r .smembers ("set" )
311+
312+ future = asyncio .ensure_future (do_smembers ())
313+ await asyncio .sleep (0.01 )
314+ future .cancel ()
315+ with pytest .raises (asyncio .CancelledError ):
316+ await future
317+
318+ assert await r .exists ("foo" ) == 0
319+
320+ await r .connection .disconnect ()
0 commit comments