Skip to content

Commit e846112

Browse files
committed
Add integration test for basic proxy functionality
- The test covers both grpc and http-connect - The remote server is http based, so it won't send traffic volunteerly unless there is a request - The test traffic is not over TLS
1 parent dfa7cd1 commit e846112

File tree

7 files changed

+338
-41
lines changed

7 files changed

+338
-41
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
/bin
2+
/certs/
3+
/cfssl
4+
/cfssljson
5+
/easy-rsa-master/
6+
/easy-rsa.tar.gz
7+

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
.PHONY: gen clean certs build docker/proxy-server docker/proxy-agent push-images
15+
.PHONY: gen clean certs build docker/proxy-server docker/proxy-agent push-images test
1616
proto/agent/agent.pb.go: proto/agent/agent.proto
1717
protoc -I proto proto/agent/agent.proto --go_out=plugins=grpc:proto
1818

@@ -104,3 +104,6 @@ push-images: docker/proxy-agent docker/proxy-server
104104

105105
clean:
106106
rm -rf proto/agent/agent.pb.go proto/proxy.pb.go easy-rsa.tar.gz easy-rsa-master cfssl cfssljson certs bin
107+
108+
test:
109+
go test ./...

pkg/agent/agentclient/client.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,16 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
8181
}
8282

8383
pkt, err := a.stream.Recv()
84-
if err == io.EOF {
85-
klog.Info("received EOF, exit")
86-
return
84+
if err != nil {
85+
if err2, ok := err.(*ReconnectError); ok {
86+
err = err2.Wait()
87+
continue
88+
} else if err == io.EOF {
89+
klog.Info("received EOF, exit")
90+
return
91+
}
8792
}
93+
8894
if err != nil {
8995
klog.Warningf("stream read error: %v", err)
9096
return
@@ -106,7 +112,7 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
106112
conn, err := net.Dial(dialReq.Protocol, dialReq.Address)
107113
if err != nil {
108114
resp.GetDialResponse().Error = err.Error()
109-
if err := a.stream.Send(resp); err != nil {
115+
if err := a.stream.RetrySend(resp); err != nil {
110116
klog.Warningf("stream send error: %v", err)
111117
}
112118
continue
@@ -130,7 +136,7 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
130136
resp.GetCloseResponse().Error = err.Error()
131137
}
132138

133-
if err := a.stream.Send(resp); err != nil {
139+
if err := a.stream.RetrySend(resp); err != nil {
134140
klog.Warningf("close response send error: %v", err)
135141
}
136142

@@ -140,7 +146,7 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
140146
}
141147

142148
resp.GetDialResponse().ConnectID = connID
143-
if err := a.stream.Send(resp); err != nil {
149+
if err := a.stream.RetrySend(resp); err != nil {
144150
klog.Warningf("stream send error: %v", err)
145151
continue
146152
}
@@ -213,7 +219,7 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
213219
Data: buf[:n],
214220
ConnectID: connID,
215221
}}
216-
if err := a.stream.Send(resp); err != nil {
222+
if err := a.stream.RetrySend(resp); err != nil {
217223
klog.Warningf("stream send error: %v", err)
218224
}
219225
}

pkg/agent/agentclient/stream.go

Lines changed: 76 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,35 @@ package agentclient
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"sync"
78
"time"
89

910
"google.golang.org/grpc"
1011
"google.golang.org/grpc/connectivity"
1112
"k8s.io/klog"
1213
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
14+
// "k8s.io/client-go/util/retry"
1315
)
1416

1517
const (
1618
defaultRetry = 20
1719
defaultInterval = 5 * time.Second
1820
)
1921

22+
type ReconnectError struct {
23+
internalErr error
24+
errChan <-chan error
25+
}
26+
27+
func (e *ReconnectError) Error() string {
28+
return "transient error: " + e.internalErr.Error()
29+
}
30+
31+
func (e *ReconnectError) Wait() error {
32+
return <-e.errChan
33+
}
34+
2035
type RedialableAgentClient struct {
2136
stream agent.AgentService_ConnectClient
2237

@@ -25,7 +40,7 @@ type RedialableAgentClient struct {
2540
opts []grpc.DialOption
2641
conn *grpc.ClientConn
2742
stopCh chan struct{}
28-
reconnTrigger chan struct{}
43+
reconnTrigger chan error
2944

3045
// locks
3146
sendLock sync.Mutex
@@ -48,7 +63,7 @@ func NewRedialableAgentClient(address string, opts ...grpc.DialOption) *Redialab
4863
stopCh: make(chan struct{}),
4964
}
5065

51-
c.reconnect()
66+
_ = <-c.triggerReconnect()
5267

5368
go c.probe()
5469

@@ -69,7 +84,10 @@ func (c *RedialableAgentClient) probe() {
6984
}
7085
}
7186

72-
c.reconnect()
87+
klog.Info("probe failure: reconnect")
88+
if err := <-c.triggerReconnect(); err != nil {
89+
klog.Infof("probe reconnect failed: %v", err)
90+
}
7391
}
7492
}
7593

@@ -78,46 +96,75 @@ func (c *RedialableAgentClient) Send(pkt *agent.Packet) error {
7896
defer c.sendLock.Unlock()
7997

8098
if err := c.stream.Send(pkt); err != nil {
81-
if c.conn.GetState() == connectivity.Ready {
99+
if err == io.EOF {
82100
return err
83101
}
84-
85-
if err2 := c.reconnect(); err2 != nil {
86-
return err2
102+
return &ReconnectError{
103+
internalErr: err,
104+
errChan: c.triggerReconnect(),
87105
}
88106
}
89107

90-
return c.stream.Send(pkt)
108+
return nil
91109
}
92110

93-
func (c *RedialableAgentClient) Recv() (*agent.Packet, error) {
94-
c.recvLock.Lock()
95-
defer c.recvLock.Unlock()
111+
func (c *RedialableAgentClient) RetrySend(pkt *agent.Packet) error {
112+
err := c.Send(pkt)
113+
if err == nil {
114+
return nil
115+
} else if err == io.EOF {
116+
return err
117+
}
96118

97-
// this just get block..
98-
if pkt, err := c.stream.Recv(); err != nil {
99-
klog.Infof("error recving: %v", err)
100-
klog.Info("start reconnecting")
119+
if err2, ok := err.(*ReconnectError); ok {
120+
err = err2.Wait()
121+
}
122+
if err != nil {
123+
return err
124+
}
125+
return c.RetrySend(pkt)
126+
}
101127

102-
if err2 := c.reconnect(); err2 != nil {
103-
klog.Infof("reconnect failed: %v", err2)
104-
return pkt, err2
105-
}
128+
func (c *RedialableAgentClient) triggerReconnect() <-chan error {
129+
c.reconnLock.Lock()
130+
defer c.reconnLock.Unlock()
106131

107-
return c.stream.Recv()
108-
} else {
109-
return pkt, err
132+
if c.reconnTrigger != nil {
133+
return c.reconnTrigger
110134
}
135+
136+
c.reconnTrigger = make(chan error)
137+
go c.reconnect()
138+
139+
return c.reconnTrigger
111140
}
112141

113-
func (c *RedialableAgentClient) reconnect() error {
142+
func (c *RedialableAgentClient) doneReconnect(err error) {
114143
c.reconnLock.Lock()
115144
defer c.reconnLock.Unlock()
116145

117-
if c.conn != nil && c.conn.GetState() == connectivity.Ready {
118-
return nil
146+
c.reconnTrigger <- err
147+
c.reconnTrigger = nil
148+
}
149+
150+
func (c *RedialableAgentClient) Recv() (*agent.Packet, error) {
151+
c.recvLock.Lock()
152+
defer c.recvLock.Unlock()
153+
154+
if pkt, err := c.stream.Recv(); err != nil {
155+
if err == io.EOF {
156+
return pkt, err
157+
}
158+
return pkt, &ReconnectError{
159+
internalErr: err,
160+
errChan: c.triggerReconnect(),
161+
}
162+
} else {
163+
return pkt, nil
119164
}
165+
}
120166

167+
func (c *RedialableAgentClient) reconnect() {
121168
klog.Info("start to connect...")
122169

123170
var err error
@@ -126,14 +173,16 @@ func (c *RedialableAgentClient) reconnect() error {
126173
for retry < c.Retry {
127174
if err = c.tryConnect(); err == nil {
128175
klog.Info("connected")
129-
return nil
176+
c.doneReconnect(nil)
177+
return
130178
}
131179
retry++
132180
klog.V(5).Infof("Failed to connect to proxy server, retry %d in %v: %v", retry, c.Interval, err)
181+
klog.Infof("Failed to connect to proxy server, retry %d in %v: %v", retry, c.Interval, err)
133182
time.Sleep(c.Interval)
134183
}
135184

136-
return fmt.Errorf("Failed to connect to proxy server: %v", err)
185+
c.doneReconnect(fmt.Errorf("Failed to connect to proxy server: %v", err))
137186
}
138187

139188
func (c *RedialableAgentClient) tryConnect() error {

pkg/agent/agentserver/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (c *ProxyClientConnection) send(pkt *agent.Packet) error {
5454
}
5555
}
5656

57-
// ProxyServer ...
57+
// ProxyServer
5858
type ProxyServer struct {
5959
Backend agent.AgentService_ConnectServer
6060

@@ -67,7 +67,7 @@ var _ agent.AgentServiceServer = &ProxyServer{}
6767

6868
var _ agent.ProxyServiceServer = &ProxyServer{}
6969

70-
// NewProxyServer ...
70+
// NewProxyServer creates a new ProxyServer instance
7171
func NewProxyServer() *ProxyServer {
7272
return &ProxyServer{
7373
Frontends: make(map[int64]*ProxyClientConnection),

pkg/agent/agentserver/tunnel.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ type Tunnel struct {
3232
}
3333

3434
func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
35-
klog.Infof("Received %s request to %q from %v",
36-
r.Method,
37-
r.Host,
38-
r.TLS.PeerCertificates[0].Subject.CommonName) // can do authz with certs
35+
klog.Infof("Received %s request to %q", r.Method, r.Host)
36+
if r.TLS != nil {
37+
klog.Infof("TLS CommonName: %v", r.TLS.PeerCertificates[0].Subject.CommonName)
38+
}
3939
if r.Method != http.MethodConnect {
4040
http.Error(w, "this proxy only supports CONNECT passthrough", http.StatusMethodNotAllowed)
4141
return

0 commit comments

Comments
 (0)