@@ -45,7 +45,7 @@ def __init__(self, addr, redis_addr, delay: float):
45
45
46
46
async def start (self ):
47
47
# test that we can connect to redis
48
- with async_timeout (2 ):
48
+ async with async_timeout (2 ):
49
49
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
50
50
redis_writer .close ()
51
51
self .server = await asyncio .start_server (self .handle , * self .addr )
@@ -67,15 +67,23 @@ def override(self, delay: float = 0.0):
67
67
async def handle (self , reader , writer ):
68
68
# establish connection to redis
69
69
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
70
- pipe1 = asyncio .create_task (
71
- pipe (reader , redis_writer , self , "to redis:" , self .send_event )
72
- )
73
- pipe2 = asyncio .create_task (pipe (redis_reader , writer , self , "from redis:" ))
74
- await asyncio .gather (pipe1 , pipe2 )
70
+ try :
71
+ pipe1 = asyncio .create_task (
72
+ pipe (reader , redis_writer , self , "to redis:" , self .send_event )
73
+ )
74
+ pipe2 = asyncio .create_task (pipe (redis_reader , writer , self , "from redis:" ))
75
+ await asyncio .gather (pipe1 , pipe2 )
76
+ finally :
77
+ redis_writer .close ()
78
+ redis_reader .close ()
75
79
76
80
async def stop (self ):
77
81
# clean up enough so that we can reuse the looper
78
82
self .ROUTINE .cancel ()
83
+ try :
84
+ await self .ROUTINE
85
+ except asyncio .CancelledError :
86
+ pass
79
87
loop = self .server .get_loop ()
80
88
await loop .shutdown_asyncgens ()
81
89
@@ -181,7 +189,7 @@ async def test_cluster(request, redis_addr):
181
189
remap .append ({"from_port" : port , "to_port" : remapped })
182
190
forward_addr = redis_addr [0 ], port
183
191
proxy = DelayProxy (
184
- addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr , delay = delay
192
+ addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr , delay = 0
185
193
)
186
194
proxies .append (proxy )
187
195
@@ -198,30 +206,29 @@ async def wait_for_send():
198
206
)
199
207
200
208
@contextlib .contextmanager
201
- def override ():
209
+ def override (delay : int = 0 ):
202
210
with contextlib .ExitStack () as stack :
203
211
for p in proxies :
204
- stack .enter_context (p .override ())
212
+ stack .enter_context (p .override (delay = delay ))
205
213
yield
206
214
207
- with override ():
208
- r = RedisCluster .from_url (
209
- f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap
210
- )
215
+ with contextlib .closing (
216
+ RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap )
217
+ ) as r :
211
218
await r .initialize ()
212
219
await r .set ("foo" , "foo" )
213
220
await r .set ("bar" , "bar" )
214
221
215
- all_clear ()
216
- t = asyncio .create_task (r .get ("foo" ))
217
- # cannot wait on the send event, we don't know which node will be used
218
- await wait_for_send ()
219
- await asyncio .sleep (delay )
220
- t .cancel ()
221
- with pytest .raises (asyncio .CancelledError ):
222
- await t
222
+ all_clear ()
223
+ with override (delay = delay ):
224
+ t = asyncio .create_task (r .get ("foo" ))
225
+ # cannot wait on the send event, we don't know which node will be used
226
+ await wait_for_send ()
227
+ await asyncio .sleep (delay )
228
+ t .cancel ()
229
+ with pytest .raises (asyncio .CancelledError ):
230
+ await t
223
231
224
- with override ():
225
232
# try a number of requests to excercise all the connections
226
233
async def doit ():
227
234
assert await r .get ("bar" ) == b"bar"
0 commit comments