@@ -237,30 +237,30 @@ def __init__(self, websocket, ports):
237
237
"""
238
238
239
239
self .websocket = websocket
240
- self .ports = {}
241
- for ix , port_number in enumerate (ports ):
242
- self .ports [ port_number ] = self ._Port (ix , port_number )
240
+ self .local_ports = {}
241
+ for ix , local_remote in enumerate (ports ):
242
+ self .local_ports [ local_remote [ 0 ]] = self ._Port (ix , local_remote [ 1 ] )
243
243
threading .Thread (
244
244
name = "Kubernetes port forward proxy" , target = self ._proxy , daemon = True
245
245
).start ()
246
246
247
- def socket (self , port_number ):
248
- if port_number not in self .ports :
247
+ def socket (self , local_number ):
248
+ if local_number not in self .local_ports :
249
249
raise ValueError ("Invalid port number" )
250
- return self .ports [ port_number ].socket
250
+ return self .local_ports [ local_number ].socket
251
251
252
- def error_channel (self , port_number ):
253
- if port_number not in self .ports :
252
+ def error (self , local_number ):
253
+ if local_number not in self .local_ports :
254
254
raise ValueError ("Invalid port number" )
255
- return self .ports [ port_number ].error
255
+ return self .local_ports [ local_number ].error
256
256
257
257
def close (self ):
258
- for port in self .ports .values ():
258
+ for port in self .local_ports .values ():
259
259
port .socket .close ()
260
260
261
261
class _Port :
262
- def __init__ (self , ix , number ):
263
- self .number = number
262
+ def __init__ (self , ix , remote_number ):
263
+ self .remote_number = remote_number
264
264
self .channel = bytes ([ix * 2 ])
265
265
s , self .python = socket .socketpair (socket .AF_UNIX , socket .SOCK_STREAM )
266
266
self .socket = self ._Socket (s )
@@ -287,7 +287,7 @@ def _proxy(self):
287
287
channel_initialized = []
288
288
python_ports = {}
289
289
rlist = []
290
- for port in self .ports .values ():
290
+ for port in self .local_ports .values ():
291
291
# Setup the data channel for this port number
292
292
channel_ports .append (port )
293
293
channel_initialized .append (False )
@@ -300,7 +300,7 @@ def _proxy(self):
300
300
kubernetes_data = b''
301
301
while True :
302
302
wlist = []
303
- for port in self .ports .values ():
303
+ for port in self .local_ports .values ():
304
304
if port .data :
305
305
wlist .append (port .python )
306
306
if kubernetes_data :
@@ -318,7 +318,7 @@ def _proxy(self):
318
318
if s == self .websocket .sock :
319
319
opcode , frame = self .websocket .recv_data_frame (True )
320
320
if opcode == ABNF .OPCODE_CLOSE :
321
- for port in self .ports .values ():
321
+ for port in self .local_ports .values ():
322
322
port .python .close ()
323
323
return
324
324
if opcode == ABNF .OPCODE_BINARY :
@@ -330,11 +330,9 @@ def _proxy(self):
330
330
port = channel_ports [channel ]
331
331
if channel_initialized [channel ]:
332
332
if channel % 2 :
333
- port .error = frame .data [1 :].decode ()
334
- if port .python in rlist :
335
- port .python .close ()
336
- rlist .remove (port .python )
337
- port .data = b''
333
+ if port .error is None :
334
+ port .error = ''
335
+ port .error += frame .data [1 :].decode ()
338
336
else :
339
337
port .data += frame .data [1 :]
340
338
else :
@@ -343,7 +341,7 @@ def _proxy(self):
343
341
"Unexpected initial channel frame data size"
344
342
)
345
343
port_number = frame .data [1 ] + (frame .data [2 ] * 256 )
346
- if port_number != port .number :
344
+ if port_number != port .remote_number :
347
345
raise RuntimeError (
348
346
"Unexpected port number in initial channel frame: " + str (port_number )
349
347
)
@@ -453,17 +451,38 @@ def portforward_call(configuration, _method, url, **kwargs):
453
451
query_params = kwargs .get ("query_params" )
454
452
455
453
ports = []
456
- for key , value in query_params :
457
- if key == 'ports' :
458
- for port in value .split (',' ):
454
+ for ix in range (len (query_params )):
455
+ if query_params [ix ][0 ] == 'ports' :
456
+ remote_ports = []
457
+ for port in query_params [ix ][1 ].split (',' ):
459
458
try :
460
- # The last specified port is the remote port
461
- port = int (port .split (':' )[- 1 ])
462
- if not (0 < port < 65536 ):
459
+ local_remote = port .split (':' )
460
+ if len (local_remote ) > 2 :
463
461
raise ValueError
464
- ports .append (port )
462
+ if len (local_remote ) == 1 :
463
+ local_remote [0 ] = int (local_remote [0 ])
464
+ if not (0 < local_remote [0 ] < 65536 ):
465
+ raise ValueError
466
+ local_remote .append (local_remote [0 ])
467
+ elif len (local_remote ) == 2 :
468
+ if local_remote [0 ]:
469
+ local_remote [0 ] = int (local_remote [0 ])
470
+ if not (0 <= local_remote [0 ] < 65536 ):
471
+ raise ValueError
472
+ else :
473
+ local_remote [0 ] = 0
474
+ local_remote [1 ] = int (local_remote [1 ])
475
+ if not (0 < local_remote [1 ] < 65536 ):
476
+ raise ValueError
477
+ if not local_remote [0 ]:
478
+ local_remote [0 ] = len (ports ) + 1
479
+ else :
480
+ raise ValueError
481
+ ports .append (local_remote )
482
+ remote_ports .append (str (local_remote [1 ]))
465
483
except ValueError :
466
- raise ApiValueError ("Invalid port number `" + str (port ) + "`" )
484
+ raise ApiValueError ("Invalid port number `" + port + "`" )
485
+ query_params [ix ] = ('ports' , ',' .join (remote_ports ))
467
486
if not ports :
468
487
raise ApiValueError ("Missing required parameter `ports`" )
469
488
0 commit comments