Skip to content

Commit 808e708

Browse files
seans3k8s-publishing-bot
authored andcommitted
portforward: tunnel spdy through websockets
Kubernetes-commit: 8b447d8c97e8823b4308eb91cf7d75693e867c61
1 parent df38a01 commit 808e708

File tree

3 files changed

+66
-4
lines changed

3 files changed

+66
-4
lines changed

pkg/util/httpstream/wsstream/conn.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"golang.org/x/net/websocket"
2828

2929
"k8s.io/apimachinery/pkg/util/httpstream"
30+
"k8s.io/apimachinery/pkg/util/portforward"
3031
"k8s.io/apimachinery/pkg/util/remotecommand"
3132
"k8s.io/apimachinery/pkg/util/runtime"
3233
"k8s.io/klog/v2"
@@ -106,6 +107,23 @@ func IsWebSocketRequestWithStreamCloseProtocol(req *http.Request) bool {
106107
return false
107108
}
108109

110+
// IsWebSocketRequestWithTunnelingProtocol returns true if the request contains headers
111+
// identifying that it is requesting a websocket upgrade with a tunneling protocol;
112+
// false otherwise.
113+
func IsWebSocketRequestWithTunnelingProtocol(req *http.Request) bool {
114+
if !IsWebSocketRequest(req) {
115+
return false
116+
}
117+
requestedProtocols := strings.TrimSpace(req.Header.Get(WebSocketProtocolHeader))
118+
for _, requestedProtocol := range strings.Split(requestedProtocols, ",") {
119+
if protocolSupportsWebsocketTunneling(strings.TrimSpace(requestedProtocol)) {
120+
return true
121+
}
122+
}
123+
124+
return false
125+
}
126+
109127
// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
110128
// read and write deadlines are pushed every time a new message is received.
111129
func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
@@ -301,6 +319,12 @@ func protocolSupportsStreamClose(protocol string) bool {
301319
return protocol == remotecommand.StreamProtocolV5Name
302320
}
303321

322+
// protocolSupportsWebsocketTunneling returns true if the passed protocol
323+
// is a tunneled Kubernetes spdy protocol; false otherwise.
324+
func protocolSupportsWebsocketTunneling(protocol string) bool {
325+
return strings.HasPrefix(protocol, portforward.WebsocketsSPDYTunnelingPrefix) && strings.HasSuffix(protocol, portforward.KubernetesSuffix)
326+
}
327+
304328
// handle implements a websocket handler.
305329
func (conn *Conn) handle(ws *websocket.Conn) {
306330
conn.initialize(ws)

pkg/util/portforward/constants.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright 2016 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package portforward
18+
19+
const (
20+
PortForwardV1Name = "portforward.k8s.io"
21+
WebsocketsSPDYTunnelingPrefix = "SPDY/3.1+"
22+
KubernetesSuffix = ".k8s.io"
23+
WebsocketsSPDYTunnelingPortForwardV1 = WebsocketsSPDYTunnelingPrefix + PortForwardV1Name
24+
)

pkg/util/proxy/upgradeaware.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3737

3838
"github.com/mxk/go-flowrate/flowrate"
39+
3940
"k8s.io/klog/v2"
4041
)
4142

@@ -336,6 +337,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
336337
clone.Host = h.Location.Host
337338
}
338339
clone.URL = &location
340+
klog.V(6).Infof("UpgradeAwareProxy: dialing for SPDY upgrade with headers: %v", clone.Header)
339341
backendConn, err = h.DialForUpgrade(clone)
340342
if err != nil {
341343
klog.V(6).Infof("Proxy connection error: %v", err)
@@ -370,13 +372,13 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
370372
// hijacking should be the last step in the upgrade.
371373
requestHijacker, ok := w.(http.Hijacker)
372374
if !ok {
373-
klog.V(6).Infof("Unable to hijack response writer: %T", w)
375+
klog.Errorf("Unable to hijack response writer: %T", w)
374376
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
375377
return true
376378
}
377379
requestHijackedConn, _, err := requestHijacker.Hijack()
378380
if err != nil {
379-
klog.V(6).Infof("Unable to hijack response: %v", err)
381+
klog.Errorf("Unable to hijack response: %v", err)
380382
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
381383
return true
382384
}
@@ -420,7 +422,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
420422
} else {
421423
writer = backendConn
422424
}
423-
_, err := io.Copy(writer, requestHijackedConn)
425+
_, err := io.Copy(writer, &loggingReader{name: "client->backend", delegate: requestHijackedConn})
424426
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
425427
klog.Errorf("Error proxying data from client to backend: %v", err)
426428
}
@@ -434,7 +436,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
434436
} else {
435437
reader = backendConn
436438
}
437-
_, err := io.Copy(requestHijackedConn, reader)
439+
_, err := io.Copy(requestHijackedConn, &loggingReader{name: "backend->client", delegate: reader})
438440
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
439441
klog.Errorf("Error proxying data from backend to client: %v", err)
440442
}
@@ -452,6 +454,18 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
452454
return true
453455
}
454456

457+
// loggingReader logs the bytes read from the "delegate" with a "name" prefix.
458+
type loggingReader struct {
459+
name string
460+
delegate io.Reader
461+
}
462+
463+
func (l *loggingReader) Read(p []byte) (int, error) {
464+
n, err := l.delegate.Read(p)
465+
klog.V(8).Infof("%s: %d bytes, err=%v, bytes=% X", l.name, n, err, p[:n])
466+
return n, err
467+
}
468+
455469
// FIXME: Taken from net/http/httputil/reverseproxy.go as singleJoiningSlash is not exported to be re-used.
456470
// See-also: https://github.com/golang/go/issues/44290
457471
func singleJoiningSlash(a, b string) string {

0 commit comments

Comments
 (0)