Skip to content

Commit 4079c67

Browse files
authored
Merge pull request kubernetes-client#1237 from iciclespider/port-forwarding
Unittests for portforwarding ability added in python-base.
2 parents b5603d8 + b1dd9c4 commit 4079c67

File tree

4 files changed

+360
-4
lines changed

4 files changed

+360
-4
lines changed

examples/pod_portforward.py

+202
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# Copyright 2020 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Shows the functionality of portforward streaming using an nginx container.
17+
"""
18+
19+
import select
20+
import socket
21+
import time
22+
23+
import six.moves.urllib.request as urllib_request
24+
25+
from kubernetes import config
26+
from kubernetes.client import Configuration
27+
from kubernetes.client.api import core_v1_api
28+
from kubernetes.client.rest import ApiException
29+
from kubernetes.stream import portforward
30+
31+
##############################################################################
32+
# Kubernetes pod port forwarding works by directly providing a socket which
33+
# the python application uses to send and receive data on. This is in contrast
34+
# to the go client, which opens a local port that the go application then has
35+
# to open to get a socket to transmit data.
36+
#
37+
# This simplifies the python application, there is not a local port to worry
38+
# about if that port number is available. Nor does the python application have
39+
# to then deal with opening this local port. The socket used to transmit data
40+
# is immediately provided to the python application.
41+
#
42+
# Below also is an example of monkey patching the socket.create_connection
43+
# function so that DNS names of the following formats will access kubernetes
44+
# ports:
45+
#
46+
# <pod-name>.<namespace>.kubernetes
47+
# <pod-name>.pod.<namespace>.kubernetes
48+
# <service-name>.svc.<namespace>.kubernetes
49+
# <service-name>.service.<namespace>.kubernetes
50+
#
51+
# These DNS name can be used to interact with pod ports using python libraries,
52+
# such as urllib.request and http.client. For example:
53+
#
54+
# response = urllib.request.urlopen(
55+
# 'https://metrics-server.service.kube-system.kubernetes/'
56+
# )
57+
#
58+
##############################################################################
59+
60+
61+
def portforward_commands(api_instance):
62+
name = 'portforward-example'
63+
resp = None
64+
try:
65+
resp = api_instance.read_namespaced_pod(name=name,
66+
namespace='default')
67+
except ApiException as e:
68+
if e.status != 404:
69+
print("Unknown error: %s" % e)
70+
exit(1)
71+
72+
if not resp:
73+
print("Pod %s does not exist. Creating it..." % name)
74+
pod_manifest = {
75+
'apiVersion': 'v1',
76+
'kind': 'Pod',
77+
'metadata': {
78+
'name': name
79+
},
80+
'spec': {
81+
'containers': [{
82+
'image': 'nginx',
83+
'name': 'nginx',
84+
}]
85+
}
86+
}
87+
api_instance.create_namespaced_pod(body=pod_manifest,
88+
namespace='default')
89+
while True:
90+
resp = api_instance.read_namespaced_pod(name=name,
91+
namespace='default')
92+
if resp.status.phase != 'Pending':
93+
break
94+
time.sleep(1)
95+
print("Done.")
96+
97+
pf = portforward(
98+
api_instance.connect_get_namespaced_pod_portforward,
99+
name, 'default',
100+
ports='80',
101+
)
102+
http = pf.socket(80)
103+
http.setblocking(True)
104+
http.sendall(b'GET / HTTP/1.1\r\n')
105+
http.sendall(b'Host: 127.0.0.1\r\n')
106+
http.sendall(b'Connection: close\r\n')
107+
http.sendall(b'Accept: */*\r\n')
108+
http.sendall(b'\r\n')
109+
response = b''
110+
while True:
111+
select.select([http], [], [])
112+
data = http.recv(1024)
113+
if not data:
114+
break
115+
response += data
116+
http.close()
117+
print(response.decode('utf-8'))
118+
error = pf.error(80)
119+
if error is None:
120+
print("No port forward errors on port 80.")
121+
else:
122+
print("Port 80 has the following error: %s" % error)
123+
124+
# Monkey patch socket.create_connection which is used by http.client and
125+
# urllib.request. The same can be done with urllib3.util.connection.create_connection
126+
# if the "requests" package is used.
127+
socket_create_connection = socket.create_connection
128+
129+
def kubernetes_create_connection(address, *args, **kwargs):
130+
dns_name = address[0]
131+
if isinstance(dns_name, bytes):
132+
dns_name = dns_name.decode()
133+
dns_name = dns_name.split(".")
134+
if dns_name[-1] != 'kubernetes':
135+
return socket_create_connection(address, *args, **kwargs)
136+
if len(dns_name) not in (3, 4):
137+
raise RuntimeError("Unexpected kubernetes DNS name.")
138+
namespace = dns_name[-2]
139+
name = dns_name[0]
140+
port = address[1]
141+
if len(dns_name) == 4:
142+
if dns_name[1] in ('svc', 'service'):
143+
service = api_instance.read_namespaced_service(name, namespace)
144+
for service_port in service.spec.ports:
145+
if service_port.port == port:
146+
port = service_port.target_port
147+
break
148+
else:
149+
raise RuntimeError(
150+
"Unable to find service port: %s" % port)
151+
label_selector = []
152+
for key, value in service.spec.selector.items():
153+
label_selector.append("%s=%s" % (key, value))
154+
pods = api_instance.list_namespaced_pod(
155+
namespace, label_selector=",".join(label_selector)
156+
)
157+
if not pods.items:
158+
raise RuntimeError("Unable to find service pods.")
159+
name = pods.items[0].metadata.name
160+
if isinstance(port, str):
161+
for container in pods.items[0].spec.containers:
162+
for container_port in container.ports:
163+
if container_port.name == port:
164+
port = container_port.container_port
165+
break
166+
else:
167+
continue
168+
break
169+
else:
170+
raise RuntimeError(
171+
"Unable to find service port name: %s" % port)
172+
elif dns_name[1] != 'pod':
173+
raise RuntimeError(
174+
"Unsupported resource type: %s" %
175+
dns_name[1])
176+
pf = portforward(api_instance.connect_get_namespaced_pod_portforward,
177+
name, namespace, ports=str(port))
178+
return pf.socket(port)
179+
socket.create_connection = kubernetes_create_connection
180+
181+
# Access the nginx http server using the
182+
# "<pod-name>.pod.<namespace>.kubernetes" dns name.
183+
response = urllib_request.urlopen(
184+
'http://%s.pod.default.kubernetes' % name)
185+
html = response.read().decode('utf-8')
186+
response.close()
187+
print('Status Code: %s' % response.code)
188+
print(html)
189+
190+
191+
def main():
192+
config.load_kube_config()
193+
c = Configuration.get_default_copy()
194+
c.assert_hostname = False
195+
Configuration.set_default(c)
196+
core_v1 = core_v1_api.CoreV1Api()
197+
198+
portforward_commands(core_v1)
199+
200+
201+
if __name__ == '__main__':
202+
main()

kubernetes/e2e_test/test_client.py

+154-1
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@
1313
# under the License.
1414

1515
import json
16+
import select
17+
import socket
1618
import time
1719
import unittest
1820
import uuid
1921

2022
from kubernetes.client import api_client
2123
from kubernetes.client.api import core_v1_api
2224
from kubernetes.e2e_test import base
23-
from kubernetes.stream import stream
25+
from kubernetes.stream import stream, portforward
2426
from kubernetes.stream.ws_client import ERROR_CHANNEL
2527

28+
import six.moves.urllib.request as urllib_request
2629

2730
def short_uuid():
2831
id = str(uuid.uuid4())
@@ -119,6 +122,7 @@ def test_pod_apis(self):
119122

120123
resp = api.delete_namespaced_pod(name=name, body={},
121124
namespace='default')
125+
122126
def test_exit_code(self):
123127
client = api_client.ApiClient(configuration=self.config)
124128
api = core_v1_api.CoreV1Api(client)
@@ -159,6 +163,155 @@ def test_exit_code(self):
159163
resp = api.delete_namespaced_pod(name=name, body={},
160164
namespace='default')
161165

166+
def test_portforward_raw(self):
167+
client = api_client.ApiClient(configuration=self.config)
168+
api = core_v1_api.CoreV1Api(client)
169+
170+
name = 'portforward-raw-' + short_uuid()
171+
pod_manifest = manifest_with_command(
172+
name,
173+
' '.join((
174+
'((while true;do nc -l -p 1234 -e /bin/cat; done)&);',
175+
'((while true;do nc -l -p 1235 -e /bin/cat; done)&);',
176+
'sleep 60',
177+
))
178+
)
179+
resp = api.create_namespaced_pod(body=pod_manifest,
180+
namespace='default')
181+
self.assertEqual(name, resp.metadata.name)
182+
self.assertTrue(resp.status.phase)
183+
184+
while True:
185+
resp = api.read_namespaced_pod(name=name,
186+
namespace='default')
187+
self.assertEqual(name, resp.metadata.name)
188+
self.assertTrue(resp.status.phase)
189+
if resp.status.phase != 'Pending':
190+
break
191+
time.sleep(1)
192+
193+
pf = portforward(api.connect_get_namespaced_pod_portforward,
194+
name, 'default',
195+
ports='1234,1235,1236')
196+
self.assertTrue(pf.connected)
197+
sock1234 = pf.socket(1234)
198+
sock1235 = pf.socket(1235)
199+
sock1234.setblocking(True)
200+
sock1235.setblocking(True)
201+
sent1234 = b'Test port 1234 forwarding...'
202+
sent1235 = b'Test port 1235 forwarding...'
203+
sock1234.sendall(sent1234)
204+
sock1235.sendall(sent1235)
205+
reply1234 = b''
206+
reply1235 = b''
207+
while True:
208+
rlist = []
209+
if sock1234.fileno() != -1:
210+
rlist.append(sock1234)
211+
if sock1235.fileno() != -1:
212+
rlist.append(sock1235)
213+
if not rlist:
214+
break
215+
r, _w, _x = select.select(rlist, [], [], 1)
216+
if not r:
217+
break
218+
if sock1234 in r:
219+
data = sock1234.recv(1024)
220+
self.assertNotEqual(data, b'', "Unexpected socket close")
221+
reply1234 += data
222+
if sock1235 in r:
223+
data = sock1235.recv(1024)
224+
self.assertNotEqual(data, b'', "Unexpected socket close")
225+
reply1235 += data
226+
self.assertEqual(reply1234, sent1234)
227+
self.assertEqual(reply1235, sent1235)
228+
self.assertTrue(pf.connected)
229+
230+
sock = pf.socket(1236)
231+
self.assertRaises(socket.error, sock.sendall, b'This should fail...')
232+
self.assertIsNotNone(pf.error(1236))
233+
sock.close()
234+
235+
for sock in (sock1234, sock1235):
236+
self.assertTrue(pf.connected)
237+
sent = b'Another test using fileno %s' % str(sock.fileno()).encode()
238+
sock.sendall(sent)
239+
reply = b''
240+
while True:
241+
r, _w, _x = select.select([sock], [], [], 1)
242+
if not r:
243+
break
244+
data = sock.recv(1024)
245+
self.assertNotEqual(data, b'', "Unexpected socket close")
246+
reply += data
247+
self.assertEqual(reply, sent)
248+
sock.close()
249+
time.sleep(1)
250+
self.assertFalse(pf.connected)
251+
self.assertIsNone(pf.error(1234))
252+
self.assertIsNone(pf.error(1235))
253+
254+
resp = api.delete_namespaced_pod(name=name, body={},
255+
namespace='default')
256+
257+
def test_portforward_http(self):
258+
client = api_client.ApiClient(configuration=self.config)
259+
api = core_v1_api.CoreV1Api(client)
260+
261+
name = 'portforward-http-' + short_uuid()
262+
pod_manifest = {
263+
'apiVersion': 'v1',
264+
'kind': 'Pod',
265+
'metadata': {
266+
'name': name
267+
},
268+
'spec': {
269+
'containers': [{
270+
'name': 'nginx',
271+
'image': 'nginx',
272+
}]
273+
}
274+
}
275+
276+
resp = api.create_namespaced_pod(body=pod_manifest,
277+
namespace='default')
278+
self.assertEqual(name, resp.metadata.name)
279+
self.assertTrue(resp.status.phase)
280+
281+
while True:
282+
resp = api.read_namespaced_pod(name=name,
283+
namespace='default')
284+
self.assertEqual(name, resp.metadata.name)
285+
self.assertTrue(resp.status.phase)
286+
if resp.status.phase != 'Pending':
287+
break
288+
time.sleep(1)
289+
290+
def kubernetes_create_connection(address, *args, **kwargs):
291+
dns_name = address[0]
292+
if isinstance(dns_name, bytes):
293+
dns_name = dns_name.decode()
294+
dns_name = dns_name.split(".")
295+
if len(dns_name) != 3 or dns_name[2] != "kubernetes":
296+
return socket_create_connection(address, *args, **kwargs)
297+
pf = portforward(api.connect_get_namespaced_pod_portforward,
298+
dns_name[0], dns_name[1], ports=str(address[1]))
299+
return pf.socket(address[1])
300+
301+
socket_create_connection = socket.create_connection
302+
try:
303+
socket.create_connection = kubernetes_create_connection
304+
response = urllib_request.urlopen('http://%s.default.kubernetes/' % name)
305+
html = response.read().decode('utf-8')
306+
finally:
307+
socket.create_connection = socket_create_connection
308+
309+
self.assertEqual(response.code, 200)
310+
self.assertTrue('<h1>Welcome to nginx!</h1>' in html)
311+
312+
resp = api.delete_namespaced_pod(name=name, body={},
313+
namespace='default')
314+
162315
def test_service_apis(self):
163316
client = api_client.ApiClient(configuration=self.config)
164317
api = core_v1_api.CoreV1Api(client)

0 commit comments

Comments
 (0)