Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Keep the watch action working forever #36

Merged
merged 2 commits into from
Jan 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, return_type=None):
self._raw_return_type = return_type
self._stop = False
self._api_client = client.ApiClient()
self.resource_version = 0

def stop(self):
self._stop = True
Expand All @@ -81,6 +82,8 @@ def unmarshal_event(self, data, return_type):
if return_type:
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
return js

def stream(self, func, *args, **kwargs):
Expand Down Expand Up @@ -113,12 +116,19 @@ def stream(self, func, *args, **kwargs):
return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs['_preload_content'] = False
resp = func(*args, **kwargs)
try:
for line in iter_resp_lines(resp):
yield self.unmarshal_event(line, return_type)
if self._stop:
break
finally:
resp.close()
resp.release_conn()

timeouts = ('timeout_seconds' in kwargs)
while True:
resp = func(*args, **kwargs)
try:
for line in iter_resp_lines(resp):
yield self.unmarshal_event(line, return_type)
if self._stop:
break
finally:
kwargs['resource_version'] = self.resource_version
resp.close()
resp.release_conn()

if timeouts or self._stop:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the user set timeouts but it has not been passed yet?

Copy link
Author

@lichen2013 lichen2013 Oct 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still trying to find the proof that the exit happens we saw at the beginning is caused by the client side.

I do find the flag 'min-request-timeout' in doc kube-apiserver, and it is said that the apiserver actually has a timeout setting by itself:

--min-request-timeout=1800: An optional field indicating the minimum number of seconds a handler must keep a request open before timing it out. Currently only honored by the watch request handler, which picks a randomized value above this number as the connection timeout, to spread out load.

And if it is not set, the default value is 1800:

https://github.com/kubernetes/apiserver/blob/master/pkg/server/config.go#L248

MinRequestTimeout: 1800,

So, base on what I know now: I don't think timeout will happen before the timeouts setting.

break
32 changes: 32 additions & 0 deletions watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ def test_watch_stream_twice(self):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_stream_loop(self):
w = Watch(float)

fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'])

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

count = 0

# when timeout_seconds is set, auto-exist when timeout reaches
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
count = count + 1
self.assertEqual(count, 1)

# when no timeout_seconds, only exist when w.stop() is called
for e in w.stream(fake_api.get_namespaces):
count = count + 1
if count == 2:
w.stop()

self.assertEqual(count, 2)
self.assertEqual(fake_api.get_namespaces.call_count, 2)
self.assertEqual(fake_resp.read_chunked.call_count, 2)
self.assertEqual(fake_resp.close.call_count, 2)
self.assertEqual(fake_resp.release_conn.call_count, 2)

def test_unmarshal_with_float_object(self):
w = Watch()
event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float')
Expand Down