1
1
import asyncio
2
+ import contextlib
2
3
import urllib .parse
3
4
4
5
import pytest
@@ -20,7 +21,7 @@ def redis_addr(request):
20
21
21
22
22
23
class DelayProxy :
23
- def __init__ (self , addr , redis_addr , delay : float ):
24
+ def __init__ (self , addr , redis_addr , delay : float = 0.0 ):
24
25
self .addr = addr
25
26
self .redis_addr = redis_addr
26
27
self .delay = delay
@@ -34,6 +35,19 @@ async def start(self):
34
35
self .server = await asyncio .start_server (self .handle , * self .addr )
35
36
self .ROUTINE = asyncio .create_task (self .server .serve_forever ())
36
37
38
+ @contextlib .contextmanager
39
+ def set_delay (self , delay : float = 0.0 ):
40
+ """
41
+ Allow to override the delay for parts of tests which aren't time dependent,
42
+ to speed up execution.
43
+ """
44
+ old = self .delay
45
+ self .delay = delay
46
+ try :
47
+ yield
48
+ finally :
49
+ self .delay = old
50
+
37
51
async def handle (self , reader , writer ):
38
52
# establish connection to redis
39
53
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
@@ -74,7 +88,7 @@ async def test_standalone(delay, redis_addr):
74
88
75
89
# create a tcp socket proxy that relays data to Redis and back,
76
90
# inserting 0.1 seconds of delay
77
- dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr , delay = delay * 2 )
91
+ dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr )
78
92
await dp .start ()
79
93
80
94
for b in [True , False ]:
@@ -84,8 +98,14 @@ async def test_standalone(delay, redis_addr):
84
98
await r .set ("foo" , "foo" )
85
99
await r .set ("bar" , "bar" )
86
100
101
+ async def op (r ):
102
+ with dp .set_delay (delay * 2 ):
103
+ return await r .get (
104
+ "foo"
105
+ ) # <-- this is the operation we want to cancel
106
+
87
107
dp .send_event .clear ()
88
- t = asyncio .create_task (r . get ( "foo" ))
108
+ t = asyncio .create_task (op ( r ))
89
109
# Wait until the task has sent, and then some, to make sure it has
90
110
# settled on the read.
91
111
await dp .send_event .wait ()
@@ -106,7 +126,7 @@ async def test_standalone(delay, redis_addr):
106
126
@pytest .mark .onlynoncluster
107
127
@pytest .mark .parametrize ("delay" , argvalues = [0.05 , 0.5 , 1 , 2 ])
108
128
async def test_standalone_pipeline (delay , redis_addr ):
109
- dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr , delay = delay * 2 )
129
+ dp = DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr )
110
130
await dp .start ()
111
131
for b in [True , False ]:
112
132
async with Redis (host = "127.0.0.1" , port = 5380 , single_connection_client = b ) as r :
@@ -120,8 +140,14 @@ async def test_standalone_pipeline(delay, redis_addr):
120
140
pipe2 .ping ()
121
141
pipe2 .get ("foo" )
122
142
143
+ async def op (pipe ):
144
+ with dp .set_delay (delay * 2 ):
145
+ return await pipe .get (
146
+ "foo"
147
+ ).execute () # <-- this is the operation we want to cancel
148
+
123
149
dp .send_event .clear ()
124
- t = asyncio .create_task (pipe . get ( "foo" ). execute ( ))
150
+ t = asyncio .create_task (op ( pipe ))
125
151
# wait until task has settled on the read
126
152
await dp .send_event .wait ()
127
153
await asyncio .sleep (0.01 )
@@ -153,16 +179,21 @@ async def test_standalone_pipeline(delay, redis_addr):
153
179
async def test_cluster (request , redis_addr ):
154
180
155
181
redis_addr = redis_addr [0 ], 6372 # use the cluster port
156
- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr , delay = 0.1 )
182
+ delay = 0.1
183
+ dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
157
184
await dp .start ()
158
185
159
186
r = RedisCluster .from_url ("redis://127.0.0.1:5381" )
160
187
await r .initialize ()
161
188
await r .set ("foo" , "foo" )
162
189
await r .set ("bar" , "bar" )
163
190
191
+ async def op (r ):
192
+ with dp .set_delay (delay ):
193
+ return await r .get ("foo" )
194
+
164
195
dp .send_event .clear ()
165
- t = asyncio .create_task (r . get ( "foo" ))
196
+ t = asyncio .create_task (op ( r ))
166
197
await dp .send_event .wait ()
167
198
await asyncio .sleep (0.01 )
168
199
t .cancel ()
0 commit comments