Skip to content

Commit 339735c

Browse files
committed
Added support for RedisCluster and multi-threaded test cases
1 parent a734303 commit 339735c

File tree

4 files changed

+358
-45
lines changed

4 files changed

+358
-45
lines changed

redis/cluster.py

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from redis._parsers import CommandsParser, Encoder
1212
from redis._parsers.helpers import parse_scan
1313
from redis.backoff import default_backoff
14-
from redis.cache import CacheMixin
1514
from redis.client import CaseInsensitiveDict, PubSub, Redis
1615
from redis.commands import READ_COMMANDS, RedisClusterCommands
1716
from redis.commands.helpers import list_or_args
@@ -446,7 +445,7 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
446445
self.nodes_manager.default_node = random.choice(replicas)
447446

448447

449-
class RedisCluster(AbstractRedisCluster, RedisClusterCommands, CacheMixin):
448+
class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
450449
@classmethod
451450
def from_url(cls, url, **kwargs):
452451
"""
@@ -504,7 +503,10 @@ def __init__(
504503
dynamic_startup_nodes: bool = True,
505504
url: Optional[str] = None,
506505
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
507-
use_cache: Optional[bool] = False,
506+
use_cache: bool = False,
507+
cache: Optional[Cache] = None,
508+
cache_size: int = 128,
509+
cache_ttl: int = 300,
508510
**kwargs,
509511
):
510512
"""
@@ -628,6 +630,10 @@ def __init__(
628630
kwargs.get("encoding_errors", "strict"),
629631
kwargs.get("decode_responses", False),
630632
)
633+
protocol = kwargs.get("protocol", None)
634+
if use_cache and protocol not in [3, "3"]:
635+
raise RedisError("Client caching is only supported with RESP version 3")
636+
631637
self.cluster_error_retry_attempts = cluster_error_retry_attempts
632638
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
633639
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -641,6 +647,9 @@ def __init__(
641647
dynamic_startup_nodes=dynamic_startup_nodes,
642648
address_remap=address_remap,
643649
use_cache=use_cache,
650+
cache=cache,
651+
cache_size=cache_size,
652+
cache_ttl=cache_ttl,
644653
**kwargs,
645654
)
646655

@@ -649,11 +658,6 @@ def __init__(
649658
)
650659
self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
651660

652-
protocol = kwargs.get("protocol", None)
653-
if use_cache and protocol not in [3, "3"]:
654-
raise RedisError("Client caching is only supported with RESP version 3")
655-
CacheMixin.__init__(self, use_cache, None)
656-
657661
self.commands_parser = CommandsParser(self)
658662
self._lock = threading.Lock()
659663

@@ -1057,8 +1061,6 @@ def _parse_target_nodes(self, target_nodes):
10571061
return nodes
10581062

10591063
def execute_command(self, *args, **options):
1060-
if self.use_cache:
1061-
return self.cached_call(self._execute_command, *args, **options)
10621064
return self._internal_execute_command(*args, **options)
10631065

10641066
def _internal_execute_command(self, *args, **kwargs):
@@ -1163,7 +1165,7 @@ def _execute_command(self, target_node, *args, **kwargs):
11631165
connection.send_command("ASKING")
11641166
redis_node.parse_response(connection, "ASKING", **kwargs)
11651167
asking = False
1166-
connection.send_command(*args)
1168+
connection.send_command(*args, **kwargs)
11671169
response = redis_node.parse_response(connection, command, **kwargs)
11681170
if command in self.cluster_response_callbacks:
11691171
response = self.cluster_response_callbacks[command](
@@ -1317,7 +1319,7 @@ def reset(self) -> None:
13171319
self.primary_to_idx.clear()
13181320

13191321

1320-
class NodesManager(CacheMixin):
1322+
class NodesManager():
13211323
def __init__(
13221324
self,
13231325
startup_nodes,
@@ -1327,8 +1329,10 @@ def __init__(
13271329
dynamic_startup_nodes=True,
13281330
connection_pool_class=ConnectionPool,
13291331
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
1330-
use_cache: Optional[bool] = False,
1332+
use_cache: bool = False,
13311333
cache: Optional[Cache] = None,
1334+
cache_size: int = 128,
1335+
cache_ttl: int = 300,
13321336
**kwargs,
13331337
):
13341338
self.nodes_cache = {}
@@ -1342,13 +1346,15 @@ def __init__(
13421346
self.connection_pool_class = connection_pool_class
13431347
self.address_remap = address_remap
13441348
self.use_cache = use_cache
1349+
self.cache = cache
1350+
self.cache_size = cache_size
1351+
self.cache_ttl = cache_ttl
13451352
self._moved_exception = None
13461353
self.connection_kwargs = kwargs
13471354
self.read_load_balancer = LoadBalancer()
13481355
if lock is None:
13491356
lock = threading.Lock()
13501357
self._lock = lock
1351-
CacheMixin.__init__(self, use_cache, None, cache)
13521358
self.initialize()
13531359

13541360
def get_node(self, host=None, port=None, node_name=None):
@@ -1486,9 +1492,21 @@ def create_redis_node(self, host, port, **kwargs):
14861492
# Create a redis node with a costumed connection pool
14871493
kwargs.update({"host": host})
14881494
kwargs.update({"port": port})
1489-
r = Redis(connection_pool=self.connection_pool_class(**kwargs), use_cache=self.use_cache, cache=self.cache)
1495+
kwargs.update({"use_cache": self.use_cache})
1496+
kwargs.update({"cache": self.cache})
1497+
kwargs.update({"cache_size": self.cache_size})
1498+
kwargs.update({"cache_ttl": self.cache_ttl})
1499+
r = Redis(connection_pool=self.connection_pool_class(**kwargs))
14901500
else:
1491-
r = Redis(host=host, port=port, use_cache=self.use_cache, cache=self.cache, **kwargs)
1501+
r = Redis(
1502+
host=host,
1503+
port=port,
1504+
use_cache=self.use_cache,
1505+
cache=self.cache,
1506+
cache_size=self.cache_size,
1507+
cache_ttl=self.cache_ttl,
1508+
**kwargs,
1509+
)
14921510
return r
14931511

14941512
def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):

redis/connection.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from abc import abstractmethod
99
from itertools import chain
1010
from queue import Empty, Full, LifoQueue
11-
from time import time
11+
from time import time, sleep
1212
from typing import Any, Callable, List, Optional, Type, Union
1313
from urllib.parse import parse_qs, unquote, urlparse
1414

@@ -756,6 +756,7 @@ def on_connect(self):
756756
self._conn.on_connect()
757757

758758
def disconnect(self, *args):
759+
self._cache.clear()
759760
self._conn.disconnect(*args)
760761

761762
def check_health(self):
@@ -1235,7 +1236,7 @@ def __init__(
12351236
self.max_connections = max_connections
12361237
self._cache = None
12371238
self._cache_conf = None
1238-
self._scheduler = None
1239+
self.scheduler = None
12391240

12401241
if connection_kwargs.get("use_cache"):
12411242
if connection_kwargs.get("protocol") not in [3, "3"]:
@@ -1249,9 +1250,9 @@ def __init__(
12491250
else:
12501251
self._cache = TTLCache(self.connection_kwargs["cache_size"], self.connection_kwargs["cache_ttl"])
12511252

1252-
# self.scheduler = BackgroundScheduler()
1253-
# self.scheduler.add_job(self._perform_health_check, "interval", seconds=2)
1254-
# self.scheduler.start()
1253+
self.scheduler = BackgroundScheduler()
1254+
self.scheduler.add_job(self._perform_health_check, "interval", seconds=2, id="cache_health_check")
1255+
self.scheduler.start()
12551256

12561257
connection_kwargs.pop("use_cache", None)
12571258
connection_kwargs.pop("cache_size", None)
@@ -1269,10 +1270,6 @@ def __init__(
12691270
self._fork_lock = threading.Lock()
12701271
self.reset()
12711272

1272-
def __del__(self):
1273-
if self._scheduler is not None:
1274-
self.scheduler.shutdown()
1275-
12761273
def __repr__(self) -> (str, str):
12771274
return (
12781275
f"<{type(self).__module__}.{type(self).__name__}"
@@ -1464,10 +1461,8 @@ def _perform_health_check(self) -> None:
14641461
with self._lock:
14651462
while self._available_connections:
14661463
conn = self._available_connections.pop()
1467-
self._in_use_connections.add(conn)
14681464
conn.send_command('PING')
14691465
conn.read_response()
1470-
self.release(conn)
14711466

14721467

14731468
class BlockingConnectionPool(ConnectionPool):

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from redis.retry import Retry
1717

1818
REDIS_INFO = {}
19-
default_redis_url = "redis://localhost:6379/0"
19+
default_redis_url = "redis://localhost:6372/0"
2020
default_protocol = "2"
2121
default_redismod_url = "redis://localhost:6479"
2222

0 commit comments

Comments
 (0)