Skip to content

Commit ef20b0c

Browse files
committed
[GROW-3247] release connection even if an unexpected exception is thrown in cluster pipeline (#8)
* [GROW-3247] release connection even if an unexpected exception is thrown in cluster pipeline * [GROW-3247] fix style issue * unassign n.connection at every loop
1 parent 56b27c0 commit ef20b0c

File tree

2 files changed

+185
-146
lines changed

2 files changed

+185
-146
lines changed

redis/cluster.py

Lines changed: 162 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,158 +1995,174 @@ def _send_cluster_commands(
19951995
# build a list of node objects based on node names we need to
19961996
nodes = {}
19971997

1998-
# as we move through each command that still needs to be processed,
1999-
# we figure out the slot number that command maps to, then from
2000-
# the slot determine the node.
2001-
for c in attempt:
2002-
while True:
2003-
# refer to our internal node -> slot table that
2004-
# tells us where a given command should route to.
2005-
# (it might be possible we have a cached node that no longer
2006-
# exists in the cluster, which is why we do this in a loop)
2007-
passed_targets = c.options.pop("target_nodes", None)
2008-
if passed_targets and not self._is_nodes_flag(passed_targets):
2009-
target_nodes = self._parse_target_nodes(passed_targets)
2010-
else:
2011-
target_nodes = self._determine_nodes(
2012-
*c.args, node_flag=passed_targets
2013-
)
2014-
if not target_nodes:
1998+
try:
1999+
# as we move through each command that still needs to be processed,
2000+
# we figure out the slot number that command maps to, then from
2001+
# the slot determine the node.
2002+
for c in attempt:
2003+
while True:
2004+
# refer to our internal node -> slot table that
2005+
# tells us where a given command should route to.
2006+
# (it might be possible we have a cached node that no longer
2007+
# exists in the cluster, which is why we do this in a loop)
2008+
passed_targets = c.options.pop("target_nodes", None)
2009+
if passed_targets and not self._is_nodes_flag(passed_targets):
2010+
target_nodes = self._parse_target_nodes(passed_targets)
2011+
else:
2012+
target_nodes = self._determine_nodes(
2013+
*c.args, node_flag=passed_targets
2014+
)
2015+
if not target_nodes:
2016+
raise RedisClusterException(
2017+
f"No targets were found to execute {c.args} command on"
2018+
)
2019+
if len(target_nodes) > 1:
20152020
raise RedisClusterException(
2016-
f"No targets were found to execute {c.args} command on"
2021+
f"Too many targets for command {c.args}"
20172022
)
2018-
if len(target_nodes) > 1:
2019-
raise RedisClusterException(
2020-
f"Too many targets for command {c.args}"
2021-
)
20222023

2023-
node = target_nodes[0]
2024-
if node == self.get_default_node():
2025-
is_default_node = True
2024+
node = target_nodes[0]
2025+
if node == self.get_default_node():
2026+
is_default_node = True
20262027

2027-
# now that we know the name of the node
2028-
# ( it's just a string in the form of host:port )
2029-
# we can build a list of commands for each node.
2030-
node_name = node.name
2031-
if node_name not in nodes:
2032-
redis_node = self.get_redis_connection(node)
2028+
# now that we know the name of the node
2029+
# ( it's just a string in the form of host:port )
2030+
# we can build a list of commands for each node.
2031+
node_name = node.name
2032+
if node_name not in nodes:
2033+
redis_node = self.get_redis_connection(node)
2034+
try:
2035+
connection = get_connection(redis_node, c.args)
2036+
except (ConnectionError, TimeoutError) as e:
2037+
for n in nodes.values():
2038+
n.connection_pool.release(n.connection)
2039+
n.connection = None
2040+
nodes = {}
2041+
if self.retry and isinstance(
2042+
e, self.retry._supported_errors
2043+
):
2044+
backoff = self.retry._backoff.compute(attempts_count)
2045+
if backoff > 0:
2046+
time.sleep(backoff)
2047+
self.nodes_manager.initialize()
2048+
if is_default_node:
2049+
self.replace_default_node()
2050+
raise
2051+
nodes[node_name] = NodeCommands(
2052+
redis_node.parse_response,
2053+
redis_node.connection_pool,
2054+
connection,
2055+
)
2056+
nodes[node_name].append(c)
2057+
break
2058+
2059+
# send the commands in sequence.
2060+
# we write to all the open sockets for each node first,
2061+
# before reading anything
2062+
# this allows us to flush all the requests out across the
2063+
# network essentially in parallel
2064+
# so that we can read them all in parallel as they come back.
2065+
# we don't multiplex on the sockets as they come available,
2066+
# but that shouldn't make too much difference.
2067+
node_commands = nodes.values()
2068+
for n in node_commands:
2069+
n.write()
2070+
2071+
for n in node_commands:
2072+
n.read()
2073+
2074+
# release all of the redis connections we allocated earlier
2075+
# back into the connection pool.
2076+
# we used to do this step as part of a try/finally block,
2077+
# but it is really dangerous to
2078+
# release connections back into the pool if for some
2079+
# reason the socket has data still left in it
2080+
# from a previous operation. The write and
2081+
# read operations already have try/catch around them for
2082+
# all known types of errors including connection
2083+
# and socket level errors.
2084+
# So if we hit an exception, something really bad
2085+
# happened and putting any of
2086+
# these connections back into the pool is a very bad idea.
2087+
# the socket might have unread buffer still sitting in it,
2088+
# and then the next time we read from it we pass the
2089+
# buffered result back from a previous command and
2090+
# every single request after to that connection will always get
2091+
# a mismatched result.
2092+
for n in nodes.values():
2093+
n.connection_pool.release(n.connection)
2094+
n.connection = None
2095+
nodes = {}
2096+
2097+
# if the response isn't an exception it is a
2098+
# valid response from the node
2099+
# we're all done with that command, YAY!
2100+
# if we have more commands to attempt, we've run into problems.
2101+
# collect all the commands we are allowed to retry.
2102+
# (MOVED, ASK, or connection errors or timeout errors)
2103+
attempt = sorted(
2104+
(
2105+
c
2106+
for c in attempt
2107+
if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2108+
),
2109+
key=lambda x: x.position,
2110+
)
2111+
if attempt and allow_redirections:
2112+
# RETRY MAGIC HAPPENS HERE!
2113+
# send these remaining commands one at a time using `execute_command`
2114+
# in the main client. This keeps our retry logic
2115+
# in one place mostly,
2116+
# and allows us to be more confident in correctness of behavior.
2117+
# at this point any speed gains from pipelining have been lost
2118+
# anyway, so we might as well make the best
2119+
# attempt to get the correct behavior.
2120+
#
2121+
# The client command will handle retries for each
2122+
# individual command sequentially as we pass each
2123+
# one into `execute_command`. Any exceptions
2124+
# that bubble out should only appear once all
2125+
# retries have been exhausted.
2126+
#
2127+
# If a lot of commands have failed, we'll be setting the
2128+
# flag to rebuild the slots table from scratch.
2129+
# So MOVED errors should correct themselves fairly quickly.
2130+
self.reinitialize_counter += 1
2131+
if self._should_reinitialized():
2132+
self.nodes_manager.initialize()
2133+
if is_default_node:
2134+
self.replace_default_node()
2135+
for c in attempt:
20332136
try:
2034-
connection = get_connection(redis_node, c.args)
2035-
except (ConnectionError, TimeoutError) as e:
2036-
for n in nodes.values():
2037-
n.connection_pool.release(n.connection)
2038-
if self.retry and isinstance(e, self.retry._supported_errors):
2039-
backoff = self.retry._backoff.compute(attempts_count)
2040-
if backoff > 0:
2041-
time.sleep(backoff)
2042-
self.nodes_manager.initialize()
2043-
if is_default_node:
2044-
self.replace_default_node()
2045-
raise
2046-
nodes[node_name] = NodeCommands(
2047-
redis_node.parse_response,
2048-
redis_node.connection_pool,
2049-
connection,
2050-
)
2051-
nodes[node_name].append(c)
2052-
break
2053-
2054-
# send the commands in sequence.
2055-
# we write to all the open sockets for each node first,
2056-
# before reading anything
2057-
# this allows us to flush all the requests out across the
2058-
# network essentially in parallel
2059-
# so that we can read them all in parallel as they come back.
2060-
# we don't multiplex on the sockets as they come available,
2061-
# but that shouldn't make too much difference.
2062-
node_commands = nodes.values()
2063-
for n in node_commands:
2064-
n.write()
2065-
2066-
for n in node_commands:
2067-
n.read()
2068-
2069-
# release all of the redis connections we allocated earlier
2070-
# back into the connection pool.
2071-
# we used to do this step as part of a try/finally block,
2072-
# but it is really dangerous to
2073-
# release connections back into the pool if for some
2074-
# reason the socket has data still left in it
2075-
# from a previous operation. The write and
2076-
# read operations already have try/catch around them for
2077-
# all known types of errors including connection
2078-
# and socket level errors.
2079-
# So if we hit an exception, something really bad
2080-
# happened and putting any of
2081-
# these connections back into the pool is a very bad idea.
2082-
# the socket might have unread buffer still sitting in it,
2083-
# and then the next time we read from it we pass the
2084-
# buffered result back from a previous command and
2085-
# every single request after to that connection will always get
2086-
# a mismatched result.
2087-
for n in nodes.values():
2088-
n.connection_pool.release(n.connection)
2089-
2090-
# if the response isn't an exception it is a
2091-
# valid response from the node
2092-
# we're all done with that command, YAY!
2093-
# if we have more commands to attempt, we've run into problems.
2094-
# collect all the commands we are allowed to retry.
2095-
# (MOVED, ASK, or connection errors or timeout errors)
2096-
attempt = sorted(
2097-
(
2098-
c
2099-
for c in attempt
2100-
if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
2101-
),
2102-
key=lambda x: x.position,
2103-
)
2104-
if attempt and allow_redirections:
2105-
# RETRY MAGIC HAPPENS HERE!
2106-
# send these remaining commands one at a time using `execute_command`
2107-
# in the main client. This keeps our retry logic
2108-
# in one place mostly,
2109-
# and allows us to be more confident in correctness of behavior.
2110-
# at this point any speed gains from pipelining have been lost
2111-
# anyway, so we might as well make the best
2112-
# attempt to get the correct behavior.
2113-
#
2114-
# The client command will handle retries for each
2115-
# individual command sequentially as we pass each
2116-
# one into `execute_command`. Any exceptions
2117-
# that bubble out should only appear once all
2118-
# retries have been exhausted.
2119-
#
2120-
# If a lot of commands have failed, we'll be setting the
2121-
# flag to rebuild the slots table from scratch.
2122-
# So MOVED errors should correct themselves fairly quickly.
2123-
self.reinitialize_counter += 1
2124-
if self._should_reinitialized():
2125-
self.nodes_manager.initialize()
2126-
if is_default_node:
2127-
self.replace_default_node()
2128-
for c in attempt:
2129-
try:
2130-
# send each command individually like we
2131-
# do in the main client.
2132-
c.result = super().execute_command(*c.args, **c.options)
2133-
except RedisError as e:
2134-
c.result = e
2135-
2136-
# turn the response back into a simple flat array that corresponds
2137-
# to the sequence of commands issued in the stack in pipeline.execute()
2138-
response = []
2139-
for c in sorted(stack, key=lambda x: x.position):
2140-
if c.args[0] in self.cluster_response_callbacks:
2141-
c.result = self.cluster_response_callbacks[c.args[0]](
2142-
c.result, **c.options
2143-
)
2144-
response.append(c.result)
2145-
2146-
if raise_on_error:
2147-
self.raise_first_error(stack)
2137+
# send each command individually like we
2138+
# do in the main client.
2139+
c.result = super().execute_command(*c.args, **c.options)
2140+
except RedisError as e:
2141+
c.result = e
21482142

2149-
return response
2143+
# turn the response back into a simple flat array that corresponds
2144+
# to the sequence of commands issued in the stack in pipeline.execute()
2145+
response = []
2146+
for c in sorted(stack, key=lambda x: x.position):
2147+
if c.args[0] in self.cluster_response_callbacks:
2148+
c.result = self.cluster_response_callbacks[c.args[0]](
2149+
c.result, **c.options
2150+
)
2151+
response.append(c.result)
2152+
2153+
if raise_on_error:
2154+
self.raise_first_error(stack)
2155+
2156+
return response
2157+
except BaseException:
2158+
# if nodes is not empty, a problem must have occurred
2159+
# since we can't guarantee the state of the connections,
2160+
# disconnect before returning it to the connection pool
2161+
for n in nodes.values():
2162+
if n.connection:
2163+
n.connection.disconnect()
2164+
n.connection_pool.release(n.connection)
2165+
raise
21502166

21512167
def _fail_on_redirect(self, allow_redirections):
21522168
""" """

tests/test_cluster.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
REPLICA,
2929
ClusterNode,
3030
LoadBalancer,
31+
NodeCommands,
3132
NodesManager,
3233
RedisCluster,
3334
get_node_name,
@@ -2790,6 +2791,28 @@ class TestClusterPipeline:
27902791
Tests for the ClusterPipeline class
27912792
"""
27922793

2794+
@pytest.mark.parametrize("function", ["write", "read"])
2795+
def test_connection_release_with_unexpected_error_in_node_commands(
2796+
self, r, function
2797+
):
2798+
"""
2799+
Test that connection is released to the pool, even with an unexpected error
2800+
"""
2801+
with patch.object(NodeCommands, function) as m:
2802+
2803+
def raise_error():
2804+
raise Exception("unexpected error")
2805+
2806+
m.side_effect = raise_error
2807+
2808+
with pytest.raises(Exception, match="unexpected error"):
2809+
r.pipeline().get("a").execute()
2810+
2811+
for cluster_node in r.nodes_manager.nodes_cache.values():
2812+
connection_pool = cluster_node.redis_connection.connection_pool
2813+
num_of_conns = len(connection_pool._available_connections)
2814+
assert num_of_conns == connection_pool._created_connections
2815+
27932816
def test_blocked_methods(self, r):
27942817
"""
27952818
Currently some method calls on a Cluster pipeline

0 commit comments

Comments
 (0)