Skip to content

Commit 7222c98

Browse files
authored
[feat] Watch() retries 410 errors (#327)
1 parent a37f664 commit 7222c98

File tree

2 files changed

+119
-6
lines changed

2 files changed

+119
-6
lines changed

kubernetes_asyncio/watch/watch.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ def _reconnect(self):
145145

146146
async def next(self):
147147

148+
watch_forever = 'timeout_seconds' not in self.func.keywords
149+
retry_410 = watch_forever
150+
148151
while 1:
149152

150153
# Set the response object to the user supplied function (eg
@@ -163,9 +166,7 @@ async def next(self):
163166
except asyncio.TimeoutError:
164167
# This exception can be raised by aiohttp (client timeout)
165168
# but we don't retry if server side timeout is applied.
166-
# The base scenario would be to restart watching with timeout_seconds
167-
# reduced by time spent in previous iterations.
168-
if 'timeout_seconds' not in self.func.keywords:
169+
if watch_forever:
169170
self._reconnect()
170171
continue
171172
else:
@@ -176,15 +177,25 @@ async def next(self):
176177
# Stop the iterator if K8s sends an empty response. This happens when
177178
# eg the supplied timeout has expired.
178179
if line == '':
179-
if 'timeout_seconds' not in self.func.keywords:
180+
if watch_forever:
180181
self._reconnect()
181182
continue
182183

183184
# Special case for faster log streaming
184185
if self.return_type == 'str':
185186
return line
186187

187-
return self.unmarshal_event(line, self.return_type)
188+
# retry 410 error only once
189+
try:
190+
event = self.unmarshal_event(line, self.return_type)
191+
except client.exceptions.ApiException as ex:
192+
if ex.status == 410 and retry_410:
193+
retry_410 = False # retry only once
194+
self._reconnect()
195+
continue
196+
raise
197+
retry_410 = watch_forever
198+
return event
188199

189200
def stream(self, func, *args, **kwargs):
190201
"""Watch an API resource and stream the result back via a generator.

kubernetes_asyncio/watch/watch_test.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ async def test_watch_with_exception(self):
227227
async for e in watch.stream(fake_api.get_namespaces, timeout_seconds=10): # noqa
228228
pass
229229

230-
async def test_watch_timeout(self):
230+
async def test_watch_retry_timeout(self):
231231
fake_resp = AsyncMock()
232232
fake_resp.content.readline = AsyncMock()
233233
fake_resp.release = Mock()
@@ -256,6 +256,108 @@ async def test_watch_timeout(self):
256256
call(_preload_content=False, watch=True, resource_version='1555')])
257257
fake_resp.release.assert_called_once_with()
258258

259+
async def test_watch_retry_410(self):
260+
fake_resp = AsyncMock()
261+
fake_resp.content.readline = AsyncMock()
262+
fake_resp.release = Mock()
263+
264+
mock_event1 = {
265+
"type": "ADDED",
266+
"object": {
267+
"metadata":
268+
{
269+
"name": "test1555",
270+
"resourceVersion": "1555"
271+
},
272+
"spec": {},
273+
"status": {}
274+
}
275+
}
276+
277+
mock_event2 = {
278+
"type": "ADDED",
279+
"object": {
280+
"metadata":
281+
{
282+
"name": "test1555",
283+
"resourceVersion": "1555"
284+
},
285+
"spec": {},
286+
"status": {}
287+
}
288+
}
289+
290+
mock_410 = {
291+
'type': 'ERROR',
292+
'object': {
293+
'kind': 'Status',
294+
'apiVersion': 'v1',
295+
'metadata': {},
296+
'status': 'Failure',
297+
'message': 'too old resource version: 1 (8146471)',
298+
'reason': 'Gone',
299+
'code': 410
300+
}
301+
}
302+
303+
# retry 410
304+
fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'),
305+
json.dumps(mock_410).encode('utf8'),
306+
json.dumps(mock_event2).encode('utf8'),
307+
json.dumps(mock_410).encode('utf8'),
308+
b""]
309+
310+
fake_api = Mock()
311+
fake_api.get_namespaces = AsyncMock(return_value=fake_resp)
312+
fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList'
313+
314+
watch = kubernetes_asyncio.watch.Watch()
315+
async with watch.stream(fake_api.get_namespaces) as stream:
316+
async for e in stream: # noqa
317+
pass
318+
319+
fake_api.get_namespaces.assert_has_calls(
320+
[call(_preload_content=False, watch=True),
321+
call(_preload_content=False, watch=True, resource_version='1555')])
322+
fake_resp.release.assert_called_once_with()
323+
324+
# retry 410 only once
325+
fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'),
326+
json.dumps(mock_410).encode('utf8'),
327+
json.dumps(mock_event2).encode('utf8'),
328+
json.dumps(mock_410).encode('utf8'),
329+
json.dumps(mock_410).encode('utf8'),
330+
b""]
331+
332+
fake_api = Mock()
333+
fake_api.get_namespaces = AsyncMock(return_value=fake_resp)
334+
fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList'
335+
336+
with self.assertRaisesRegex(
337+
kubernetes_asyncio.client.exceptions.ApiException,
338+
r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'):
339+
watch = kubernetes_asyncio.watch.Watch()
340+
async with watch.stream(fake_api.get_namespaces) as stream:
341+
async for e in stream: # noqa
342+
pass
343+
344+
# no retry 410 if timeout is passed
345+
fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'),
346+
json.dumps(mock_410).encode('utf8'),
347+
b""]
348+
349+
fake_api = Mock()
350+
fake_api.get_namespaces = AsyncMock(return_value=fake_resp)
351+
fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList'
352+
353+
with self.assertRaisesRegex(
354+
kubernetes_asyncio.client.exceptions.ApiException,
355+
r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'):
356+
watch = kubernetes_asyncio.watch.Watch()
357+
async with watch.stream(fake_api.get_namespaces, timeout_seconds=10) as stream:
358+
async for e in stream: # noqa
359+
pass
360+
259361
async def test_watch_timeout_with_resource_version(self):
260362
fake_resp = AsyncMock()
261363
fake_resp.content.readline = AsyncMock()

0 commit comments

Comments
 (0)