Skip to content

Commit 3d5210c

Browse files
committed
Add regression tests and fixes for issue redis#1128
1 parent 7fc4c76 commit 3d5210c

File tree

6 files changed

+113
-18
lines changed

6 files changed

+113
-18
lines changed

redis/asyncio/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,9 @@ async def parse_response(self, block: bool = True, timeout: float = 0):
816816
await conn.connect()
817817

818818
read_timeout = None if block else timeout
819-
response = await self._execute(conn, conn.read_response, timeout=read_timeout)
819+
response = await self._execute(
820+
conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False
821+
)
820822

821823
if conn.health_check_interval and response == self.health_check_response:
822824
# ignore the health check message as user might not expect it

redis/asyncio/connection.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,11 @@ async def send_packed_command(
804804
raise ConnectionError(
805805
f"Error {err_no} while writing to socket. {errmsg}."
806806
) from e
807-
except Exception:
807+
except BaseException:
808+
# BaseExceptions can be raised when a socket send operation is not
809+
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
810+
# to send un-sent data. However, the send_packed_command() API
811+
# does not support it so there is no point in keeping the connection open.
808812
await self.disconnect(nowait=True)
809813
raise
810814

@@ -827,7 +831,9 @@ async def can_read_destructive(self):
827831
async def read_response(
828832
self,
829833
disable_decoding: bool = False,
834+
*,
830835
timeout: Optional[float] = None,
836+
disconnect_on_error: bool = True,
831837
):
832838
"""Read the response from a previously sent command"""
833839
read_timeout = timeout if timeout is not None else self.socket_timeout
@@ -843,22 +849,24 @@ async def read_response(
843849
)
844850
except asyncio.TimeoutError:
845851
if timeout is not None:
846-
# user requested timeout, return None
852+
# user requested timeout, return None. Operation can be retried
847853
return None
848854
# it was a self.socket_timeout error.
849-
await self.disconnect(nowait=True)
855+
if disconnect_on_error:
856+
await self.disconnect(nowait=True)
850857
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
851858
except OSError as e:
852-
await self.disconnect(nowait=True)
859+
if disconnect_on_error:
860+
await self.disconnect(nowait=True)
853861
raise ConnectionError(
854862
f"Error while reading from {self.host}:{self.port} : {e.args}"
855863
)
856-
except asyncio.CancelledError:
857-
# need this check for 3.7, where CancelledError
858-
# is subclass of Exception, not BaseException
859-
raise
860-
except Exception:
861-
await self.disconnect(nowait=True)
864+
except BaseException:
865+
# Also by default close in case of BaseException. A lot of code
866+
# relies on this behaviour when doing Command/Response pairs.
867+
# See #1128.
868+
if disconnect_on_error:
869+
await self.disconnect(nowait=True)
862870
raise
863871

864872
if self.health_check_interval:

redis/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1526,7 +1526,7 @@ def try_read():
15261526
return None
15271527
else:
15281528
conn.connect()
1529-
return conn.read_response()
1529+
return conn.read_response(disconnect_on_error=False)
15301530

15311531
response = self._execute(conn, try_read)
15321532

redis/connection.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,11 @@ def send_packed_command(self, command, check_health=True):
832832
errno = e.args[0]
833833
errmsg = e.args[1]
834834
raise ConnectionError(f"Error {errno} while writing to socket. {errmsg}.")
835-
except Exception:
835+
except BaseException:
836+
# BaseExceptions can be raised when a socket send operation is not
837+
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
838+
# to send un-sent data. However, the send_packed_command() API
839+
# does not support it so there is no point in keeping the connection open.
836840
self.disconnect()
837841
raise
838842

@@ -857,23 +861,31 @@ def can_read(self, timeout=0):
857861
self.disconnect()
858862
raise ConnectionError(f"Error while reading from {host_error}: {e.args}")
859863

860-
def read_response(self, disable_decoding=False):
864+
def read_response(
865+
self, disable_decoding=False, *, disconnect_on_error: bool = True
866+
):
861867
"""Read the response from a previously sent command"""
862868

863869
host_error = self._host_error()
864870

865871
try:
866872
response = self._parser.read_response(disable_decoding=disable_decoding)
867873
except socket.timeout:
868-
self.disconnect()
874+
if disconnect_on_error:
875+
self.disconnect()
869876
raise TimeoutError(f"Timeout reading from {host_error}")
870877
except OSError as e:
871-
self.disconnect()
878+
if disconnect_on_error:
879+
self.disconnect()
872880
raise ConnectionError(
873881
f"Error while reading from {host_error}" f" : {e.args}"
874882
)
875-
except Exception:
876-
self.disconnect()
883+
except BaseException:
884+
# Also by default close in case of BaseException. A lot of code
885+
# relies on this behaviour when doing Command/Response pairs.
886+
# See #1128.
887+
if disconnect_on_error:
888+
self.disconnect()
877889
raise
878890

879891
if self.health_check_interval:

tests/test_asyncio/test_commands.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""
22
Tests async overrides of commands from their mixins
33
"""
4+
import asyncio
45
import binascii
56
import datetime
67
import re
8+
import sys
79
from string import ascii_letters
810

911
import pytest
@@ -18,6 +20,11 @@
1820
skip_unless_arch_bits,
1921
)
2022

23+
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
24+
from asyncio import timeout as async_timeout
25+
else:
26+
from async_timeout import timeout as async_timeout
27+
2128
REDIS_6_VERSION = "5.9.0"
2229

2330

@@ -2999,6 +3006,37 @@ async def test_module_list(self, r: redis.Redis):
29993006
for x in await r.module_list():
30003007
assert isinstance(x, dict)
30013008

3009+
@pytest.mark.onlynoncluster
3010+
async def test_interrupted_command(self, r: redis.Redis):
3011+
"""
3012+
Regression test for issue #1128: An Un-handled BaseException
3013+
will leave the socket with un-read response to a previous
3014+
command.
3015+
"""
3016+
ready = asyncio.Event()
3017+
3018+
async def helper():
3019+
with pytest.raises(asyncio.CancelledError):
3020+
# blocking pop
3021+
ready.set()
3022+
await r.brpop(["nonexist"])
3023+
# If the following is not done, further Timout operations will fail,
3024+
# because the timeout won't catch its Cancelled Error if the task
3025+
# has a pending cancel. Python documentation probably should reflect this.
3026+
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
3027+
asyncio.current_task().uncancel()
3028+
# if all is well, we can continue. The following should not hang.
3029+
await r.set("status", "down")
3030+
3031+
task = asyncio.create_task(helper())
3032+
await ready.wait()
3033+
await asyncio.sleep(0.01)
3034+
# the task is now sleeping, lets send it an exception
3035+
task.cancel()
3036+
# If all is well, the task should finish right away, otherwise fail with Timeout
3037+
async with async_timeout(0.1):
3038+
await task
3039+
30023040

30033041
@pytest.mark.onlynoncluster
30043042
class TestBinarySave:

tests/test_commands.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import binascii
22
import datetime
33
import re
4+
import threading
45
import time
6+
from asyncio import CancelledError
57
from string import ascii_letters
68
from unittest import mock
9+
from unittest.mock import patch
710

811
import pytest
912

@@ -4726,6 +4729,38 @@ def test_psync(self, r):
47264729
res = r2.psync(r2.client_id(), 1)
47274730
assert b"FULLRESYNC" in res
47284731

4732+
@pytest.mark.onlynoncluster
4733+
def test_interrupted_command(self, r: redis.Redis):
4734+
"""
4735+
Regression test for issue #1128: An Un-handled BaseException
4736+
will leave the socket with un-read response to a previous
4737+
command.
4738+
"""
4739+
4740+
ok = False
4741+
4742+
def helper():
4743+
with pytest.raises(CancelledError):
4744+
# blocking pop
4745+
with patch.object(
4746+
r.connection._parser, "read_response", side_effect=CancelledError
4747+
):
4748+
r.brpop(["nonexist"])
4749+
# if all is well, we can continue.
4750+
r.set("status", "down") # should not hang
4751+
nonlocal ok
4752+
ok = True
4753+
4754+
thread = threading.Thread(target=helper)
4755+
thread.start()
4756+
thread.join(0.1)
4757+
try:
4758+
assert not thread.is_alive()
4759+
assert ok
4760+
finally:
4761+
# disconnect here so that fixture cleanup can proceed
4762+
r.connection.disconnect()
4763+
47294764

47304765
@pytest.mark.onlynoncluster
47314766
class TestBinarySave:

0 commit comments

Comments
 (0)