From 3c30a3099336a5976074c18ea61814646689b4a8 Mon Sep 17 00:00:00 2001 From: Julian Taylor Date: Sat, 19 Jan 2019 12:38:57 +0100 Subject: [PATCH] fix watching with a specified resource version The watch code reset the version to the last found in the response. When you first list existing objects and then start watching from that resource version the existing versions are older than the version you wanted and the watch starts from the wrong version after the first restart. This leads to for example already deleted objects ending in the stream again. Fix this by setting the minimum resource version to reset from to the input resource version. As long as k8s returns all objects in order in the watch this should work. We cannot use the integer value of the resource version to order it as one should be treat the value as opaque. Closes https://github.com/kubernetes-client/python/issues/700 --- watch/watch.py | 2 ++ watch/watch_test.py | 73 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/watch/watch.py b/watch/watch.py index 21899dd8..a9c315cd 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -122,6 +122,8 @@ def stream(self, func, *args, **kwargs): return_type = self.get_return_type(func) kwargs['watch'] = True kwargs['_preload_content'] = False + if 'resource_version' in kwargs: + self.resource_version = kwargs['resource_version'] timeouts = ('timeout_seconds' in kwargs) while True: diff --git a/watch/watch_test.py b/watch/watch_test.py index d1ec80a1..672c0526 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -14,12 +14,15 @@ import unittest -from mock import Mock +from mock import Mock, call from .watch import Watch class WatchTests(unittest.TestCase): + def setUp(self): + # counter for a test that needs test global state + self.callcount = 0 def test_watch_with_decode(self): fake_resp = Mock() @@ -62,6 +65,74 @@ def test_watch_with_decode(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_resource_version_set(self): + # https://github.com/kubernetes-client/python/issues/700 + # ensure watching from a resource version does reset to resource + # version 0 after k8s resets the watch connection + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + values = [ + '{"type": "ADDED", "object": {"metadata": {"name": "test1",' + '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + '{"type": "ADDED", "object": {"metadata": {"name": "test2",' + '"resourceVersion": "2"}, "spec": {}, "sta', + 'tus": {}}}\n' + '{"type": "ADDED", "object": {"metadata": {"name": "test3",' + '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n' + ] + # return nothing on the first call and values on the second + # this emulates a watch from a rv that returns nothing in the first k8s + # watch reset and values later + + def get_values(*args, **kwargs): + self.callcount += 1 + if self.callcount == 1: + return [] + else: + return values + + fake_resp.read_chunked = Mock( + side_effect=get_values) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + # ensure we keep our requested resource version or the version latest + # returned version when the existing versions are older than the + # requested version + # needed for the list existing objects, then watch from there use case + calls = [] + + iterations = 2 + # first two calls must use the passed rv, the first call is a + # "reset" and does not actually return anything + # the second call must use the same rv but will return values + # (with a wrong rv but a real cluster would behave correctly) + # calls following that will use the rv from those returned values + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + for i in range(iterations): + # ideally we want 5 here but as rv must be treated as an + # opaque value we cannot interpret it and order it so rely + # on k8s returning the events completely and in order + calls.append(call(_preload_content=False, watch=True, + resource_version="3")) + + for c, e in enumerate(w.stream(fake_api.get_namespaces, + resource_version="5")): + if c == len(values) * iterations: + w.stop() + + # check calls are in the list, gives good error output + fake_api.get_namespaces.assert_has_calls(calls) + # more strict test with worse error message + self.assertEqual(fake_api.get_namespaces.mock_calls, calls) + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: