Skip to content

Commit 33f656e

Browse files
committed
Added caching support for Sentinel
1 parent eaeef12 commit 33f656e

File tree

4 files changed

+102
-15
lines changed

4 files changed

+102
-15
lines changed

redis/connection.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,8 @@ def __init__(self, conn: ConnectionInterface, cache: Cache, conf: CacheConfigura
729729
self.pid = os.getpid()
730730
self._conn = conn
731731
self.retry = self._conn.retry
732+
self.host = self._conn.host
733+
self.port = self._conn.port
732734
self._cache = cache
733735
self._conf = conf
734736
self._current_command_hash = None
@@ -770,7 +772,7 @@ def send_packed_command(self, command, check_health=True):
770772
def send_command(self, *args, **kwargs):
771773
self._process_pending_invalidations()
772774

773-
# If command is write command or not allowed to cache skip it.
775+
# If command is write command or not allowed to cache, transfer control to the actual connection.
774776
if not self._conf.is_allowed_to_cache(args[0]):
775777
self._current_command_hash = None
776778
self._current_command_keys = None

redis/sentinel.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ def __init__(
229229
sentinels,
230230
min_other_sentinels=0,
231231
sentinel_kwargs=None,
232+
force_master_ip=None,
232233
**connection_kwargs,
233234
):
234235
# if sentinel_kwargs isn't defined, use the socket_* options from
@@ -245,6 +246,7 @@ def __init__(
245246
]
246247
self.min_other_sentinels = min_other_sentinels
247248
self.connection_kwargs = connection_kwargs
249+
self._force_master_ip = force_master_ip
248250

249251
def execute_command(self, *args, **kwargs):
250252
"""
@@ -304,7 +306,9 @@ def discover_master(self, service_name):
304306
sentinel,
305307
self.sentinels[0],
306308
)
307-
return state["ip"], state["port"]
309+
310+
ip = self._force_master_ip if self._force_master_ip is not None else state["ip"]
311+
return ip, state["port"]
308312

309313
error_info = ""
310314
if len(collected_errors) > 0:

tests/conftest.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -410,31 +410,40 @@ def sslclient(request):
410410

411411

412412
@pytest.fixture()
413-
def sentinel_setup(cache, request):
413+
def sentinel_setup(request):
414414
sentinel_ips = request.config.getoption("--sentinels")
415415
sentinel_endpoints = [
416416
(ip.strip(), int(port.strip()))
417417
for ip, port in (endpoint.split(":") for endpoint in sentinel_ips.split(","))
418418
]
419419
kwargs = request.param.get("kwargs", {}) if hasattr(request, "param") else {}
420+
use_cache = request.param.get("use_cache", False)
421+
cache = request.param.get("cache", None)
422+
cache_size = request.param.get("cache_size", 128)
423+
cache_ttl = request.param.get("cache_ttl", 300)
424+
force_master_ip = request.param.get("force_master_ip", None)
420425
sentinel = Sentinel(
421426
sentinel_endpoints,
427+
force_master_ip=force_master_ip,
422428
socket_timeout=0.1,
423-
use_cache=cache,
429+
use_cache=use_cache,
424430
cache=cache,
431+
cache_ttl=cache_ttl,
432+
cache_size=cache_size,
425433
protocol=3,
426434
**kwargs,
427435
)
428-
yield sentinel
436+
yield sentinel, cache
429437
for s in sentinel.sentinels:
430438
s.close()
431439

432440

433441
@pytest.fixture()
434442
def master(request, sentinel_setup):
435443
master_service = request.config.getoption("--master-service")
436-
master = sentinel_setup.master_for(master_service)
437-
yield master
444+
sentinel, cache = sentinel_setup
445+
master = sentinel.master_for(master_service)
446+
yield master, cache
438447
master.close()
439448

440449

tests/test_cache.py

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from cachetools import TTLCache, LRUCache, LFUCache
66

77
import redis
8-
from redis import Redis, RedisCluster
98
from redis.utils import HIREDIS_AVAILABLE
109
from tests.conftest import _get_client
1110

@@ -113,8 +112,6 @@ def test_health_check_invalidate_cache_multithreaded(self, r, r2, cache):
113112
threading.Thread(target=r2.set, args=("bar", "bar")).start()
114113
# Wait for health check
115114
time.sleep(2)
116-
# Trigger object destructor to shutdown health check thread
117-
del r
118115
# Make sure that value was invalidated
119116
assert cache.get(("GET", "foo")) is None
120117
assert cache.get(("GET", "bar")) is None
@@ -492,17 +489,92 @@ def test_cache_flushed_on_server_flush(self, r, cache):
492489
@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only")
493490
@pytest.mark.onlynoncluster
494491
class TestSentinelCache:
495-
def test_get_from_cache(self, cache, master):
492+
@pytest.mark.parametrize(
493+
"sentinel_setup",
494+
[{"cache": LRUCache(maxsize=128), "use_cache": True, "force_master_ip": "localhost"}],
495+
indirect=True,
496+
)
497+
@pytest.mark.onlynoncluster
498+
def test_get_from_cache(self, master, cache):
499+
master, cache = master
496500
master.set("foo", "bar")
497501
# get key from redis and save in local cache
498502
assert master.get("foo") == b"bar"
499503
# get key from local cache
500504
assert cache.get(("GET", "foo")) == b"bar"
501505
# change key in redis (cause invalidation)
502506
master.set("foo", "barbar")
503-
# send any command to redis (process invalidation in background)
504-
master.ping()
505-
# the command is not in the local cache anymore
506-
assert cache.get(("GET", "foo")) is None
507507
# get key from redis
508508
assert master.get("foo") == b"barbar"
509+
# Make sure that new value was cached
510+
assert cache.get(("GET", "foo")) == b"barbar"
511+
512+
@pytest.mark.parametrize(
513+
"sentinel_setup",
514+
[{"cache": LRUCache(maxsize=128), "use_cache": True, "force_master_ip": "localhost"}],
515+
indirect=True,
516+
)
517+
@pytest.mark.onlynoncluster
518+
def test_get_from_cache_multithreaded(self, master, cache):
519+
master, cache = master
520+
# Running commands over two threads
521+
threading.Thread(target=set_get, args=(master, "foo", "bar")).start()
522+
threading.Thread(target=set_get, args=(master, "bar", "foo")).start()
523+
524+
# Wait for command execution to be finished
525+
time.sleep(0.1)
526+
527+
# Make sure that both values was cached.
528+
assert cache.get(("GET", "foo")) == b"bar"
529+
assert cache.get(("GET", "bar")) == b"foo"
530+
531+
# Running commands over two threads
532+
threading.Thread(target=set_get, args=(master, "foo", "baz")).start()
533+
threading.Thread(target=set_get, args=(master, "bar", "bar")).start()
534+
535+
# Wait for command execution to be finished
536+
time.sleep(0.1)
537+
538+
# Make sure that new values was cached.
539+
assert cache.get(("GET", "foo")) == b"baz"
540+
assert cache.get(("GET", "bar")) == b"bar"
541+
542+
@pytest.mark.parametrize(
543+
"sentinel_setup",
544+
[{"cache": LRUCache(maxsize=128), "use_cache": True, "force_master_ip": "localhost"}],
545+
indirect=True,
546+
)
547+
@pytest.mark.onlynoncluster
548+
def test_health_check_invalidate_cache(self, master, cache):
549+
master, cache = master
550+
# add key to redis
551+
master.set("foo", "bar")
552+
# get key from redis and save in local cache
553+
assert master.get("foo") == b"bar"
554+
# get key from local cache
555+
assert cache.get(("GET", "foo")) == b"bar"
556+
# change key in redis (cause invalidation)
557+
master.set("foo", "barbar")
558+
# Wait for health check
559+
time.sleep(2)
560+
# Make sure that value was invalidated
561+
assert cache.get(("GET", "foo")) is None
562+
563+
@pytest.mark.parametrize(
564+
"sentinel_setup",
565+
[{"cache": LRUCache(maxsize=128), "use_cache": True, "force_master_ip": "localhost"}],
566+
indirect=True,
567+
)
568+
@pytest.mark.onlynoncluster
569+
def test_cache_clears_on_disconnect(self, master, cache):
570+
master, cache = master
571+
# add key to redis
572+
master.set("foo", "bar")
573+
# get key from redis and save in local cache
574+
assert master.get("foo") == b"bar"
575+
# get key from local cache
576+
assert cache.get(("GET", "foo")) == b"bar"
577+
# Force disconnection
578+
master.connection_pool.get_connection('_').disconnect()
579+
# Make sure cache is empty
580+
assert cache.currsize == 0

0 commit comments

Comments
 (0)