@@ -185,30 +185,68 @@ async def op(pipe):
185
185
@pytest .mark .onlycluster
186
186
async def test_cluster (request , redis_addr ):
187
187
188
- redis_addr = redis_addr [0 ], 6372 # use the cluster port
189
188
delay = 0.1
190
- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
191
- await dp .start ()
189
+ cluster_port = 6372
190
+ remap_base = 7372
191
+ n_nodes = 6
192
+
193
+ def remap (address ):
194
+ host , port = address
195
+ return host , remap_base + port - cluster_port
196
+
197
+ proxies = []
198
+ for i in range (n_nodes ):
199
+ port = cluster_port + i
200
+ remapped = remap_base + i
201
+ forward_addr = redis_addr [0 ], port
202
+ proxy = DelayProxy (addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr )
203
+ proxies .append (proxy )
204
+
205
+ # start proxies
206
+ await asyncio .gather (* [p .start () for p in proxies ])
207
+
208
+ def all_clear ():
209
+ for p in proxies :
210
+ p .send_event .clear ()
211
+
212
+ async def wait_for_send ():
213
+ asyncio .wait (
214
+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
215
+ )
192
216
193
- with contextlib .closing (RedisCluster .from_url ("redis://127.0.0.1:5381" )) as r :
217
+ @contextlib .contextmanager
218
+ def set_delay (delay : float ):
219
+ with contextlib .ExitStack () as stack :
220
+ for p in proxies :
221
+ stack .enter_context (p .set_delay (delay ))
222
+ yield
223
+
224
+ with contextlib .closing (
225
+ RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , address_remap = remap )
226
+ ) as r :
194
227
await r .initialize ()
195
228
await r .set ("foo" , "foo" )
196
229
await r .set ("bar" , "bar" )
197
230
198
231
async def op (r ):
199
- with dp . set_delay (delay ):
232
+ with set_delay (delay ):
200
233
return await r .get ("foo" )
201
234
202
- dp . send_event . clear ()
235
+ all_clear ()
203
236
t = asyncio .create_task (op (r ))
204
- await dp .send_event .wait ()
237
+ # Wait for whichever DelayProxy gets the request first
238
+ await wait_for_send ()
205
239
await asyncio .sleep (0.01 )
206
240
t .cancel ()
207
241
with pytest .raises (asyncio .CancelledError ):
208
242
await t
209
243
210
- assert await r .get ("bar" ) == b"bar"
211
- assert await r .ping ()
212
- assert await r .get ("foo" ) == b"foo"
244
+ # try a number of requests to excercise all the connections
245
+ async def doit ():
246
+ assert await r .get ("bar" ) == b"bar"
247
+ assert await r .ping ()
248
+ assert await r .get ("foo" ) == b"foo"
213
249
214
- await dp .stop ()
250
+ await asyncio .gather (* [doit () for _ in range (10 )])
251
+
252
+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments