Skip to content

Commit eb7140c

Browse files
zakafzach-iee
authored andcommitted
release already acquired connections on ClusterPipeline, when get_connection raises an exception
1 parent 9f50357 commit eb7140c

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

redis/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1999,6 +1999,8 @@ def _send_cluster_commands(
19991999
try:
20002000
connection = get_connection(redis_node, c.args)
20012001
except ConnectionError:
2002+
for n in nodes.values():
2003+
n.connection_pool.release(n.connection)
20022004
# Connection retries are being handled in the node's
20032005
# Retry object. Reinitialize the node -> slot table.
20042006
self.nodes_manager.initialize()

tests/test_cluster.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import pytest
1313

14+
import redis
1415
from redis import Redis
1516
from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
1617
from redis.cluster import (
@@ -3041,6 +3042,31 @@ def raise_ask_error():
30413042
assert ask_node.redis_connection.connection.read_response.called
30423043
assert res == ["MOCK_OK"]
30433044

3045+
def test_return_previously_acquired_connections(self, r):
3046+
# in order to ensure that a pipeline will make use of connections
3047+
# from different nodes
3048+
assert r.keyslot('a') != r.keyslot('b')
3049+
3050+
orig_func = redis.cluster.get_connection
3051+
with patch("redis.cluster.get_connection") as get_connection:
3052+
def raise_error(target_node, *args, **kwargs):
3053+
if get_connection.call_count == 2:
3054+
raise ConnectionError("mocked error")
3055+
else:
3056+
return orig_func(target_node, *args, **kwargs)
3057+
3058+
get_connection.side_effect = raise_error
3059+
3060+
r.pipeline().get('a').get('b').execute()
3061+
3062+
# there should have been two get_connections per execution and
3063+
# two executions due to exception raised in the first execution
3064+
assert get_connection.call_count == 4
3065+
for cluster_node in r.nodes_manager.nodes_cache.values():
3066+
connection_pool = cluster_node.redis_connection.connection_pool
3067+
num_of_conns = len(connection_pool._available_connections)
3068+
assert num_of_conns == connection_pool._created_connections
3069+
30443070
def test_empty_stack(self, r):
30453071
"""
30463072
If pipeline is executed with no commands it should

0 commit comments

Comments
 (0)