@@ -379,14 +379,9 @@ async def on_connect(self, connection: Connection) -> None:
379
379
if str_if_bytes (await connection .read_response_without_lock ()) != "OK" :
380
380
raise ConnectionError ("READONLY command failed" )
381
381
382
- def get_node (
383
- self ,
384
- host : Optional [str ] = None ,
385
- port : Optional [int ] = None ,
386
- node_name : Optional [str ] = None ,
387
- ) -> Optional ["ClusterNode" ]:
388
- """Get node by (host, port) or node_name."""
389
- return self .nodes_manager .get_node (host , port , node_name )
382
+ def get_nodes (self ) -> List ["ClusterNode" ]:
383
+ """Get all nodes of the cluster."""
384
+ return list (self .nodes_manager .nodes_cache .values ())
390
385
391
386
def get_primaries (self ) -> List ["ClusterNode" ]:
392
387
"""Get the primary nodes of the cluster."""
@@ -400,9 +395,29 @@ def get_random_node(self) -> "ClusterNode":
400
395
"""Get a random node of the cluster."""
401
396
return random .choice (list (self .nodes_manager .nodes_cache .values ()))
402
397
403
- def get_nodes (self ) -> List ["ClusterNode" ]:
404
- """Get all nodes of the cluster."""
405
- return list (self .nodes_manager .nodes_cache .values ())
398
+ def get_default_node (self ) -> "ClusterNode" :
399
+ """Get the default node of the client."""
400
+ return self .nodes_manager .default_node
401
+
402
+ def set_default_node (self , node : "ClusterNode" ) -> None :
403
+ """
404
+ Set the default node of the client.
405
+
406
+ :raises DataError: if None is passed or node does not exist in cluster.
407
+ """
408
+ if not node or not self .get_node (node_name = node .name ):
409
+ raise DataError ("The requested node does not exist in the cluster." )
410
+
411
+ self .nodes_manager .default_node = node
412
+
413
+ def get_node (
414
+ self ,
415
+ host : Optional [str ] = None ,
416
+ port : Optional [int ] = None ,
417
+ node_name : Optional [str ] = None ,
418
+ ) -> Optional ["ClusterNode" ]:
419
+ """Get node by (host, port) or node_name."""
420
+ return self .nodes_manager .get_node (host , port , node_name )
406
421
407
422
def get_node_from_key (
408
423
self , key : str , replica : bool = False
@@ -413,6 +428,7 @@ def get_node_from_key(
413
428
:param key:
414
429
:param replica:
415
430
| Indicates if a replica should be returned
431
+ |
416
432
None will returned if no replica holds this key
417
433
418
434
:raises SlotNotCoveredError: if the key is not covered by any slot.
@@ -431,24 +447,13 @@ def get_node_from_key(
431
447
432
448
return slot_cache [node_idx ]
433
449
434
- def get_default_node (self ) -> "ClusterNode" :
435
- """Get the default node of the client."""
436
- return self .nodes_manager .default_node
437
-
438
- def set_default_node (self , node : "ClusterNode" ) -> None :
450
+ def keyslot (self , key : EncodableT ) -> int :
439
451
"""
440
- Set the default node of the client .
452
+ Find the keyslot for a given key .
441
453
442
- :raises DataError: if None is passed or node does not exist in cluster.
454
+ See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding
443
455
"""
444
- if not node or not self .get_node (node_name = node .name ):
445
- raise DataError ("The requested node does not exist in the cluster." )
446
-
447
- self .nodes_manager .default_node = node
448
-
449
- def set_response_callback (self , command : str , callback : ResponseCallbackT ) -> None :
450
- """Set a custom response callback."""
451
- self .response_callbacks [command ] = callback
456
+ return key_slot (self .encoder .encode (key ))
452
457
453
458
def get_encoder (self ) -> Encoder :
454
459
"""Get the encoder object of the client."""
@@ -458,14 +463,9 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
458
463
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
459
464
return self .connection_kwargs
460
465
461
- def keyslot (self , key : EncodableT ) -> int :
462
- """
463
- Find the keyslot for a given key.
464
-
465
- See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding
466
- """
467
- k = self .encoder .encode (key )
468
- return key_slot (k )
466
+ def set_response_callback (self , command : str , callback : ResponseCallbackT ) -> None :
467
+ """Set a custom response callback."""
468
+ self .response_callbacks [command ] = callback
469
469
470
470
async def _determine_nodes (
471
471
self , command : str , * args : Any , node_flag : Optional [str ] = None
@@ -776,7 +776,6 @@ def __init__(
776
776
server_type : Optional [str ] = None ,
777
777
max_connections : int = 2 ** 31 ,
778
778
connection_class : Type [Connection ] = Connection ,
779
- response_callbacks : Dict [str , Any ] = RedisCluster .RESPONSE_CALLBACKS ,
780
779
** connection_kwargs : Any ,
781
780
) -> None :
782
781
if host == "localhost" :
@@ -792,7 +791,9 @@ def __init__(
792
791
self .max_connections = max_connections
793
792
self .connection_class = connection_class
794
793
self .connection_kwargs = connection_kwargs
795
- self .response_callbacks = response_callbacks
794
+ self .response_callbacks = connection_kwargs .pop (
795
+ "response_callbacks" , RedisCluster .RESPONSE_CALLBACKS
796
+ )
796
797
797
798
self ._connections : List [Connection ] = []
798
799
self ._free : Deque [Connection ] = collections .deque (maxlen = self .max_connections )
0 commit comments