Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit 6f02e73

Browse files
authored
Merge pull request #36 from lichen2013/keep_watch
Keep the watch action working forever
2 parents 13aa7cd + aec1c52 commit 6f02e73

File tree

2 files changed

+51
-9
lines changed

2 files changed

+51
-9
lines changed

watch/watch.py

+19-9
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def __init__(self, return_type=None):
6363
self._raw_return_type = return_type
6464
self._stop = False
6565
self._api_client = client.ApiClient()
66+
self.resource_version = 0
6667

6768
def stop(self):
6869
self._stop = True
@@ -81,6 +82,8 @@ def unmarshal_event(self, data, return_type):
8182
if return_type:
8283
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
8384
js['object'] = self._api_client.deserialize(obj, return_type)
85+
if hasattr(js['object'], 'metadata'):
86+
self.resource_version = js['object'].metadata.resource_version
8487
return js
8588

8689
def stream(self, func, *args, **kwargs):
@@ -113,12 +116,19 @@ def stream(self, func, *args, **kwargs):
113116
return_type = self.get_return_type(func)
114117
kwargs['watch'] = True
115118
kwargs['_preload_content'] = False
116-
resp = func(*args, **kwargs)
117-
try:
118-
for line in iter_resp_lines(resp):
119-
yield self.unmarshal_event(line, return_type)
120-
if self._stop:
121-
break
122-
finally:
123-
resp.close()
124-
resp.release_conn()
119+
120+
timeouts = ('timeout_seconds' in kwargs)
121+
while True:
122+
resp = func(*args, **kwargs)
123+
try:
124+
for line in iter_resp_lines(resp):
125+
yield self.unmarshal_event(line, return_type)
126+
if self._stop:
127+
break
128+
finally:
129+
kwargs['resource_version'] = self.resource_version
130+
resp.close()
131+
resp.release_conn()
132+
133+
if timeouts or self._stop:
134+
break

watch/watch_test.py

+32
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,38 @@ def test_watch_stream_twice(self):
8585
fake_resp.close.assert_called_once()
8686
fake_resp.release_conn.assert_called_once()
8787

88+
def test_watch_stream_loop(self):
89+
w = Watch(float)
90+
91+
fake_resp = Mock()
92+
fake_resp.close = Mock()
93+
fake_resp.release_conn = Mock()
94+
fake_resp.read_chunked = Mock(
95+
return_value=['{"type": "ADDED", "object": 1}\n'])
96+
97+
fake_api = Mock()
98+
fake_api.get_namespaces = Mock(return_value=fake_resp)
99+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
100+
101+
count = 0
102+
103+
# when timeout_seconds is set, auto-exist when timeout reaches
104+
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
105+
count = count + 1
106+
self.assertEqual(count, 1)
107+
108+
# when no timeout_seconds, only exist when w.stop() is called
109+
for e in w.stream(fake_api.get_namespaces):
110+
count = count + 1
111+
if count == 2:
112+
w.stop()
113+
114+
self.assertEqual(count, 2)
115+
self.assertEqual(fake_api.get_namespaces.call_count, 2)
116+
self.assertEqual(fake_resp.read_chunked.call_count, 2)
117+
self.assertEqual(fake_resp.close.call_count, 2)
118+
self.assertEqual(fake_resp.release_conn.call_count, 2)
119+
88120
def test_unmarshal_with_float_object(self):
89121
w = Watch()
90122
event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float')

0 commit comments

Comments
 (0)