1
1
import asyncio
2
- import sys
3
2
import urllib .parse
4
3
5
4
import pytest
@@ -20,23 +19,12 @@ def redis_addr(request):
20
19
return netloc , "6379"
21
20
22
21
23
- async def pipe (
24
- reader : asyncio .StreamReader , writer : asyncio .StreamWriter , delay : float , name = ""
25
- ):
26
- while True :
27
- data = await reader .read (1000 )
28
- if not data :
29
- break
30
- await asyncio .sleep (delay )
31
- writer .write (data )
32
- await writer .drain ()
33
-
34
-
35
22
class DelayProxy :
36
23
def __init__ (self , addr , redis_addr , delay : float ):
37
24
self .addr = addr
38
25
self .redis_addr = redis_addr
39
26
self .delay = delay
27
+ self .send_event = asyncio .Event ()
40
28
41
29
async def start (self ):
42
30
# test that we can connect to redis
@@ -49,10 +37,10 @@ async def start(self):
49
37
async def handle (self , reader , writer ):
50
38
# establish connection to redis
51
39
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
52
- pipe1 = asyncio .create_task (pipe (reader , redis_writer , self .delay , "to redis:" ))
53
- pipe2 = asyncio .create_task (
54
- pipe (redis_reader , writer , self .delay , "from redis:" )
40
+ pipe1 = asyncio .create_task (
41
+ self .pipe (reader , redis_writer , "to redis:" , self .send_event )
55
42
)
43
+ pipe2 = asyncio .create_task (self .pipe (redis_reader , writer , "from redis:" ))
56
44
await asyncio .gather (pipe1 , pipe2 )
57
45
58
46
async def stop (self ):
@@ -61,6 +49,24 @@ async def stop(self):
61
49
loop = self .server .get_loop ()
62
50
await loop .shutdown_asyncgens ()
63
51
52
+ async def pipe (
53
+ self ,
54
+ reader : asyncio .StreamReader ,
55
+ writer : asyncio .StreamWriter ,
56
+ name = "" ,
57
+ event : asyncio .Event = None ,
58
+ ):
59
+ while True :
60
+ data = await reader .read (1000 )
61
+ if not data :
62
+ break
63
+ # print(f"{name} read {len(data)} delay {self.delay}")
64
+ if event :
65
+ event .set ()
66
+ await asyncio .sleep (self .delay )
67
+ writer .write (data )
68
+ await writer .drain ()
69
+
64
70
65
71
@pytest .mark .onlynoncluster
66
72
@pytest .mark .parametrize ("delay" , argvalues = [0.05 , 0.5 , 1 , 2 ])
@@ -78,17 +84,18 @@ async def test_standalone(delay, redis_addr):
78
84
await r .set ("foo" , "foo" )
79
85
await r .set ("bar" , "bar" )
80
86
87
+ dp .send_event .clear ()
81
88
t = asyncio .create_task (r .get ("foo" ))
82
- await asyncio .sleep (delay )
89
+ # Wait until the task has sent, and then some, to make sure it has
90
+ # settled on the read.
91
+ await dp .send_event .wait ()
92
+ await asyncio .sleep (0.01 ) # a little extra time for prudence
83
93
t .cancel ()
84
- try :
94
+ with pytest . raises ( asyncio . CancelledError ) :
85
95
await t
86
- sys .stderr .write ("try again, we did not cancel the task in time\n " )
87
- except asyncio .CancelledError :
88
- sys .stderr .write (
89
- "canceled task, connection is left open with unread response\n "
90
- )
91
96
97
+ # make sure that our previous request, cancelled while waiting for
98
+ # a repsponse, didn't leave the connection open andin a bad state
92
99
assert await r .get ("bar" ) == b"bar"
93
100
assert await r .ping ()
94
101
assert await r .get ("foo" ) == b"foo"
@@ -113,10 +120,17 @@ async def test_standalone_pipeline(delay, redis_addr):
113
120
pipe2 .ping ()
114
121
pipe2 .get ("foo" )
115
122
123
+ dp .send_event .clear ()
116
124
t = asyncio .create_task (pipe .get ("foo" ).execute ())
117
- await asyncio .sleep (delay )
125
+ # wait until task has settled on the read
126
+ await dp .send_event .wait ()
127
+ await asyncio .sleep (0.01 )
118
128
t .cancel ()
129
+ with pytest .raises (asyncio .CancelledError ):
130
+ await t
119
131
132
+ # we have now cancelled the pieline in the middle of a request, make sure
133
+ # that the connection is still usable
120
134
pipe .get ("bar" )
121
135
pipe .ping ()
122
136
pipe .get ("foo" )
@@ -147,13 +161,13 @@ async def test_cluster(request, redis_addr):
147
161
await r .set ("foo" , "foo" )
148
162
await r .set ("bar" , "bar" )
149
163
164
+ dp .send_event .clear ()
150
165
t = asyncio .create_task (r .get ("foo" ))
151
- await asyncio .sleep (0.050 )
166
+ await dp .send_event .wait ()
167
+ await asyncio .sleep (0.01 )
152
168
t .cancel ()
153
- try :
169
+ with pytest . raises ( asyncio . CancelledError ) :
154
170
await t
155
- except asyncio .CancelledError :
156
- pytest .fail ("connection is left open with unread response" )
157
171
158
172
assert await r .get ("bar" ) == b"bar"
159
173
assert await r .ping ()
0 commit comments