From 90399663f378b33227f723d3f0c1677965b6d96b Mon Sep 17 00:00:00 2001 From: Darren Hague Date: Thu, 8 Apr 2021 13:49:46 +0100 Subject: [PATCH] Fixes kubernetes-client/python issue 1047 "ResponseNotChunked from watch" In recent versions of K8S (>1.16?), when a `Watch.stream()` call uses a resource_version which is too old the resulting 410 error is wrapped in JSON and returned in a non-chunked 200 response. Using `resp.stream()` instead of `resp.read_chunked()` automatically handles the response being either chunked or non-chunked. --- watch/watch.py | 2 +- watch/watch_test.py | 44 +++++++++++++++++++++++++------------------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/watch/watch.py b/watch/watch.py index b432778..3bbb770 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -53,7 +53,7 @@ def _find_return_type(func): def iter_resp_lines(resp): prev = "" - for seg in resp.read_chunked(decode_content=False): + for seg in resp.stream(amt=None, decode_content=False): if isinstance(seg, bytes): seg = seg.decode('utf8') seg = prev + seg diff --git a/watch/watch_test.py b/watch/watch_test.py index 32cf633..cad72fd 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -30,7 +30,7 @@ def test_watch_with_decode(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( return_value=[ '{"type": "ADDED", "object": {"metadata": {"name": "test1",' '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', @@ -63,7 +63,8 @@ def test_watch_with_decode(self): fake_api.get_namespaces.assert_called_once_with( _preload_content=False, watch=True) - fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.stream.assert_called_once_with( + amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() @@ -71,7 +72,7 @@ def test_watch_for_follow(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( return_value=[ 'log_line_1\n', 'log_line_2\n']) @@ -92,7 +93,8 @@ def test_watch_for_follow(self): fake_api.read_namespaced_pod_log.assert_called_once_with( _preload_content=False, follow=True) - fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.stream.assert_called_once_with( + amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() @@ -112,6 +114,7 @@ def test_watch_resource_version_set(self): '{"type": "ADDED", "object": {"metadata": {"name": "test3",' '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n' ] + # return nothing on the first call and values on the second # this emulates a watch from a rv that returns nothing in the first k8s # watch reset and values later @@ -123,7 +126,7 @@ def get_values(*args, **kwargs): else: return values - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( side_effect=get_values) fake_api = Mock() @@ -170,7 +173,7 @@ def test_watch_stream_twice(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( return_value=['{"type": "ADDED", "object": 1}\n'] * 4) fake_api = Mock() @@ -186,8 +189,8 @@ def test_watch_stream_twice(self): self.assertEqual(count, 3) fake_api.get_namespaces.assert_called_once_with( _preload_content=False, watch=True) - fake_resp.read_chunked.assert_called_once_with( - decode_content=False) + fake_resp.stream.assert_called_once_with( + amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() @@ -197,7 +200,7 @@ def test_watch_stream_loop(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( return_value=['{"type": "ADDED", "object": 1}\n']) fake_api = Mock() @@ -219,7 +222,7 @@ def test_watch_stream_loop(self): self.assertEqual(count, 2) self.assertEqual(fake_api.get_namespaces.call_count, 2) - self.assertEqual(fake_resp.read_chunked.call_count, 2) + self.assertEqual(fake_resp.stream.call_count, 2) self.assertEqual(fake_resp.close.call_count, 2) self.assertEqual(fake_resp.release_conn.call_count, 2) @@ -256,7 +259,7 @@ def test_watch_with_exception(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock(side_effect=KeyError('expected')) + fake_resp.stream = Mock(side_effect=KeyError('expected')) fake_api = Mock() fake_api.get_thing = Mock(return_value=fake_resp) @@ -271,7 +274,8 @@ def test_watch_with_exception(self): fake_api.get_thing.assert_called_once_with( _preload_content=False, watch=True) - fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.stream.assert_called_once_with( + amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() @@ -279,7 +283,7 @@ def test_watch_with_error_event(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( return_value=[ '{"type": "ERROR", "object": {"code": 410, ' '"reason": "Gone", "message": "error message"}}\n']) @@ -294,7 +298,8 @@ def test_watch_with_error_event(self): fake_api.get_thing.assert_called_once_with( _preload_content=False, watch=True) - fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.stream.assert_called_once_with( + amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() @@ -302,7 +307,7 @@ def test_watch_retries_on_error_event(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( return_value=[ '{"type": "ERROR", "object": {"code": 410, ' '"reason": "Gone", "message": "error message"}}\n']) @@ -320,8 +325,8 @@ def test_watch_retries_on_error_event(self): # Two calls should be expected during a retry fake_api.get_thing.assert_has_calls( [call(resource_version=0, _preload_content=False, watch=True)] * 2) - fake_resp.read_chunked.assert_has_calls( - [call(decode_content=False)] * 2) + fake_resp.stream.assert_has_calls( + [call(amt=None, decode_content=False)] * 2) assert fake_resp.close.call_count == 2 assert fake_resp.release_conn.call_count == 2 @@ -329,7 +334,7 @@ def test_watch_with_error_event_and_timeout_param(self): fake_resp = Mock() fake_resp.close = Mock() fake_resp.release_conn = Mock() - fake_resp.read_chunked = Mock( + fake_resp.stream = Mock( return_value=[ '{"type": "ERROR", "object": {"code": 410, ' '"reason": "Gone", "message": "error message"}}\n']) @@ -346,7 +351,8 @@ def test_watch_with_error_event_and_timeout_param(self): fake_api.get_thing.assert_called_once_with( _preload_content=False, watch=True, timeout_seconds=10) - fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.stream.assert_called_once_with( + amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once()