Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Fixes kubernetes-client/python issue 1047 "ResponseNotChunked from watch" #231

Merged
merged 1 commit into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _find_return_type(func):

def iter_resp_lines(resp):
prev = ""
for seg in resp.read_chunked(decode_content=False):
for seg in resp.stream(amt=None, decode_content=False):
if isinstance(seg, bytes):
seg = seg.decode('utf8')
seg = prev + seg
Expand Down
44 changes: 25 additions & 19 deletions watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_watch_with_decode(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
Expand Down Expand Up @@ -63,15 +63,16 @@ def test_watch_with_decode(self):

fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_for_follow(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'log_line_1\n',
'log_line_2\n'])
Expand All @@ -92,7 +93,8 @@ def test_watch_for_follow(self):

fake_api.read_namespaced_pod_log.assert_called_once_with(
_preload_content=False, follow=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

Expand All @@ -112,6 +114,7 @@ def test_watch_resource_version_set(self):
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
]

# return nothing on the first call and values on the second
# this emulates a watch from a rv that returns nothing in the first k8s
# watch reset and values later
Expand All @@ -123,7 +126,7 @@ def get_values(*args, **kwargs):
else:
return values

fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
side_effect=get_values)

fake_api = Mock()
Expand Down Expand Up @@ -170,7 +173,7 @@ def test_watch_stream_twice(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'] * 4)

fake_api = Mock()
Expand All @@ -186,8 +189,8 @@ def test_watch_stream_twice(self):
self.assertEqual(count, 3)
fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(
decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

Expand All @@ -197,7 +200,7 @@ def test_watch_stream_loop(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'])

fake_api = Mock()
Expand All @@ -219,7 +222,7 @@ def test_watch_stream_loop(self):

self.assertEqual(count, 2)
self.assertEqual(fake_api.get_namespaces.call_count, 2)
self.assertEqual(fake_resp.read_chunked.call_count, 2)
self.assertEqual(fake_resp.stream.call_count, 2)
self.assertEqual(fake_resp.close.call_count, 2)
self.assertEqual(fake_resp.release_conn.call_count, 2)

Expand Down Expand Up @@ -256,7 +259,7 @@ def test_watch_with_exception(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(side_effect=KeyError('expected'))
fake_resp.stream = Mock(side_effect=KeyError('expected'))

fake_api = Mock()
fake_api.get_thing = Mock(return_value=fake_resp)
Expand All @@ -271,15 +274,16 @@ def test_watch_with_exception(self):

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_with_error_event(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
Expand All @@ -294,15 +298,16 @@ def test_watch_with_error_event(self):

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_retries_on_error_event(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
Expand All @@ -320,16 +325,16 @@ def test_watch_retries_on_error_event(self):
# Two calls should be expected during a retry
fake_api.get_thing.assert_has_calls(
[call(resource_version=0, _preload_content=False, watch=True)] * 2)
fake_resp.read_chunked.assert_has_calls(
[call(decode_content=False)] * 2)
fake_resp.stream.assert_has_calls(
[call(amt=None, decode_content=False)] * 2)
assert fake_resp.close.call_count == 2
assert fake_resp.release_conn.call_count == 2

def test_watch_with_error_event_and_timeout_param(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
Expand All @@ -346,7 +351,8 @@ def test_watch_with_error_event_and_timeout_param(self):

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True, timeout_seconds=10)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

Expand Down