Skip to content

Commit df4c73b

Browse files
committed
WIP: Improvements on ws_client. Now the client can returns an object to interact with websocket server
1 parent 1635150 commit df4c73b

File tree

4 files changed

+191
-39
lines changed

4 files changed

+191
-39
lines changed

examples/exec.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import time
2+
3+
from kubernetes import config
4+
from kubernetes.client import configuration
5+
from kubernetes.client.apis import core_v1_api
6+
from kubernetes.client.rest import ApiException
7+
8+
config.load_kube_config(
9+
context="gke_cloud-kubernetes-dev_us-central1-f_mehdy-cluster")
10+
configuration.assert_hostname = False
11+
api = core_v1_api.CoreV1Api()
12+
name = 'busybox-test2'
13+
14+
resp = None
15+
try:
16+
resp = api.read_namespaced_pod(name=name,
17+
namespace='default')
18+
except ApiException as e:
19+
if e.status != 404:
20+
print("Unknown error: %s" % e)
21+
exit(1)
22+
23+
if not resp:
24+
print("Pod %s does not exits. Creating it..." % name)
25+
pod_manifest = {
26+
'apiVersion': 'v1',
27+
'kind': 'Pod',
28+
'metadata': {
29+
'name': name
30+
},
31+
'spec': {
32+
'containers': [{
33+
'image': 'busybox',
34+
'name': 'sleep',
35+
"args": [
36+
"/bin/sh",
37+
"-c",
38+
"while true;do date;sleep 5; done"
39+
]
40+
}]
41+
}
42+
}
43+
resp = api.create_namespaced_pod(body=pod_manifest,
44+
namespace='default')
45+
while True:
46+
resp = api.read_namespaced_pod(name=name,
47+
namespace='default')
48+
if resp.status.phase != 'Pending':
49+
break
50+
time.sleep(1)
51+
print "Done."
52+
53+
exec_command = [
54+
'/bin/sh',
55+
'-c',
56+
'sleep 1; echo This message goes to stderr >&2; sleep 2; echo test2']
57+
resp = api.connect_get_namespaced_pod_exec(name, 'default',
58+
command=exec_command,
59+
stderr=True, stdin=True,
60+
stdout=True, tty=False,
61+
_preload_content=False)
62+
while resp.is_open():
63+
resp.update(timeout=1)
64+
if resp.peek_stdout():
65+
print("STDOUT: %s" % resp.read_stdout())
66+
if resp.peek_stderr():
67+
print("STDERR: %s" % resp.read_stderr())
68+
69+
exit(1)
70+
# This part does not work yet. resp.write_stdin does not work.
71+
72+
exec_command = ['/bin/sh']
73+
resp = api.connect_get_namespaced_pod_exec(name, 'default',
74+
command=exec_command,
75+
stderr=True, stdin=True,
76+
stdout=True, tty=False,
77+
_preload_content=False)
78+
commands = [
79+
"echo test1",
80+
"sleep 1",
81+
"echo This message goes to stderr >&2",
82+
"sleep 2",
83+
"exit"
84+
]
85+
while resp.is_open():
86+
resp.update(timeout=1)
87+
if resp.peek_stdout():
88+
print("STDOUT: %s" % resp.read_stdout())
89+
if resp.peek_stderr():
90+
print("STDERR: %s" % resp.read_stderr())
91+
if commands:
92+
c = commands.pop(0)
93+
print "Running command... %s" % c
94+
resp.write_stdin(c)

kubernetes/client/api_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ def request(self, method, url, query_params=None, headers=None,
351351
url,
352352
query_params=query_params,
353353
_request_timeout=_request_timeout,
354+
_preload_content=_preload_content,
354355
headers=headers)
355356

356357
if method == "GET":

kubernetes/client/ws_client.py

Lines changed: 95 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,30 @@
1212

1313
from .rest import ApiException
1414

15+
import select
1516
import certifi
17+
import time
1618
import collections
17-
import websocket
19+
from websocket import WebSocket, ABNF, enableTrace
1820
import six
1921
import ssl
2022
from six.moves.urllib.parse import urlencode
2123
from six.moves.urllib.parse import quote_plus
22-
24+
import socket
2325

2426
class WSClient:
2527
def __init__(self, configuration, url, headers):
26-
self.messages = []
27-
self.errors = []
28-
websocket.enableTrace(False)
29-
header = None
28+
enableTrace(False)
29+
header = []
30+
self._connected = False
31+
self._stdout = ""
32+
self._stderr = ""
33+
self._all = ""
3034

3135
# We just need to pass the Authorization, ignore all the other
3236
# http headers we get from the generated code
3337
if 'Authorization' in headers:
34-
header = "Authorization: %s" % headers['Authorization']
35-
36-
self.ws = websocket.WebSocketApp(url,
37-
on_message=self.on_message,
38-
on_error=self.on_error,
39-
on_close=self.on_close,
40-
header=[header] if header else None)
41-
self.ws.on_open = self.on_open
38+
header.append("Authorization: %s" % headers['Authorization'])
4239

4340
if url.startswith('wss://') and configuration.verify_ssl:
4441
ssl_opts = {
@@ -52,30 +49,87 @@ def __init__(self, configuration, url, headers):
5249
else:
5350
ssl_opts = {'cert_reqs': ssl.CERT_NONE}
5451

55-
self.ws.run_forever(sslopt=ssl_opts)
56-
57-
def on_message(self, ws, message):
58-
if message[0] == '\x01':
59-
message = message[1:]
60-
if message:
61-
if six.PY3 and isinstance(message, six.binary_type):
62-
message = message.decode('utf-8')
63-
self.messages.append(message)
64-
65-
def on_error(self, ws, error):
66-
self.errors.append(error)
67-
68-
def on_close(self, ws):
69-
pass
70-
71-
def on_open(self, ws):
72-
pass
52+
self.sock = WebSocket(sslopt=ssl_opts, skip_utf8_validation=False)
53+
self.sock.connect(url, header=header)
54+
self._connected = True
55+
56+
def peek_stdout(self):
57+
self.update()
58+
return self._stdout
59+
60+
def read_stdout(self):
61+
if not self._stdout:
62+
self.update(timeout=None)
63+
ret = self._stdout
64+
self._stdout = ""
65+
return ret
66+
67+
def peek_stderr(self):
68+
self.update()
69+
return self._stderr
70+
71+
def read_stderr(self):
72+
if not self._stderr:
73+
self.update(timeout=None)
74+
ret = self._stderr
75+
self._stderr = ""
76+
return ret
77+
78+
def read_all(self):
79+
out = self._all
80+
self._all = ""
81+
self._stdout = ""
82+
self._stderr = ""
83+
return out
84+
85+
def is_open(self):
86+
return self._connected
87+
88+
# TODO: This method does not seem to work.
89+
def write_stdin(self, data):
90+
self.sock.send(data)
91+
92+
def update(self, timeout=0):
93+
if not self.is_open():
94+
return
95+
if not self.sock.connected:
96+
self._connected = False
97+
return
98+
r, _, _ = select.select(
99+
(self.sock.sock, ), (), (), timeout)
100+
if r:
101+
op_code, frame = self.sock.recv_data_frame(True)
102+
if op_code == ABNF.OPCODE_CLOSE:
103+
self._connected = False
104+
return
105+
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
106+
data = frame.data
107+
if six.PY3:
108+
data = data.decode("utf-8")
109+
if data[0] == '\x01':
110+
data = data[1:]
111+
if data:
112+
self._all += data
113+
if data[0] == '\x02':
114+
self._stderr += data[1:]
115+
else:
116+
self._stdout += data
117+
118+
def run_forever(self, timeout=None):
119+
if timeout:
120+
start = time.time()
121+
while self.is_open() and time.time() - start < timeout:
122+
self.update(timeout=(timeout - time.time() + start))
123+
else:
124+
while self.is_open():
125+
self.update(timeout=None)
73126

74127

75128
WSResponse = collections.namedtuple('WSResponse', ['data'])
76129

77130

78-
def GET(configuration, url, query_params, _request_timeout, headers):
131+
def GET(configuration, url, query_params, _request_timeout, _preload_content,
132+
headers):
79133
# switch protocols from http to websocket
80134
url = url.replace('http://', 'ws://')
81135
url = url.replace('https://', 'wss://')
@@ -105,10 +159,12 @@ def GET(configuration, url, query_params, _request_timeout, headers):
105159
else:
106160
url += '&command=' + quote_plus(commands)
107161

108-
client = WSClient(configuration, url, headers)
109-
if client.errors:
110-
raise ApiException(
111-
status=0,
112-
reason='\n'.join([str(error) for error in client.errors])
113-
)
162+
try:
163+
client = WSClient(configuration, url, headers)
164+
if not _preload_content:
165+
return client
166+
client.run_forever(timeout=_request_timeout)
167+
return WSResponse('%s' % ''.join(client.read_all()))
168+
except (Exception, KeyboardInterrupt, SystemExit) as e:
169+
raise ApiException(status=0, reason=str(e))
114170
return WSResponse('%s' % ''.join(client.messages))

kubernetes/e2e_test/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ def get_e2e_configuration():
4242
if config.host is None:
4343
raise unittest.SkipTest('Unable to find a running Kubernetes instance')
4444
print('Running test against : %s' % config.host)
45+
config.assert_hostname = False
4546
return config

0 commit comments

Comments
 (0)