Skip to content

Commit eb91d4f

Browse files
committed
Adding load balancing strategy configuration to cluster clients (replacement for 'read_from_replicas' config) (#3563)
* Adding load balancing strategy configuration to cluster clients (replacement for 'read_from_replicas' config) * Fixing linter errors * Changing the LoadBalancingStrategy type hints to be defined as optional. Fixed wording in pydocs * Adding integration tests with the different load balancing strategies for read operations * Fixing linters
1 parent c5e4324 commit eb91d4f

File tree

5 files changed

+444
-45
lines changed

5 files changed

+444
-45
lines changed

redis/asyncio/cluster.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
SLOT_ID,
4040
AbstractRedisCluster,
4141
LoadBalancer,
42+
LoadBalancingStrategy,
4243
block_pipeline_command,
4344
get_node_name,
4445
parse_cluster_slots,
@@ -67,6 +68,7 @@
6768
)
6869
from redis.typing import AnyKeyT, EncodableT, KeyT
6970
from redis.utils import (
71+
deprecated_args,
7072
deprecated_function,
7173
dict_merge,
7274
get_lib_version,
@@ -133,9 +135,15 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
133135
| See:
134136
https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
135137
:param read_from_replicas:
136-
| Enable read from replicas in READONLY mode. You can read possibly stale data.
138+
| @deprecated - please use load_balancing_strategy instead
139+
| Enable read from replicas in READONLY mode.
137140
When set to true, read commands will be assigned between the primary and
138141
its replicas in a Round-Robin manner.
142+
The data read from replicas is eventually consistent with the data in primary nodes.
143+
:param load_balancing_strategy:
144+
| Enable read from replicas in READONLY mode and defines the load balancing
145+
strategy that will be used for cluster node selection.
146+
The data read from replicas is eventually consistent with the data in primary nodes.
139147
:param reinitialize_steps:
140148
| Specifies the number of MOVED errors that need to occur before reinitializing
141149
the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -228,6 +236,11 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
228236
"result_callbacks",
229237
)
230238

239+
@deprecated_args(
240+
args_to_warn=["read_from_replicas"],
241+
reason="Please configure the 'load_balancing_strategy' instead",
242+
version="5.0.3",
243+
)
231244
def __init__(
232245
self,
233246
host: Optional[str] = None,
@@ -236,6 +249,7 @@ def __init__(
236249
startup_nodes: Optional[List["ClusterNode"]] = None,
237250
require_full_coverage: bool = True,
238251
read_from_replicas: bool = False,
252+
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
239253
reinitialize_steps: int = 5,
240254
cluster_error_retry_attempts: int = 3,
241255
connection_error_retry_attempts: int = 3,
@@ -335,7 +349,7 @@ def __init__(
335349
}
336350
)
337351

338-
if read_from_replicas:
352+
if read_from_replicas or load_balancing_strategy:
339353
# Call our on_connect function to configure READONLY mode
340354
kwargs["redis_connect_func"] = self.on_connect
341355

@@ -384,6 +398,7 @@ def __init__(
384398
)
385399
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
386400
self.read_from_replicas = read_from_replicas
401+
self.load_balancing_strategy = load_balancing_strategy
387402
self.reinitialize_steps = reinitialize_steps
388403
self.cluster_error_retry_attempts = cluster_error_retry_attempts
389404
self.connection_error_retry_attempts = connection_error_retry_attempts
@@ -602,6 +617,7 @@ async def _determine_nodes(
602617
self.nodes_manager.get_node_from_slot(
603618
await self._determine_slot(command, *args),
604619
self.read_from_replicas and command in READ_COMMANDS,
620+
self.load_balancing_strategy if command in READ_COMMANDS else None,
605621
)
606622
]
607623

@@ -782,7 +798,11 @@ async def _execute_command(
782798
# refresh the target node
783799
slot = await self._determine_slot(*args)
784800
target_node = self.nodes_manager.get_node_from_slot(
785-
slot, self.read_from_replicas and args[0] in READ_COMMANDS
801+
slot,
802+
self.read_from_replicas and args[0] in READ_COMMANDS,
803+
self.load_balancing_strategy
804+
if args[0] in READ_COMMANDS
805+
else None,
786806
)
787807
moved = False
788808

@@ -1183,9 +1203,7 @@ def get_node(
11831203
return self.nodes_cache.get(node_name)
11841204
else:
11851205
raise DataError(
1186-
"get_node requires one of the following: "
1187-
"1. node name "
1188-
"2. host and port"
1206+
"get_node requires one of the following: 1. node name 2. host and port"
11891207
)
11901208

11911209
def set_nodes(
@@ -1245,17 +1263,23 @@ def _update_moved_slots(self) -> None:
12451263
self._moved_exception = None
12461264

12471265
def get_node_from_slot(
1248-
self, slot: int, read_from_replicas: bool = False
1266+
self,
1267+
slot: int,
1268+
read_from_replicas: bool = False,
1269+
load_balancing_strategy=None,
12491270
) -> "ClusterNode":
12501271
if self._moved_exception:
12511272
self._update_moved_slots()
12521273

1274+
if read_from_replicas is True and load_balancing_strategy is None:
1275+
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1276+
12531277
try:
1254-
if read_from_replicas:
1255-
# get the server index in a Round-Robin manner
1278+
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1279+
# get the server index using the strategy defined in load_balancing_strategy
12561280
primary_name = self.slots_cache[slot][0].name
12571281
node_idx = self.read_load_balancer.get_server_index(
1258-
primary_name, len(self.slots_cache[slot])
1282+
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
12591283
)
12601284
return self.slots_cache[slot][node_idx]
12611285
return self.slots_cache[slot][0]
@@ -1367,7 +1391,7 @@ async def initialize(self) -> None:
13671391
if len(disagreements) > 5:
13681392
raise RedisClusterException(
13691393
f"startup_nodes could not agree on a valid "
1370-
f'slots cache: {", ".join(disagreements)}'
1394+
f"slots cache: {', '.join(disagreements)}"
13711395
)
13721396

13731397
# Validate if all slots are covered or if we should try next startup node

redis/cluster.py

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import threading
55
import time
66
from collections import OrderedDict
7+
from enum import Enum
78
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
89

910
from redis._parsers import CommandsParser, Encoder
@@ -505,6 +506,11 @@ class initializer. In the case of conflicting arguments, querystring
505506
"""
506507
return cls(url=url, **kwargs)
507508

509+
@deprecated_args(
510+
args_to_warn=["read_from_replicas"],
511+
reason="Please configure the 'load_balancing_strategy' instead",
512+
version="5.0.3",
513+
)
508514
def __init__(
509515
self,
510516
host: Optional[str] = None,
@@ -515,6 +521,7 @@ def __init__(
515521
require_full_coverage: bool = False,
516522
reinitialize_steps: int = 5,
517523
read_from_replicas: bool = False,
524+
load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
518525
dynamic_startup_nodes: bool = True,
519526
url: Optional[str] = None,
520527
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
@@ -543,11 +550,16 @@ def __init__(
543550
cluster client. If not all slots are covered, RedisClusterException
544551
will be thrown.
545552
:param read_from_replicas:
553+
@deprecated - please use load_balancing_strategy instead
546554
Enable read from replicas in READONLY mode. You can read possibly
547555
stale data.
548556
When set to true, read commands will be assigned between the
549557
primary and its replicas in a Round-Robin manner.
550-
:param dynamic_startup_nodes:
558+
:param load_balancing_strategy:
559+
Enable read from replicas in READONLY mode and defines the load balancing
560+
strategy that will be used for cluster node selection.
561+
The data read from replicas is eventually consistent with the data in primary nodes.
562+
:param dynamic_startup_nodes:
551563
Set the RedisCluster's startup nodes to all of the discovered nodes.
552564
If true (default value), the cluster's discovered nodes will be used to
553565
determine the cluster nodes-slots mapping in the next topology refresh.
@@ -652,6 +664,7 @@ def __init__(
652664
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
653665
self.node_flags = self.__class__.NODE_FLAGS.copy()
654666
self.read_from_replicas = read_from_replicas
667+
self.load_balancing_strategy = load_balancing_strategy
655668
self.reinitialize_counter = 0
656669
self.reinitialize_steps = reinitialize_steps
657670
if event_dispatcher is None:
@@ -704,7 +717,7 @@ def on_connect(self, connection):
704717
connection.set_parser(ClusterParser)
705718
connection.on_connect()
706719

707-
if self.read_from_replicas:
720+
if self.read_from_replicas or self.load_balancing_strategy:
708721
# Sending READONLY command to server to configure connection as
709722
# readonly. Since each cluster node may change its server type due
710723
# to a failover, we should establish a READONLY connection
@@ -831,6 +844,7 @@ def pipeline(self, transaction=None, shard_hint=None):
831844
cluster_response_callbacks=self.cluster_response_callbacks,
832845
cluster_error_retry_attempts=self.cluster_error_retry_attempts,
833846
read_from_replicas=self.read_from_replicas,
847+
load_balancing_strategy=self.load_balancing_strategy,
834848
reinitialize_steps=self.reinitialize_steps,
835849
lock=self._lock,
836850
)
@@ -948,7 +962,9 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
948962
# get the node that holds the key's slot
949963
slot = self.determine_slot(*args)
950964
node = self.nodes_manager.get_node_from_slot(
951-
slot, self.read_from_replicas and command in READ_COMMANDS
965+
slot,
966+
self.read_from_replicas and command in READ_COMMANDS,
967+
self.load_balancing_strategy if command in READ_COMMANDS else None,
952968
)
953969
return [node]
954970

@@ -1172,7 +1188,11 @@ def _execute_command(self, target_node, *args, **kwargs):
11721188
# refresh the target node
11731189
slot = self.determine_slot(*args)
11741190
target_node = self.nodes_manager.get_node_from_slot(
1175-
slot, self.read_from_replicas and command in READ_COMMANDS
1191+
slot,
1192+
self.read_from_replicas and command in READ_COMMANDS,
1193+
self.load_balancing_strategy
1194+
if command in READ_COMMANDS
1195+
else None,
11761196
)
11771197
moved = False
11781198

@@ -1327,6 +1347,12 @@ def __del__(self):
13271347
self.redis_connection.close()
13281348

13291349

1350+
class LoadBalancingStrategy(Enum):
1351+
ROUND_ROBIN = "round_robin"
1352+
ROUND_ROBIN_REPLICAS = "round_robin_replicas"
1353+
RANDOM_REPLICA = "random_replica"
1354+
1355+
13301356
class LoadBalancer:
13311357
"""
13321358
Round-Robin Load Balancing
@@ -1336,15 +1362,38 @@ def __init__(self, start_index: int = 0) -> None:
13361362
self.primary_to_idx = {}
13371363
self.start_index = start_index
13381364

1339-
def get_server_index(self, primary: str, list_size: int) -> int:
1340-
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1341-
# Update the index
1342-
self.primary_to_idx[primary] = (server_index + 1) % list_size
1343-
return server_index
1365+
def get_server_index(
1366+
self,
1367+
primary: str,
1368+
list_size: int,
1369+
load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
1370+
) -> int:
1371+
if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
1372+
return self._get_random_replica_index(list_size)
1373+
else:
1374+
return self._get_round_robin_index(
1375+
primary,
1376+
list_size,
1377+
load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
1378+
)
13441379

13451380
def reset(self) -> None:
13461381
self.primary_to_idx.clear()
13471382

1383+
def _get_random_replica_index(self, list_size: int) -> int:
1384+
return random.randint(1, list_size - 1)
1385+
1386+
def _get_round_robin_index(
1387+
self, primary: str, list_size: int, replicas_only: bool
1388+
) -> int:
1389+
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1390+
if replicas_only and server_index == 0:
1391+
# skip the primary node index
1392+
server_index = 1
1393+
# Update the index for the next round
1394+
self.primary_to_idx[primary] = (server_index + 1) % list_size
1395+
return server_index
1396+
13481397

13491398
class NodesManager:
13501399
def __init__(
@@ -1448,7 +1497,21 @@ def _update_moved_slots(self):
14481497
# Reset moved_exception
14491498
self._moved_exception = None
14501499

1451-
def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
1500+
@deprecated_args(
1501+
args_to_warn=["server_type"],
1502+
reason=(
1503+
"In case you need select some load balancing strategy "
1504+
"that will use replicas, please set it through 'load_balancing_strategy'"
1505+
),
1506+
version="5.0.3",
1507+
)
1508+
def get_node_from_slot(
1509+
self,
1510+
slot,
1511+
read_from_replicas=False,
1512+
load_balancing_strategy=None,
1513+
server_type=None,
1514+
):
14521515
"""
14531516
Gets a node that serves this hash slot
14541517
"""
@@ -1463,11 +1526,14 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
14631526
f'"require_full_coverage={self._require_full_coverage}"'
14641527
)
14651528

1466-
if read_from_replicas is True:
1467-
# get the server index in a Round-Robin manner
1529+
if read_from_replicas is True and load_balancing_strategy is None:
1530+
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1531+
1532+
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1533+
# get the server index using the strategy defined in load_balancing_strategy
14681534
primary_name = self.slots_cache[slot][0].name
14691535
node_idx = self.read_load_balancer.get_server_index(
1470-
primary_name, len(self.slots_cache[slot])
1536+
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
14711537
)
14721538
elif (
14731539
server_type is None
@@ -1750,7 +1816,7 @@ def __init__(
17501816
first command execution. The node will be determined by:
17511817
1. Hashing the channel name in the request to find its keyslot
17521818
2. Selecting a node that handles the keyslot: If read_from_replicas is
1753-
set to true, a replica can be selected.
1819+
set to true or load_balancing_strategy is set, a replica can be selected.
17541820
17551821
:type redis_cluster: RedisCluster
17561822
:type node: ClusterNode
@@ -1846,7 +1912,9 @@ def execute_command(self, *args):
18461912
channel = args[1]
18471913
slot = self.cluster.keyslot(channel)
18481914
node = self.cluster.nodes_manager.get_node_from_slot(
1849-
slot, self.cluster.read_from_replicas
1915+
slot,
1916+
self.cluster.read_from_replicas,
1917+
self.cluster.load_balancing_strategy,
18501918
)
18511919
else:
18521920
# Get a random node
@@ -1989,6 +2057,7 @@ def __init__(
19892057
cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
19902058
startup_nodes: Optional[List["ClusterNode"]] = None,
19912059
read_from_replicas: bool = False,
2060+
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
19922061
cluster_error_retry_attempts: int = 3,
19932062
reinitialize_steps: int = 5,
19942063
lock=None,
@@ -2004,6 +2073,7 @@ def __init__(
20042073
)
20052074
self.startup_nodes = startup_nodes if startup_nodes else []
20062075
self.read_from_replicas = read_from_replicas
2076+
self.load_balancing_strategy = load_balancing_strategy
20072077
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
20082078
self.cluster_response_callbacks = cluster_response_callbacks
20092079
self.cluster_error_retry_attempts = cluster_error_retry_attempts

0 commit comments

Comments
 (0)