Skip to content

Commit fd956cd

Browse files
committed
fixup! Improvements on ws_client. Now the client can returns an object to interact with websocket server and reach each channel separately
1 parent 8e3fb91 commit fd956cd

File tree

2 files changed

+43
-10
lines changed

2 files changed

+43
-10
lines changed

kubernetes/client/api_client.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,12 @@ def request(self, method, url, query_params=None, headers=None,
347347
# FIXME(dims) : We need a better way to figure out which
348348
# calls end up using web sockets
349349
if url.endswith('/exec') and (method == "GET" or method == "POST"):
350-
return ws_client.GET(self.config,
351-
url,
352-
query_params=query_params,
353-
_request_timeout=_request_timeout,
354-
_preload_content=_preload_content,
355-
headers=headers)
356-
350+
return ws_client.websocket_call(self.config,
351+
url,
352+
query_params=query_params,
353+
_request_timeout=_request_timeout,
354+
_preload_content=_preload_content,
355+
headers=headers)
357356
if method == "GET":
358357
return self.rest_client.GET(url,
359358
query_params=query_params,

kubernetes/client/ws_client.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929

3030
class WSClient:
3131
def __init__(self, configuration, url, headers):
32+
"""A websocket client with support for channels.
33+
34+
Exec command uses different channels for different streams. for
35+
example, 0 is stdin, 1 is stdout and 2 is stderr. Some other API calls
36+
like port forwarding can forward different pods' streams to different
37+
channels.
38+
"""
3239
enableTrace(False)
3340
header = []
3441
self._connected = False
@@ -37,7 +44,7 @@ def __init__(self, configuration, url, headers):
3744

3845
# We just need to pass the Authorization, ignore all the other
3946
# http headers we get from the generated code
40-
if 'Authorization' in headers:
47+
if headers and 'Authorization' in headers:
4148
header.append("Authorization: %s" % headers['Authorization'])
4249

4350
if url.startswith('wss://') and configuration.verify_ssl:
@@ -57,12 +64,15 @@ def __init__(self, configuration, url, headers):
5764
self._connected = True
5865

5966
def peek_channel(self, channel, timeout=0):
67+
"""Peek a channel and return part of the input,
68+
empty string otherwise."""
6069
self.update(timeout=timeout)
6170
if channel in self._channels:
6271
return self._channels[channel]
6372
return ""
6473

6574
def read_channel(self, channel, timeout=0):
75+
"""Read data from a channel."""
6676
if channel not in self._channels:
6777
ret = self.peek_channel(channel, timeout)
6878
else:
@@ -72,6 +82,7 @@ def read_channel(self, channel, timeout=0):
7282
return ret
7383

7484
def readline_channel(self, channel, timeout=None):
85+
"""Read a line from a channel."""
7586
if timeout is None:
7687
timeout = float("inf")
7788
start = time.time()
@@ -90,39 +101,57 @@ def readline_channel(self, channel, timeout=None):
90101
self.update(timeout=(timeout - time.time() + start))
91102

92103
def write_channel(self, channel, data):
104+
"""Write data to a channel."""
93105
self.sock.send(chr(channel) + data)
94106

95107
def peek_stdout(self, timeout=0):
108+
"""Same as peek_channel with channel=1."""
96109
return self.peek_channel(STDOUT_CHANNEL, timeout=timeout)
97110

98111
def read_stdout(self, timeout=None):
112+
"""Same as read_channel with channel=1."""
99113
return self.read_channel(STDOUT_CHANNEL, timeout=timeout)
100114

101115
def readline_stdout(self, timeout=None):
116+
"""Same as readline_channel with channel=1."""
102117
return self.readline_channel(STDOUT_CHANNEL, timeout=timeout)
103118

104119
def peek_stderr(self, timeout=0):
120+
"""Same as peek_channel with channel=2."""
105121
return self.peek_channel(STDERR_CHANNEL, timeout=timeout)
106122

107123
def read_stderr(self, timeout=None):
124+
"""Same as read_channel with channel=2."""
108125
return self.read_channel(STDERR_CHANNEL, timeout=timeout)
109126

110127
def readline_stderr(self, timeout=None):
128+
"""Same as readline_channel with channel=2."""
111129
return self.readline_channel(STDERR_CHANNEL, timeout=timeout)
112130

113131
def read_all(self):
132+
"""Read all of the inputs with the same order they recieved. The channel
133+
information would be part of the string. This is useful for
134+
non-interactive call where a set of command passed to the API call and
135+
their result is needed after the call is concluded.
136+
137+
TODO: Maybe we can process this and return a more meaningful map with
138+
channels mapped for each input.
139+
"""
114140
out = self._all
115141
self._all = ""
116142
self._channels = {}
117143
return out
118144

119145
def is_open(self):
146+
"""True if the connection is still alive."""
120147
return self._connected
121148

122149
def write_stdin(self, data):
150+
"""The same as write_channel with channel=0."""
123151
self.write_channel(STDIN_CHANNEL, data)
124152

125153
def update(self, timeout=0):
154+
"""Update channel buffers with at most one complete frame of input."""
126155
if not self.is_open():
127156
return
128157
if not self.sock.connected:
@@ -150,6 +179,8 @@ def update(self, timeout=0):
150179
self._channels[channel] += data
151180

152181
def run_forever(self, timeout=None):
182+
"""Wait till connection is closed or timeout reached. Buffer any input
183+
received during this time."""
153184
if timeout:
154185
start = time.time()
155186
while self.is_open() and time.time() - start < timeout:
@@ -162,8 +193,11 @@ def run_forever(self, timeout=None):
162193
WSResponse = collections.namedtuple('WSResponse', ['data'])
163194

164195

165-
def GET(configuration, url, query_params, _request_timeout, _preload_content,
166-
headers):
196+
def websocket_call(configuration, url, query_params, _request_timeout,
197+
_preload_content, headers):
198+
"""An internal function to be called in api-client when a websocket
199+
connection is required."""
200+
167201
# switch protocols from http to websocket
168202
url = url.replace('http://', 'ws://')
169203
url = url.replace('https://', 'wss://')

0 commit comments

Comments
 (0)