Skip to content

Commit 101a28d

Browse files
zakafdvora-h
authored andcommitted
release already acquired connections on ClusterPipeline, when get_connection raises an exception (#3133)
Signed-off-by: zach.lee <[email protected]>
1 parent 3f23fff commit 101a28d

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

redis/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,6 +2130,8 @@ def _send_cluster_commands(
21302130
try:
21312131
connection = get_connection(redis_node, c.args)
21322132
except ConnectionError:
2133+
for n in nodes.values():
2134+
n.connection_pool.release(n.connection)
21332135
# Connection retries are being handled in the node's
21342136
# Retry object. Reinitialize the node -> slot table.
21352137
self.nodes_manager.initialize()

tests/test_cluster.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from unittest.mock import DEFAULT, Mock, call, patch
1111

1212
import pytest
13+
import redis
1314
from redis import Redis
1415
from redis._parsers import CommandsParser
1516
from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
@@ -3247,6 +3248,31 @@ def raise_ask_error():
32473248
assert ask_node.redis_connection.connection.read_response.called
32483249
assert res == ["MOCK_OK"]
32493250

3251+
def test_return_previously_acquired_connections(self, r):
3252+
# in order to ensure that a pipeline will make use of connections
3253+
# from different nodes
3254+
assert r.keyslot("a") != r.keyslot("b")
3255+
3256+
orig_func = redis.cluster.get_connection
3257+
with patch("redis.cluster.get_connection") as get_connection:
3258+
3259+
def raise_error(target_node, *args, **kwargs):
3260+
if get_connection.call_count == 2:
3261+
raise ConnectionError("mocked error")
3262+
else:
3263+
return orig_func(target_node, *args, **kwargs)
3264+
3265+
get_connection.side_effect = raise_error
3266+
3267+
r.pipeline().get("a").get("b").execute()
3268+
3269+
# 4 = 2 get_connections per execution * 2 executions
3270+
assert get_connection.call_count == 4
3271+
for cluster_node in r.nodes_manager.nodes_cache.values():
3272+
connection_pool = cluster_node.redis_connection.connection_pool
3273+
num_of_conns = len(connection_pool._available_connections)
3274+
assert num_of_conns == connection_pool._created_connections
3275+
32503276
def test_empty_stack(self, r):
32513277
"""
32523278
If pipeline is executed with no commands it should

0 commit comments

Comments
 (0)