@@ -185,30 +185,68 @@ async def op(pipe):
185185@pytest .mark .onlycluster
186186async def test_cluster (request , redis_addr ):
187187
188- redis_addr = redis_addr [0 ], 6372 # use the cluster port
189188 delay = 0.1
190- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
191- await dp .start ()
189+ cluster_port = 6372
190+ remap_base = 7372
191+ n_nodes = 6
192+
193+ def remap (address ):
194+ host , port = address
195+ return host , remap_base + port - cluster_port
196+
197+ proxies = []
198+ for i in range (n_nodes ):
199+ port = cluster_port + i
200+ remapped = remap_base + i
201+ forward_addr = redis_addr [0 ], port
202+ proxy = DelayProxy (addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr )
203+ proxies .append (proxy )
204+
205+ # start proxies
206+ await asyncio .gather (* [p .start () for p in proxies ])
207+
208+ def all_clear ():
209+ for p in proxies :
210+ p .send_event .clear ()
211+
212+ async def wait_for_send ():
213+ asyncio .wait (
214+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
215+ )
192216
193- with contextlib .closing (RedisCluster .from_url ("redis://127.0.0.1:5381" )) as r :
217+ @contextlib .contextmanager
218+ def set_delay (delay : float ):
219+ with contextlib .ExitStack () as stack :
220+ for p in proxies :
221+ stack .enter_context (p .set_delay (delay ))
222+ yield
223+
224+ with contextlib .closing (
225+ RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , address_remap = remap )
226+ ) as r :
194227 await r .initialize ()
195228 await r .set ("foo" , "foo" )
196229 await r .set ("bar" , "bar" )
197230
198231 async def op (r ):
199- with dp . set_delay (delay ):
232+ with set_delay (delay ):
200233 return await r .get ("foo" )
201234
202- dp . send_event . clear ()
235+ all_clear ()
203236 t = asyncio .create_task (op (r ))
204- await dp .send_event .wait ()
237+ # Wait for whichever DelayProxy gets the request first
238+ await wait_for_send ()
205239 await asyncio .sleep (0.01 )
206240 t .cancel ()
207241 with pytest .raises (asyncio .CancelledError ):
208242 await t
209243
210- assert await r .get ("bar" ) == b"bar"
211- assert await r .ping ()
212- assert await r .get ("foo" ) == b"foo"
244+ # try a number of requests to excercise all the connections
245+ async def doit ():
246+ assert await r .get ("bar" ) == b"bar"
247+ assert await r .ping ()
248+ assert await r .get ("foo" ) == b"foo"
213249
214- await dp .stop ()
250+ await asyncio .gather (* [doit () for _ in range (10 )])
251+
252+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments