Skip to content

Commit 436351b

Browse files
authored
Merge pull request #125 from mbohlool/exec
Improvements on ws_client
2 parents 1635150 + 88563a6 commit 436351b

File tree

6 files changed

+298
-56
lines changed

6 files changed

+298
-56
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# v1.0.0b2
2+
- Support exec calls in both interactive and non-interactive mode #58
3+
14
# v1.0.0b1
25

36
- Support insecure-skip-tls-verify config flag #99

examples/exec.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import time
2+
3+
from kubernetes import config
4+
from kubernetes.client import configuration
5+
from kubernetes.client.apis import core_v1_api
6+
from kubernetes.client.rest import ApiException
7+
8+
config.load_kube_config()
9+
configuration.assert_hostname = False
10+
api = core_v1_api.CoreV1Api()
11+
name = 'busybox-test'
12+
13+
resp = None
14+
try:
15+
resp = api.read_namespaced_pod(name=name,
16+
namespace='default')
17+
except ApiException as e:
18+
if e.status != 404:
19+
print("Unknown error: %s" % e)
20+
exit(1)
21+
22+
if not resp:
23+
print("Pod %s does not exits. Creating it..." % name)
24+
pod_manifest = {
25+
'apiVersion': 'v1',
26+
'kind': 'Pod',
27+
'metadata': {
28+
'name': name
29+
},
30+
'spec': {
31+
'containers': [{
32+
'image': 'busybox',
33+
'name': 'sleep',
34+
"args": [
35+
"/bin/sh",
36+
"-c",
37+
"while true;do date;sleep 5; done"
38+
]
39+
}]
40+
}
41+
}
42+
resp = api.create_namespaced_pod(body=pod_manifest,
43+
namespace='default')
44+
while True:
45+
resp = api.read_namespaced_pod(name=name,
46+
namespace='default')
47+
if resp.status.phase != 'Pending':
48+
break
49+
time.sleep(1)
50+
print("Done.")
51+
52+
53+
# calling exec and wait for response.
54+
exec_command = [
55+
'/bin/sh',
56+
'-c',
57+
'echo This message goes to stderr >&2; echo This message goes to stdout']
58+
resp = api.connect_get_namespaced_pod_exec(name, 'default',
59+
command=exec_command,
60+
stderr=True, stdin=False,
61+
stdout=True, tty=False)
62+
print("Response: " + resp)
63+
64+
# Calling exec interactively.
65+
exec_command = ['/bin/sh']
66+
resp = api.connect_get_namespaced_pod_exec(name, 'default',
67+
command=exec_command,
68+
stderr=True, stdin=True,
69+
stdout=True, tty=False,
70+
71+
_preload_content=False)
72+
commands = [
73+
"echo test1",
74+
"echo \"This message goes to stderr\" >&2",
75+
]
76+
while resp.is_open():
77+
resp.update(timeout=1)
78+
if resp.peek_stdout():
79+
print("STDOUT: %s" % resp.read_stdout())
80+
if resp.peek_stderr():
81+
print("STDERR: %s" % resp.read_stderr())
82+
if commands:
83+
c = commands.pop(0)
84+
print("Running command... %s\n" % c)
85+
resp.write_stdin(c + "\n")
86+
else:
87+
break
88+
89+
resp.write_stdin("date\n")
90+
sdate = resp.readline_stdout(timeout=3)
91+
print("Server date command returns: %s" % sdate)
92+
resp.write_stdin("whoami\n")
93+
user = resp.readline_stdout(timeout=3)
94+
print("Server user is: %s" % user)

kubernetes/client/api_client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,12 @@ def request(self, method, url, query_params=None, headers=None,
347347
# FIXME(dims) : We need a better way to figure out which
348348
# calls end up using web sockets
349349
if url.endswith('/exec') and (method == "GET" or method == "POST"):
350-
return ws_client.GET(self.config,
351-
url,
352-
query_params=query_params,
353-
_request_timeout=_request_timeout,
354-
headers=headers)
355-
350+
return ws_client.websocket_call(self.config,
351+
url,
352+
query_params=query_params,
353+
_request_timeout=_request_timeout,
354+
_preload_content=_preload_content,
355+
headers=headers)
356356
if method == "GET":
357357
return self.rest_client.GET(url,
358358
query_params=query_params,

kubernetes/client/ws_client.py

Lines changed: 163 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,40 @@
1212

1313
from .rest import ApiException
1414

15+
import select
1516
import certifi
17+
import time
1618
import collections
17-
import websocket
19+
from websocket import WebSocket, ABNF, enableTrace
1820
import six
1921
import ssl
2022
from six.moves.urllib.parse import urlencode
2123
from six.moves.urllib.parse import quote_plus
2224

25+
STDIN_CHANNEL = 0
26+
STDOUT_CHANNEL = 1
27+
STDERR_CHANNEL = 2
28+
2329

2430
class WSClient:
2531
def __init__(self, configuration, url, headers):
26-
self.messages = []
27-
self.errors = []
28-
websocket.enableTrace(False)
29-
header = None
32+
"""A websocket client with support for channels.
33+
34+
Exec command uses different channels for different streams. for
35+
example, 0 is stdin, 1 is stdout and 2 is stderr. Some other API calls
36+
like port forwarding can forward different pods' streams to different
37+
channels.
38+
"""
39+
enableTrace(False)
40+
header = []
41+
self._connected = False
42+
self._channels = {}
43+
self._all = ""
3044

3145
# We just need to pass the Authorization, ignore all the other
3246
# http headers we get from the generated code
33-
if 'Authorization' in headers:
34-
header = "Authorization: %s" % headers['Authorization']
35-
36-
self.ws = websocket.WebSocketApp(url,
37-
on_message=self.on_message,
38-
on_error=self.on_error,
39-
on_close=self.on_close,
40-
header=[header] if header else None)
41-
self.ws.on_open = self.on_open
47+
if headers and 'authorization' in headers:
48+
header.append("authorization: %s" % headers['authorization'])
4249

4350
if url.startswith('wss://') and configuration.verify_ssl:
4451
ssl_opts = {
@@ -52,30 +59,145 @@ def __init__(self, configuration, url, headers):
5259
else:
5360
ssl_opts = {'cert_reqs': ssl.CERT_NONE}
5461

55-
self.ws.run_forever(sslopt=ssl_opts)
56-
57-
def on_message(self, ws, message):
58-
if message[0] == '\x01':
59-
message = message[1:]
60-
if message:
61-
if six.PY3 and isinstance(message, six.binary_type):
62-
message = message.decode('utf-8')
63-
self.messages.append(message)
64-
65-
def on_error(self, ws, error):
66-
self.errors.append(error)
67-
68-
def on_close(self, ws):
69-
pass
70-
71-
def on_open(self, ws):
72-
pass
62+
self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False)
63+
self.sock.connect(url, header=header)
64+
self._connected = True
65+
66+
def peek_channel(self, channel, timeout=0):
67+
"""Peek a channel and return part of the input,
68+
empty string otherwise."""
69+
self.update(timeout=timeout)
70+
if channel in self._channels:
71+
return self._channels[channel]
72+
return ""
73+
74+
def read_channel(self, channel, timeout=0):
75+
"""Read data from a channel."""
76+
if channel not in self._channels:
77+
ret = self.peek_channel(channel, timeout)
78+
else:
79+
ret = self._channels[channel]
80+
if channel in self._channels:
81+
del self._channels[channel]
82+
return ret
83+
84+
def readline_channel(self, channel, timeout=None):
85+
"""Read a line from a channel."""
86+
if timeout is None:
87+
timeout = float("inf")
88+
start = time.time()
89+
while self.is_open() and time.time() - start < timeout:
90+
if channel in self._channels:
91+
data = self._channels[channel]
92+
if "\n" in data:
93+
index = data.find("\n")
94+
ret = data[:index]
95+
data = data[index+1:]
96+
if data:
97+
self._channels[channel] = data
98+
else:
99+
del self._channels[channel]
100+
return ret
101+
self.update(timeout=(timeout - time.time() + start))
102+
103+
def write_channel(self, channel, data):
104+
"""Write data to a channel."""
105+
self.sock.send(chr(channel) + data)
106+
107+
def peek_stdout(self, timeout=0):
108+
"""Same as peek_channel with channel=1."""
109+
return self.peek_channel(STDOUT_CHANNEL, timeout=timeout)
110+
111+
def read_stdout(self, timeout=None):
112+
"""Same as read_channel with channel=1."""
113+
return self.read_channel(STDOUT_CHANNEL, timeout=timeout)
114+
115+
def readline_stdout(self, timeout=None):
116+
"""Same as readline_channel with channel=1."""
117+
return self.readline_channel(STDOUT_CHANNEL, timeout=timeout)
118+
119+
def peek_stderr(self, timeout=0):
120+
"""Same as peek_channel with channel=2."""
121+
return self.peek_channel(STDERR_CHANNEL, timeout=timeout)
122+
123+
def read_stderr(self, timeout=None):
124+
"""Same as read_channel with channel=2."""
125+
return self.read_channel(STDERR_CHANNEL, timeout=timeout)
126+
127+
def readline_stderr(self, timeout=None):
128+
"""Same as readline_channel with channel=2."""
129+
return self.readline_channel(STDERR_CHANNEL, timeout=timeout)
130+
131+
def read_all(self):
132+
"""Read all of the inputs with the same order they recieved. The channel
133+
information would be part of the string. This is useful for
134+
non-interactive call where a set of command passed to the API call and
135+
their result is needed after the call is concluded.
136+
137+
TODO: Maybe we can process this and return a more meaningful map with
138+
channels mapped for each input.
139+
"""
140+
out = self._all
141+
self._all = ""
142+
self._channels = {}
143+
return out
144+
145+
def is_open(self):
146+
"""True if the connection is still alive."""
147+
return self._connected
148+
149+
def write_stdin(self, data):
150+
"""The same as write_channel with channel=0."""
151+
self.write_channel(STDIN_CHANNEL, data)
152+
153+
def update(self, timeout=0):
154+
"""Update channel buffers with at most one complete frame of input."""
155+
if not self.is_open():
156+
return
157+
if not self.sock.connected:
158+
self._connected = False
159+
return
160+
r, _, _ = select.select(
161+
(self.sock.sock, ), (), (), timeout)
162+
if r:
163+
op_code, frame = self.sock.recv_data_frame(True)
164+
if op_code == ABNF.OPCODE_CLOSE:
165+
self._connected = False
166+
return
167+
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
168+
data = frame.data
169+
if six.PY3:
170+
data = data.decode("utf-8")
171+
self._all += data
172+
if len(data) > 1:
173+
channel = ord(data[0])
174+
data = data[1:]
175+
if data:
176+
if channel not in self._channels:
177+
self._channels[channel] = data
178+
else:
179+
self._channels[channel] += data
180+
181+
def run_forever(self, timeout=None):
182+
"""Wait till connection is closed or timeout reached. Buffer any input
183+
received during this time."""
184+
if timeout:
185+
start = time.time()
186+
while self.is_open() and time.time() - start < timeout:
187+
self.update(timeout=(timeout - time.time() + start))
188+
else:
189+
while self.is_open():
190+
self.update(timeout=None)
73191

74192

75193
WSResponse = collections.namedtuple('WSResponse', ['data'])
76194

77195

78-
def GET(configuration, url, query_params, _request_timeout, headers):
196+
def websocket_call(configuration, url, query_params, _request_timeout,
197+
_preload_content, headers):
198+
"""An internal function to be called in api-client when a websocket
199+
connection is required."""
200+
79201
# switch protocols from http to websocket
80202
url = url.replace('http://', 'ws://')
81203
url = url.replace('https://', 'wss://')
@@ -105,10 +227,11 @@ def GET(configuration, url, query_params, _request_timeout, headers):
105227
else:
106228
url += '&command=' + quote_plus(commands)
107229

108-
client = WSClient(configuration, url, headers)
109-
if client.errors:
110-
raise ApiException(
111-
status=0,
112-
reason='\n'.join([str(error) for error in client.errors])
113-
)
114-
return WSResponse('%s' % ''.join(client.messages))
230+
try:
231+
client = WSClient(configuration, url, headers)
232+
if not _preload_content:
233+
return client
234+
client.run_forever(timeout=_request_timeout)
235+
return WSResponse('%s' % ''.join(client.read_all()))
236+
except (Exception, KeyboardInterrupt, SystemExit) as e:
237+
raise ApiException(status=0, reason=str(e))

kubernetes/e2e_test/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ def get_e2e_configuration():
4242
if config.host is None:
4343
raise unittest.SkipTest('Unable to find a running Kubernetes instance')
4444
print('Running test against : %s' % config.host)
45+
config.assert_hostname = False
4546
return config

0 commit comments

Comments
 (0)