1
1
import asyncio
2
2
import random
3
3
import weakref
4
- import uuid
5
- from typing import AsyncIterator , Iterable , Mapping , Optional , Sequence , Tuple , Type , Any
4
+ from typing import (
5
+ Any ,
6
+ AsyncIterator ,
7
+ Iterable ,
8
+ Mapping ,
9
+ Optional ,
10
+ Sequence ,
11
+ Tuple ,
12
+ Type ,
13
+ )
6
14
7
15
from redis .asyncio .client import Redis
8
16
from redis .asyncio .connection import (
@@ -66,13 +74,13 @@ async def connect(self):
66
74
self ._connect_retry ,
67
75
lambda error : asyncio .sleep (0 ),
68
76
)
69
-
77
+
70
78
async def _connect_to_address_retry (self , host : str , port : int ) -> None :
71
79
if self ._reader :
72
80
return # already connected
73
81
try :
74
82
return await self .connect_to ((host , port ))
75
- except ConnectionError as exc :
83
+ except ConnectionError :
76
84
raise SlaveNotFoundError
77
85
78
86
async def connect_to_address (self , host : str , port : int ) -> None :
@@ -170,11 +178,6 @@ async def get_master_address(self):
170
178
171
179
async def rotate_slaves (self ) -> AsyncIterator :
172
180
"""Round-robin slave balancer"""
173
- (
174
- server_host ,
175
- server_port ,
176
- ) = self ._request_id_to_replica_address .get (iter_req_id , (None , None ))
177
-
178
181
slaves = await self .sentinel_manager .discover_slaves (self .service_name )
179
182
if slaves :
180
183
if self .slave_rr_counter is None :
@@ -201,15 +204,15 @@ async def get_connection(
201
204
to be issued to the same Redis replica.
202
205
203
206
The way each server positions each key is different with one another,
204
- and the cursor acts as the 'offset' of the scan.
205
- Hence, all scans coming from a single xxx_scan_iter_channel command
207
+ and the cursor acts as the 'offset' of the scan.
208
+ Hence, all scans coming from a single xxx_scan_iter_channel command
206
209
should go to the same replica.
207
210
"""
208
211
# If not an iter command or in master mode, call super()
209
212
# No custom logic for master, because there's only 1 master.
210
213
# The bug is only when Redis has the possibility to connect to multiple replicas
211
214
if not (iter_req_id := options .get ("_iter_req_id" , None )) or self .is_master :
212
- return await super ().get_connection (command_name , * keys , ** options ) # type: ignore[no-any-return]
215
+ return await super ().get_connection (command_name , * keys , ** options )
213
216
214
217
# Check if this iter request has already been directed to a particular server
215
218
# Check if this iter request has already been directed to a particular server
@@ -222,7 +225,7 @@ async def get_connection(
222
225
# get a connection from the pool
223
226
if server_host is None or server_port is None :
224
227
try :
225
- connection = self ._available_connections .pop () # type: ignore [assignment]
228
+ connection = self ._available_connections .pop ()
226
229
except IndexError :
227
230
connection = self .make_connection ()
228
231
# If this is not the first scan request of the iter command
@@ -236,7 +239,7 @@ async def get_connection(
236
239
and available_connection .port == server_port
237
240
):
238
241
self ._available_connections .remove (available_connection )
239
- connection = available_connection # type: ignore[assignment]
242
+ connection = available_connection
240
243
# If not, make a new dummy connection object, and set its host and port
241
244
# to the one that we want later in the call to ``connect_to_address``
242
245
if not connection :
@@ -255,22 +258,18 @@ async def get_connection(
255
258
# connect to the particular address and port
256
259
else :
257
260
# This will connect to the host and port that we've specified above
258
- await connection .connect_to_address (server_host , server_port ) # type: ignore[arg-type]
261
+ await connection .connect_to_address (server_host , server_port )
259
262
# connections that the pool provides should be ready to send
260
263
# a command. if not, the connection was either returned to the
261
264
# pool before all data has been read or the socket has been
262
265
# closed. either way, reconnect and verify everything is good.
263
266
try :
264
- # type ignore below:
265
- # attr Not defined in redis stubs and
266
- # we don't need to create a subclass to help with this single attr
267
- if await connection .can_read_destructive (): # type: ignore[attr-defined]
267
+ if await connection .can_read_destructive ():
268
268
raise ConnectionError ("Connection has data" ) from None
269
269
except (ConnectionError , OSError ):
270
270
await connection .disconnect ()
271
271
await connection .connect ()
272
- # type ignore below: similar to above
273
- if await connection .can_read_destructive (): # type: ignore[attr-defined]
272
+ if await connection .can_read_destructive ():
274
273
raise ConnectionError ("Connection not ready" ) from None
275
274
except BaseException :
276
275
# release the connection back to the pool so that we don't
@@ -286,7 +285,6 @@ async def get_connection(
286
285
return connection
287
286
288
287
289
-
290
288
class Sentinel (AsyncSentinelCommands ):
291
289
"""
292
290
Redis Sentinel cluster client
0 commit comments