Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit 5cc28ea

Browse files
committed
Retry watch if request expires.
1 parent 474e9fb commit 5cc28ea

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

watch/watch.py

+25-2
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def unmarshal_event(self, data, return_type):
9191
except ValueError:
9292
return data
9393
js['raw_object'] = js['object']
94-
if return_type:
94+
if return_type and js['type'] != 'ERROR':
9595
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
9696
js['object'] = self._api_client.deserialize(obj, return_type)
9797
if hasattr(js['object'], 'metadata'):
@@ -107,6 +107,14 @@ def unmarshal_event(self, data, return_type):
107107
def stream(self, func, *args, **kwargs):
108108
"""Watch an API resource and stream the result back via a generator.
109109
110+
Note that watching an API resource can expire. The method tries to
111+
resume automatically from the last result, but if that last result
112+
is too old as well, an `ApiException` exception will be thrown with
113+
``code`` 410. In that case you have to recover yourself, probably
114+
by listing the API resource to obtain the latest state and then
115+
watching from that state on by setting ``resource_version`` to
116+
one returned from listing.
117+
110118
:param func: The API function pointer. Any parameter to the function
111119
can be passed after this parameter.
112120
@@ -138,11 +146,26 @@ def stream(self, func, *args, **kwargs):
138146
self.resource_version = kwargs['resource_version']
139147

140148
timeouts = ('timeout_seconds' in kwargs)
149+
retry_after_410 = False
141150
while True:
142151
resp = func(*args, **kwargs)
143152
try:
144153
for line in iter_resp_lines(resp):
145-
yield self.unmarshal_event(line, return_type)
154+
event = self.unmarshal_event(line, return_type)
155+
if isinstance(event, dict) and event['type'] == 'ERROR':
156+
obj = event['raw_object']
157+
# Current request expired, let's retry,
158+
# but only if we have not already retried.
159+
if not retry_after_410 and obj['code'] == 410:
160+
retry_after_410 = True
161+
break
162+
else:
163+
reason = "%s: %s" % (obj['reason'], obj['message'])
164+
raise client.rest.ApiException(status=obj['code'],
165+
reason=reason)
166+
else:
167+
retry_after_410 = False
168+
yield event
146169
if self._stop:
147170
break
148171
finally:

watch/watch_test.py

+27
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
from mock import Mock, call
2020

21+
from kubernetes import client
22+
2123
from .watch import Watch
2224

2325

@@ -275,6 +277,31 @@ def test_watch_with_exception(self):
275277
fake_resp.close.assert_called_once()
276278
fake_resp.release_conn.assert_called_once()
277279

280+
def test_watch_with_error_event(self):
281+
fake_resp = Mock()
282+
fake_resp.close = Mock()
283+
fake_resp.release_conn = Mock()
284+
fake_resp.read_chunked = Mock(
285+
return_value=[
286+
'{"type": "ERROR", "object": {"code": 410, '
287+
'"reason": "Gone", "message": "error message"}}\n'])
288+
289+
fake_api = Mock()
290+
fake_api.get_thing = Mock(return_value=fake_resp)
291+
292+
w = Watch()
293+
try:
294+
for _ in w.stream(fake_api.get_thing):
295+
self.fail(self, "Should fail with ApiException.")
296+
except client.rest.ApiException:
297+
pass
298+
299+
fake_api.get_thing.assert_called_once_with(
300+
_preload_content=False, watch=True)
301+
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
302+
fake_resp.close.assert_called_once()
303+
fake_resp.release_conn.assert_called_once()
304+
278305

279306
if __name__ == '__main__':
280307
unittest.main()

0 commit comments

Comments
 (0)