Skip to content

Commit 7119770

Browse files
committed
Add aclose method to asyncio.RedisCluster
1 parent 8185e17 commit 7119770

File tree

2 files changed

+68
-32
lines changed

2 files changed

+68
-32
lines changed

redis/asyncio/cluster.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -395,27 +395,31 @@ async def initialize(self) -> "RedisCluster":
395395
)
396396
self._initialize = False
397397
except BaseException:
398-
await self.nodes_manager.close()
399-
await self.nodes_manager.close("startup_nodes")
398+
await self.nodes_manager.aclose()
399+
await self.nodes_manager.aclose("startup_nodes")
400400
raise
401401
return self
402402

403-
async def close(self) -> None:
403+
async def aclose(self) -> None:
404404
"""Close all connections & client if initialized."""
405405
if not self._initialize:
406406
if not self._lock:
407407
self._lock = asyncio.Lock()
408408
async with self._lock:
409409
if not self._initialize:
410410
self._initialize = True
411-
await self.nodes_manager.close()
412-
await self.nodes_manager.close("startup_nodes")
411+
await self.nodes_manager.aclose()
412+
await self.nodes_manager.aclose("startup_nodes")
413+
414+
async def close(self) -> None:
415+
"""alias for aclose() for backwards compatibility"""
416+
await self.aclose()
413417

414418
async def __aenter__(self) -> "RedisCluster":
415419
return await self.initialize()
416420

417421
async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
418-
await self.close()
422+
await self.aclose()
419423

420424
def __await__(self) -> Generator[Any, None, "RedisCluster"]:
421425
return self.initialize().__await__()
@@ -767,13 +771,13 @@ async def _execute_command(
767771
self.nodes_manager.startup_nodes.pop(target_node.name, None)
768772
# Hard force of reinitialize of the node/slots setup
769773
# and try again with the new setup
770-
await self.close()
774+
await self.aclose()
771775
raise
772776
except ClusterDownError:
773777
# ClusterDownError can occur during a failover and to get
774778
# self-healed, we will try to reinitialize the cluster layout
775779
# and retry executing the command
776-
await self.close()
780+
await self.aclose()
777781
await asyncio.sleep(0.25)
778782
raise
779783
except MovedError as e:
@@ -790,7 +794,7 @@ async def _execute_command(
790794
self.reinitialize_steps
791795
and self.reinitialize_counter % self.reinitialize_steps == 0
792796
):
793-
await self.close()
797+
await self.aclose()
794798
# Reset the counter
795799
self.reinitialize_counter = 0
796800
else:
@@ -1323,7 +1327,7 @@ async def initialize(self) -> None:
13231327
# If initialize was called after a MovedError, clear it
13241328
self._moved_exception = None
13251329

1326-
async def close(self, attr: str = "nodes_cache") -> None:
1330+
async def aclose(self, attr: str = "nodes_cache") -> None:
13271331
self.default_node = None
13281332
await asyncio.gather(
13291333
*(
@@ -1471,7 +1475,7 @@ async def execute(
14711475
if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
14721476
# Try again with the new cluster setup.
14731477
exception = e
1474-
await self._client.close()
1478+
await self._client.aclose()
14751479
await asyncio.sleep(0.25)
14761480
else:
14771481
# All other errors should be raised.

tests/test_asyncio/test_cluster.py

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
)
3939

4040
from ..ssl_utils import get_ssl_filename
41-
from .compat import mock
41+
from .compat import aclosing, mock
4242

4343
pytestmark = pytest.mark.onlycluster
4444

@@ -270,7 +270,39 @@ async def test_host_port_startup_node(self) -> None:
270270
cluster = await get_mocked_redis_client(host=default_host, port=default_port)
271271
assert cluster.get_node(host=default_host, port=default_port) is not None
272272

273-
await cluster.close()
273+
await cluster.aclose()
274+
275+
async def test_aclosing(self) -> None:
276+
cluster = await get_mocked_redis_client(host=default_host, port=default_port)
277+
called = 1
278+
279+
async def mock_aclose(_):
280+
nonlocal called
281+
called += 1
282+
283+
with mock.patch.object(cluster, "aclose", mock_aclose):
284+
async with aclosing(cluster):
285+
pass
286+
assert called == 1
287+
await cluster.aclose()
288+
289+
async def test_close_is_aclose(self) -> None:
290+
"""
291+
Test that it is possible to use host & port arguments as startup node
292+
args
293+
"""
294+
cluster = await get_mocked_redis_client(host=default_host, port=default_port)
295+
296+
called = 1
297+
298+
async def mock_aclose(_):
299+
nonlocal called
300+
called += 1
301+
302+
with mock.patch.object(cluster, "aclose", mock_aclose):
303+
await cluster.close()
304+
assert called == 1
305+
await cluster.aclose()
274306

275307
async def test_startup_nodes(self) -> None:
276308
"""
@@ -289,7 +321,7 @@ async def test_startup_nodes(self) -> None:
289321
and cluster.get_node(host=default_host, port=port_2) is not None
290322
)
291323

292-
await cluster.close()
324+
await cluster.aclose()
293325

294326
startup_node = ClusterNode("127.0.0.1", 16379)
295327
async with RedisCluster(startup_nodes=[startup_node], client_name="test") as rc:
@@ -417,7 +449,7 @@ async def read_response_mocked(*args: Any, **kwargs: Any) -> None:
417449
)
418450
)
419451

420-
await rc.close()
452+
await rc.aclose()
421453

422454
async def test_execute_command_errors(self, r: RedisCluster) -> None:
423455
"""
@@ -461,7 +493,7 @@ async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None
461493
conn = primary._free.pop()
462494
assert conn.read_response.called is not True
463495

464-
await r.close()
496+
await r.aclose()
465497

466498
async def test_execute_command_node_flag_all_nodes(self, r: RedisCluster) -> None:
467499
"""
@@ -690,7 +722,7 @@ def execute_command_mock_third(self, *args, **options):
690722
await read_cluster.get("foo")
691723
mocks["send_command"].assert_has_calls([mock.call("READONLY")])
692724

693-
await read_cluster.close()
725+
await read_cluster.aclose()
694726

695727
async def test_keyslot(self, r: RedisCluster) -> None:
696728
"""
@@ -762,7 +794,7 @@ def raise_error(target_node, *args, **kwargs):
762794
await rc.get("bar")
763795
assert execute_command.failed_calls == rc.cluster_error_retry_attempts
764796

765-
await rc.close()
797+
await rc.aclose()
766798

767799
async def test_set_default_node_success(self, r: RedisCluster) -> None:
768800
"""
@@ -843,7 +875,7 @@ async def test_can_run_concurrent_commands(self, request: FixtureRequest) -> Non
843875
*(rc.echo("i", target_nodes=RedisCluster.ALL_NODES) for i in range(100))
844876
)
845877
)
846-
await rc.close()
878+
await rc.aclose()
847879

848880
def test_replace_cluster_node(self, r: RedisCluster) -> None:
849881
prev_default_node = r.get_default_node()
@@ -901,7 +933,7 @@ def address_remap(address):
901933
assert await r.set("byte_string", b"giraffe")
902934
assert await r.get("byte_string") == b"giraffe"
903935
finally:
904-
await r.close()
936+
await r.aclose()
905937
finally:
906938
await asyncio.gather(*[p.aclose() for p in proxies])
907939

@@ -1002,7 +1034,7 @@ async def test_initialize_before_execute_multi_key_command(
10021034
url = request.config.getoption("--redis-url")
10031035
r = RedisCluster.from_url(url)
10041036
assert 0 == await r.exists("a", "b", "c")
1005-
await r.close()
1037+
await r.aclose()
10061038

10071039
@skip_if_redis_enterprise()
10081040
async def test_cluster_myid(self, r: RedisCluster) -> None:
@@ -1065,7 +1097,7 @@ async def test_cluster_delslots(self) -> None:
10651097
assert node0._free.pop().read_response.called
10661098
assert node1._free.pop().read_response.called
10671099

1068-
await r.close()
1100+
await r.aclose()
10691101

10701102
@skip_if_server_version_lt("7.0.0")
10711103
@skip_if_redis_enterprise()
@@ -1076,7 +1108,7 @@ async def test_cluster_delslotsrange(self):
10761108
await r.cluster_addslots(node, 1, 2, 3, 4, 5)
10771109
assert await r.cluster_delslotsrange(1, 5)
10781110
assert node._free.pop().read_response.called
1079-
await r.close()
1111+
await r.aclose()
10801112

10811113
@skip_if_redis_enterprise()
10821114
async def test_cluster_failover(self, r: RedisCluster) -> None:
@@ -1286,7 +1318,7 @@ async def test_readonly(self) -> None:
12861318
for replica in r.get_replicas():
12871319
assert replica._free.pop().read_response.called
12881320

1289-
await r.close()
1321+
await r.aclose()
12901322

12911323
@skip_if_redis_enterprise()
12921324
async def test_readwrite(self) -> None:
@@ -1299,7 +1331,7 @@ async def test_readwrite(self) -> None:
12991331
for replica in r.get_replicas():
13001332
assert replica._free.pop().read_response.called
13011333

1302-
await r.close()
1334+
await r.aclose()
13031335

13041336
@skip_if_redis_enterprise()
13051337
async def test_bgsave(self, r: RedisCluster) -> None:
@@ -1524,7 +1556,7 @@ async def test_client_kill(
15241556
]
15251557
assert len(clients) == 1
15261558
assert clients[0].get("name") == "redis-py-c1"
1527-
await r2.close()
1559+
await r2.aclose()
15281560

15291561
@skip_if_server_version_lt("2.6.0")
15301562
async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None:
@@ -2302,7 +2334,7 @@ async def test_acl_log(
23022334

23032335
await r.acl_deluser(username, target_nodes="primaries")
23042336

2305-
await user_client.close()
2337+
await user_client.aclose()
23062338

23072339

23082340
class TestNodesManager:
@@ -2359,7 +2391,7 @@ async def test_init_slots_cache_not_all_slots_covered(self) -> None:
23592391
cluster_slots=cluster_slots,
23602392
require_full_coverage=True,
23612393
)
2362-
await rc.close()
2394+
await rc.aclose()
23632395
assert str(ex.value).startswith(
23642396
"All slots are not covered after query all startup_nodes."
23652397
)
@@ -2385,7 +2417,7 @@ async def test_init_slots_cache_not_require_full_coverage_success(self) -> None:
23852417

23862418
assert 5460 not in rc.nodes_manager.slots_cache
23872419

2388-
await rc.close()
2420+
await rc.aclose()
23892421

23902422
async def test_init_slots_cache(self) -> None:
23912423
"""
@@ -2416,7 +2448,7 @@ async def test_init_slots_cache(self) -> None:
24162448

24172449
assert len(n_manager.nodes_cache) == 6
24182450

2419-
await rc.close()
2451+
await rc.aclose()
24202452

24212453
async def test_init_slots_cache_cluster_mode_disabled(self) -> None:
24222454
"""
@@ -2427,7 +2459,7 @@ async def test_init_slots_cache_cluster_mode_disabled(self) -> None:
24272459
rc = await get_mocked_redis_client(
24282460
host=default_host, port=default_port, cluster_enabled=False
24292461
)
2430-
await rc.close()
2462+
await rc.aclose()
24312463
assert "Cluster mode is not enabled on this node" in str(e.value)
24322464

24332465
async def test_empty_startup_nodes(self) -> None:
@@ -2514,7 +2546,7 @@ async def test_cluster_one_instance(self) -> None:
25142546
for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
25152547
assert n.slots_cache[i] == [n_node]
25162548

2517-
await rc.close()
2549+
await rc.aclose()
25182550

25192551
async def test_init_with_down_node(self) -> None:
25202552
"""

0 commit comments

Comments
 (0)