diff --git a/watch/watch.py b/watch/watch.py index 5966eace..8d58b4eb 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -95,13 +95,22 @@ def unmarshal_event(self, data, return_type): obj = SimpleNamespace(data=json.dumps(js['raw_object'])) js['object'] = self._api_client.deserialize(obj, return_type) if hasattr(js['object'], 'metadata'): - self.resource_version = js['object'].metadata.resource_version + resource_version = js['object'].metadata.resource_version # For custom objects that we don't have model defined, json # deserialization results in dictionary elif (isinstance(js['object'], dict) and 'metadata' in js['object'] and 'resourceVersion' in js['object']['metadata']): - self.resource_version = js['object']['metadata'][ - 'resourceVersion'] + resource_version = js['object']['metadata']['resourceVersion'] + else: + resource_version = None + + # Resource version must never revert to old objects, especially + # on the first listing call (the objects are sorted randomly). + if (resource_version is not None and + (self.resource_version is None or + self.resource_version < resource_version)): + self.resource_version = resource_version + return js def stream(self, func, *args, **kwargs): diff --git a/watch/watch_test.py b/watch/watch_test.py index ebc400af..823bfc66 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -152,7 +152,7 @@ def get_values(*args, **kwargs): # opaque value we cannot interpret it and order it so rely # on k8s returning the events completely and in order calls.append(call(_preload_content=False, watch=True, - resource_version="3")) + resource_version="5")) for c, e in enumerate(w.stream(fake_api.get_namespaces, resource_version="5")): @@ -164,6 +164,43 @@ def get_values(*args, **kwargs): # more strict test with worse error message self.assertEqual(fake_api.get_namespaces.mock_calls, calls) + def test_watch_resource_version_grows_monotonically(self): + # https://github.com/kubernetes-client/python/issues/700 + # ensure that the resource version never decrements, + # especially on the first listing of the resources + # (as they go in arbitrary order, not sorted). + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=[ + '{"type": "ADDED", "object": {"metadata": {"name": "test1",' + '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + '{"type": "ADDED", "object": {"metadata": {"name": "test2",' + '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n' + '{"type": "ADDED", "object": {"metadata": {"name": "test3",' + '"resourceVersion": "2"}, "spec": {}, "status": {}}}\n', + 'should_not_happened\n']) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + watch_resource_versions = [] + event_resource_versions = [] + count = 1 + for e in w.stream(fake_api.get_namespaces): + obj = e['object'] + watch_resource_versions.append(w.resource_version) + event_resource_versions.append(obj.metadata.resource_version) + count += 1 + if count == 4: + w.stop() + + self.assertEqual(watch_resource_versions, ['1', '3', '3']) + self.assertEqual(event_resource_versions, ['1', '3', '2']) + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: