Skip to content

Commit 4ac32bb

Browse files
authored
Fix reconnecting in watch for custom resources (#321)
* Fix reconnecting in watch for custom resources * add test for watch crd without timeout * clean up tests
1 parent b7ec53c commit 4ac32bb

File tree

3 files changed

+103
-154
lines changed

3 files changed

+103
-154
lines changed

kubernetes_asyncio/dynamic/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ async def watch(resource, namespace=None, name=None, label_selector=None, field_
227227
serialize=False,
228228
**kwargs
229229
):
230+
if event == "":
231+
break
230232
event['object'] = ResourceInstance(resource, event['object'])
231233
yield event
232234

kubernetes_asyncio/dynamic/client_test.py

Lines changed: 84 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -42,32 +42,46 @@ async def test_cluster_custom_resources(self):
4242
await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe')
4343

4444
crd_api = await client.resources.get(
45-
api_version='apiextensions.k8s.io/v1beta1',
45+
api_version='apiextensions.k8s.io/v1',
4646
kind='CustomResourceDefinition')
4747
name = 'clusterchangemes.apps.example.com'
4848
crd_manifest = {
49-
'apiVersion': 'apiextensions.k8s.io/v1beta1',
50-
'kind': 'CustomResourceDefinition',
51-
'metadata': {
52-
'name': name,
49+
"apiVersion": "apiextensions.k8s.io/v1",
50+
"kind": "CustomResourceDefinition",
51+
"metadata": {
52+
"name": name,
5353
},
54-
'spec': {
55-
'group': 'apps.example.com',
56-
'names': {
57-
'kind': 'ClusterChangeMe',
58-
'listKind': 'ClusterChangeMeList',
59-
'plural': 'clusterchangemes',
60-
'singular': 'clusterchangeme',
54+
"spec": {
55+
"group": "apps.example.com",
56+
"names": {
57+
"kind": "ClusterChangeMe",
58+
"listKind": "ClusterChangeMeList",
59+
"plural": "clusterchangemes",
60+
"singular": "clusterchangeme",
6161
},
62-
'scope': 'Cluster',
63-
'version': 'v1',
64-
'subresources': {
65-
'status': {}
66-
}
67-
}
62+
"scope": "Cluster",
63+
"versions": [
64+
{
65+
"name": "v1",
66+
"served": True,
67+
"storage": True,
68+
"schema": {
69+
"openAPIV3Schema": {
70+
"type": "object",
71+
"properties": {
72+
"spec": {
73+
"type": "object",
74+
"properties": {"size": {"type": "integer"}},
75+
}
76+
},
77+
}
78+
},
79+
}
80+
],
81+
},
6882
}
69-
resp = await crd_api.create(crd_manifest)
7083

84+
resp = await crd_api.create(crd_manifest)
7185
self.assertEqual(name, resp.metadata.name)
7286
self.assertTrue(resp.status)
7387

@@ -99,6 +113,19 @@ async def test_cluster_custom_resources(self):
99113
resp = await changeme_api.create(body=changeme_manifest)
100114
self.assertEqual(resp.metadata.name, changeme_name)
101115

116+
# watch with timeout
117+
count = 0
118+
async for _ in client.watch(changeme_api, timeout=3, namespace="default", name=changeme_name):
119+
count += 1
120+
self.assertTrue(count > 0, msg="no events received for watch")
121+
122+
# without timeout, should be longer than the previous check
123+
async def _watch_no_timeout():
124+
async for _ in client.watch(changeme_api, namespace="default", name=changeme_name):
125+
pass
126+
with self.assertRaises(asyncio.exceptions.TimeoutError):
127+
await asyncio.wait_for(_watch_no_timeout(), timeout=5)
128+
102129
resp = await changeme_api.get(name=changeme_name)
103130
self.assertEqual(resp.metadata.name, changeme_name)
104131

@@ -127,111 +154,6 @@ async def test_cluster_custom_resources(self):
127154
with self.assertRaises(ResourceNotFoundError):
128155
await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe')
129156

130-
# async def test_async_namespaced_custom_resources(self):
131-
# async with api_client.ApiClient(configuration=self.config) as apic:
132-
# client = await DynamicClient.newclient(apic)
133-
#
134-
# with self.assertRaises(ResourceNotFoundError):
135-
# await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe')
136-
#
137-
# crd_api = await client.resources.get(
138-
# api_version='apiextensions.k8s.io/v1beta1',
139-
# kind='CustomResourceDefinition')
140-
#
141-
# name = 'changemes.apps.example.com'
142-
#
143-
# crd_manifest = {
144-
# 'apiVersion': 'apiextensions.k8s.io/v1beta1',
145-
# 'kind': 'CustomResourceDefinition',
146-
# 'metadata': {
147-
# 'name': name,
148-
# },
149-
# 'spec': {
150-
# 'group': 'apps.example.com',
151-
# 'names': {
152-
# 'kind': 'ChangeMe',
153-
# 'listKind': 'ChangeMeList',
154-
# 'plural': 'changemes',
155-
# 'singular': 'changeme',
156-
# },
157-
# 'scope': 'Namespaced',
158-
# 'version': 'v1',
159-
# 'subresources': {
160-
# 'status': {}
161-
# }
162-
# }
163-
# }
164-
# async_resp = await crd_api.create(crd_manifest, async_req=True)
165-
#
166-
# self.assertEqual(name, async_resp.metadata.name)
167-
# self.assertTrue(async_resp.status)
168-
#
169-
# async_resp = await crd_api.get(name=name, async_req=True)
170-
# self.assertEqual(name, async_resp.metadata.name)
171-
# self.assertTrue(async_resp.status)
172-
#
173-
# try:
174-
# changeme_api = await client.resources.get(
175-
# api_version='apps.example.com/v1', kind='ChangeMe')
176-
# except ResourceNotFoundError:
177-
# # Need to wait a sec for the discovery layer to get updated
178-
# await asyncio.sleep(2)
179-
# changeme_api = await client.resources.get(
180-
# api_version='apps.example.com/v1', kind='ChangeMe')
181-
#
182-
# async_resp = await changeme_api.get(async_req=True)
183-
# self.assertEqual(async_resp.items, [])
184-
#
185-
# changeme_name = 'custom-resource' + short_uuid()
186-
# changeme_manifest = {
187-
# 'apiVersion': 'apps.example.com/v1',
188-
# 'kind': 'ChangeMe',
189-
# 'metadata': {
190-
# 'name': changeme_name,
191-
# },
192-
# 'spec': {}
193-
# }
194-
#
195-
# async_resp = await changeme_api.create(body=changeme_manifest, namespace='default', async_req=True)
196-
# self.assertEqual(async_resp.metadata.name, changeme_name)
197-
#
198-
# async_resp = await changeme_api.get(name=changeme_name, namespace='default', async_req=True)
199-
# self.assertEqual(async_resp.metadata.name, changeme_name)
200-
#
201-
# changeme_manifest['spec']['size'] = 3
202-
# async_resp = await changeme_api.patch(
203-
# body=changeme_manifest,
204-
# namespace='default',
205-
# content_type='application/merge-patch+json',
206-
# async_req=True
207-
# )
208-
# self.assertEqual(async_resp.spec.size, 3)
209-
#
210-
# async_resp = await changeme_api.get(name=changeme_name, namespace='default', async_req=True)
211-
# self.assertEqual(async_resp.spec.size, 3)
212-
#
213-
# async_resp = await changeme_api.get(namespace='default', async_req=True)
214-
# self.assertEqual(len(async_resp.items), 1)
215-
#
216-
# async_resp = await changeme_api.get(async_req=True)
217-
# self.assertEqual(len(async_resp.items), 1)
218-
#
219-
# await changeme_api.delete(name=changeme_name, namespace='default', async_req=True)
220-
#
221-
# async_resp = await changeme_api.get(namespace='default', async_req=True)
222-
# self.assertEqual(len(async_resp.items), 0)
223-
#
224-
# async_resp = await changeme_api.get(async_req=True)
225-
# self.assertEqual(len(async_resp.items), 0)
226-
#
227-
# await crd_api.delete(name=name, async_req=True)
228-
#
229-
# await asyncio.sleep(2)
230-
# await client.resources.invalidate_cache()
231-
# with self.assertRaises(ResourceNotFoundError):
232-
# await client.resources.get(
233-
# api_version='apps.example.com/v1', kind='ChangeMe')
234-
235157
async def test_namespaced_custom_resources(self):
236158
async with api_client.ApiClient(configuration=self.config) as apic:
237159
client = await DynamicClient(apic)
@@ -240,32 +162,46 @@ async def test_namespaced_custom_resources(self):
240162
await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe')
241163

242164
crd_api = await client.resources.get(
243-
api_version='apiextensions.k8s.io/v1beta1',
165+
api_version='apiextensions.k8s.io/v1',
244166
kind='CustomResourceDefinition')
245-
name = 'changemes.apps.example.com'
167+
name = 'clusterchangemes.apps.example.com'
246168
crd_manifest = {
247-
'apiVersion': 'apiextensions.k8s.io/v1beta1',
248-
'kind': 'CustomResourceDefinition',
249-
'metadata': {
250-
'name': name,
169+
"apiVersion": "apiextensions.k8s.io/v1",
170+
"kind": "CustomResourceDefinition",
171+
"metadata": {
172+
"name": name,
251173
},
252-
'spec': {
253-
'group': 'apps.example.com',
254-
'names': {
255-
'kind': 'ChangeMe',
256-
'listKind': 'ChangeMeList',
257-
'plural': 'changemes',
258-
'singular': 'changeme',
174+
"spec": {
175+
"group": "apps.example.com",
176+
"names": {
177+
"kind": "ClusterChangeMe",
178+
"listKind": "ClusterChangeMeList",
179+
"plural": "clusterchangemes",
180+
"singular": "clusterchangeme",
259181
},
260-
'scope': 'Namespaced',
261-
'version': 'v1',
262-
'subresources': {
263-
'status': {}
264-
}
265-
}
182+
"scope": "Namespaced",
183+
"versions": [
184+
{
185+
"name": "v1",
186+
"served": True,
187+
"storage": True,
188+
"schema": {
189+
"openAPIV3Schema": {
190+
"type": "object",
191+
"properties": {
192+
"spec": {
193+
"type": "object",
194+
"properties": {"size": {"type": "integer"}},
195+
}
196+
},
197+
}
198+
},
199+
}
200+
],
201+
},
266202
}
267-
resp = await crd_api.create(crd_manifest)
268203

204+
resp = await crd_api.create(crd_manifest)
269205
self.assertEqual(name, resp.metadata.name)
270206
self.assertTrue(resp.status)
271207

@@ -276,18 +212,18 @@ async def test_namespaced_custom_resources(self):
276212
self.assertTrue(resp.status)
277213

278214
try:
279-
await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe')
215+
await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe')
280216
except ResourceNotFoundError:
281217
# Need to wait a sec for the discovery layer to get updated
282218
await asyncio.sleep(2)
283219
changeme_api = await client.resources.get(
284-
api_version='apps.example.com/v1', kind='ChangeMe')
220+
api_version='apps.example.com/v1', kind='ClusterChangeMe')
285221
resp = await changeme_api.get()
286222
self.assertEqual(resp.items, [])
287223
changeme_name = 'custom-resource' + short_uuid()
288224
changeme_manifest = {
289225
'apiVersion': 'apps.example.com/v1',
290-
'kind': 'ChangeMe',
226+
'kind': 'ClusterChangeMe',
291227
'metadata': {
292228
'name': changeme_name,
293229
},
@@ -446,7 +382,7 @@ async def test_configmap_apis(self):
446382
self.assertEqual(name, resp.metadata.name)
447383

448384
count = 0
449-
async for _ in client.watch(api, timeout=10, namespace="default", name=name):
385+
async for _ in client.watch(api, timeout=3, namespace="default", name=name):
450386
count += 1
451387
self.assertTrue(count > 0, msg="no events received for watch")
452388

kubernetes_asyncio/watch/watch.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,13 @@ def unmarshal_event(self, data: str, response_type):
104104

105105
# If possible, compile the JSON response into a Python native response
106106
# type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ...
107-
if response_type and js['type'].lower() != 'bookmark':
107+
if response_type:
108108
js['object'] = self._api_client.deserialize(
109109
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
110110
response_type=response_type
111111
)
112112

113+
if js['type'].lower() != 'bookmark':
113114
# decode and save resource_version to continue watching
114115
if hasattr(js['object'], 'metadata'):
115116
self.resource_version = js['object'].metadata.resource_version
@@ -120,6 +121,7 @@ def unmarshal_event(self, data: str, response_type):
120121
and 'metadata' in js['object']
121122
and 'resourceVersion' in js['object']['metadata']):
122123
self.resource_version = js['object']['metadata']['resourceVersion']
124+
123125
elif js['type'].lower() == 'bookmark':
124126
self.resource_version = js['object']['metadata']['resourceVersion']
125127

@@ -135,6 +137,12 @@ async def __anext__(self):
135137
await self.close()
136138
raise
137139

140+
def _reconnect(self):
141+
self.resp.close()
142+
self.resp = None
143+
if self.resource_version:
144+
self.func.keywords['resource_version'] = self.resource_version
145+
138146
async def next(self):
139147

140148
while 1:
@@ -153,11 +161,12 @@ async def next(self):
153161
try:
154162
line = await self.resp.content.readline()
155163
except asyncio.TimeoutError:
164+
# This exception can be raised by aiohttp (client timeout)
165+
# but we don't retry if server side timeout is applied.
166+
# The base scenario would be to restart watching with timeout_seconds
167+
# reduced by time spent in previous iterations.
156168
if 'timeout_seconds' not in self.func.keywords:
157-
self.resp.close()
158-
self.resp = None
159-
if self.resource_version:
160-
self.func.keywords['resource_version'] = self.resource_version
169+
self._reconnect()
161170
continue
162171
else:
163172
raise
@@ -167,7 +176,9 @@ async def next(self):
167176
# Stop the iterator if K8s sends an empty response. This happens when
168177
# eg the supplied timeout has expired.
169178
if line == '':
170-
raise StopAsyncIteration
179+
if 'timeout_seconds' not in self.func.keywords:
180+
self._reconnect()
181+
continue
171182

172183
# Special case for faster log streaming
173184
if self.return_type == 'str':

0 commit comments

Comments
 (0)