@@ -198,23 +198,21 @@ async def get_connection(
198
198
) -> SentinelManagedConnection :
199
199
"""
200
200
Get a connection from the pool.
201
- `xxx_scan_iter` commands needs to be handled specially.
201
+ 'xxxscan_iter' ('scan_iter', 'hscan_iter', 'sscan_iter', 'zscan_iter')
202
+ commands needs to be handled specially.
202
203
If the client is created using a connection pool, in replica mode,
203
- all ` scan` command-equivalent of the ` xxx_scan_iter` commands needs
204
+ all ' scan' command-equivalent of the ' xxx_scan_iter' commands needs
204
205
to be issued to the same Redis replica.
205
206
206
207
The way each server positions each key is different with one another,
207
- and the cursor acts as the ' offset' of the scan.
208
- Hence, all scans coming from a single xxx_scan_iter_channel command
208
+ and the cursor acts as the offset of the scan.
209
+ Hence, all scans coming from a single ' xxx_scan_iter_channel' command
209
210
should go to the same replica.
210
211
"""
211
- # If not an iter command or in master mode, call super()
212
- # No custom logic for master, because there's only 1 master.
213
- # The bug is only when Redis has the possibility to connect to multiple replicas
212
+ # If not an iter command or in master mode, call superclass' implementation
214
213
if not (iter_req_id := options .get ("_iter_req_id" , None )) or self .is_master :
215
214
return await super ().get_connection (command_name , * keys , ** options )
216
215
217
- # Check if this iter request has already been directed to a particular server
218
216
# Check if this iter request has already been directed to a particular server
219
217
(
220
218
server_host ,
@@ -232,8 +230,8 @@ async def get_connection(
232
230
else :
233
231
# Check from the available connections, if any of the connection
234
232
# is connected to the host and port that we want
235
- # If yes, use that connection
236
233
for available_connection in self ._available_connections .copy ():
234
+ # if yes, use that connection
237
235
if (
238
236
available_connection .host == server_host
239
237
and available_connection .port == server_port
@@ -247,22 +245,20 @@ async def get_connection(
247
245
assert connection
248
246
self ._in_use_connections .add (connection )
249
247
try :
250
- # ensure this connection is connected to Redis
251
- # If this is the first scan request,
252
- # just call the SentinelManagedConnection.connect()
253
- # This will call rotate_slaves
254
- # and connect to a random replica
248
+ # Ensure this connection is connected to Redis
249
+ # If this is the first scan request, it will
250
+ # call rotate_slaves and connect to a random replica
255
251
if server_port is None or server_port is None :
256
252
await connection .connect ()
257
253
# If this is not the first scan request,
258
- # connect to the particular address and port
254
+ # connect to the previous replica.
255
+ # This will connect to the host and port of the replica
259
256
else :
260
- # This will connect to the host and port that we've specified above
261
257
await connection .connect_to_address (server_host , server_port )
262
- # connections that the pool provides should be ready to send
263
- # a command. if not, the connection was either returned to the
258
+ # Connections that the pool provides should be ready to send
259
+ # a command. If not, the connection was either returned to the
264
260
# pool before all data has been read or the socket has been
265
- # closed. either way, reconnect and verify everything is good.
261
+ # closed. Either way, reconnect and verify everything is good.
266
262
try :
267
263
if await connection .can_read_destructive ():
268
264
raise ConnectionError ("Connection has data" ) from None
@@ -272,7 +268,7 @@ async def get_connection(
272
268
if await connection .can_read_destructive ():
273
269
raise ConnectionError ("Connection not ready" ) from None
274
270
except BaseException :
275
- # release the connection back to the pool so that we don't
271
+ # Release the connection back to the pool so that we don't
276
272
# leak it
277
273
await self .release (connection )
278
274
raise
@@ -281,7 +277,6 @@ async def get_connection(
281
277
connection .host ,
282
278
connection .port ,
283
279
)
284
-
285
280
return connection
286
281
287
282
0 commit comments