Skip to content

Commit 7098ea4

Browse files
utkarshgupta137dvora-h
authored andcommitted
async_cluster: fix concurrent pipeline (#2280)
- each pipeline should create separate stacks for each node
1 parent de903f2 commit 7098ea4

File tree

2 files changed

+17
-9
lines changed

2 files changed

+17
-9
lines changed

redis/asyncio/cluster.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,6 @@ class ClusterNode:
755755
"""
756756

757757
__slots__ = (
758-
"_command_stack",
759758
"_connections",
760759
"_free",
761760
"connection_class",
@@ -796,7 +795,6 @@ def __init__(
796795

797796
self._connections: List[Connection] = []
798797
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
799-
self._command_stack: List["PipelineCommand"] = []
800798

801799
def __repr__(self) -> str:
802800
return (
@@ -887,18 +885,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
887885
# Release connection
888886
self._free.append(connection)
889887

890-
async def execute_pipeline(self) -> bool:
888+
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
891889
# Acquire connection
892890
connection = self.acquire_connection()
893891

894892
# Execute command
895893
await connection.send_packed_command(
896-
connection.pack_commands(cmd.args for cmd in self._command_stack), False
894+
connection.pack_commands(cmd.args for cmd in commands), False
897895
)
898896

899897
# Read responses
900898
ret = False
901-
for cmd in self._command_stack:
899+
for cmd in commands:
902900
try:
903901
cmd.result = await self.parse_response(
904902
connection, cmd.args[0], **cmd.kwargs
@@ -1365,12 +1363,14 @@ async def _execute(
13651363

13661364
node = target_nodes[0]
13671365
if node.name not in nodes:
1368-
nodes[node.name] = node
1369-
node._command_stack = []
1370-
node._command_stack.append(cmd)
1366+
nodes[node.name] = (node, [])
1367+
nodes[node.name][1].append(cmd)
13711368

13721369
errors = await asyncio.gather(
1373-
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
1370+
*(
1371+
asyncio.ensure_future(node[0].execute_pipeline(node[1]))
1372+
for node in nodes.values()
1373+
)
13741374
)
13751375

13761376
if any(errors):

tests/test_asyncio/test_cluster.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2476,3 +2476,11 @@ async def test_readonly_pipeline_from_readonly_client(
24762476
executed_on_replica = True
24772477
break
24782478
assert executed_on_replica
2479+
2480+
async def test_can_run_concurrent_pipelines(self, r: RedisCluster) -> None:
2481+
"""Test that the pipeline can be used concurrently."""
2482+
await asyncio.gather(
2483+
*(self.test_redis_cluster_pipeline(r) for i in range(100)),
2484+
*(self.test_multi_key_operation_with_a_single_slot(r) for i in range(100)),
2485+
*(self.test_multi_key_operation_with_multi_slots(r) for i in range(100)),
2486+
)

0 commit comments

Comments
 (0)