Skip to content

Commit ae0fa93

Browse files
committed
agentclient handles multiple waiters for reconnect
When reconnect happens, if there are multiple waiters, all of them should get notification when the reconnect is done. AgentClient differentiates between first connect with reconnect. If the agent fails to connect to agent server, it is considered to be critical, and we don't retry. After connection is established, we retry connecting to server to recover from server failure.
1 parent e846112 commit ae0fa93

File tree

6 files changed

+187
-28
lines changed

6 files changed

+187
-28
lines changed

cmd/agent/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ func (p *Agent) runProxyConnection(o *GrpcProxyAgentOptions) error {
178178
RootCAs: certPool,
179179
})
180180
dialOption := grpc.WithTransportCredentials(transportCreds)
181-
client := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), dialOption)
181+
client, err := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), dialOption)
182+
if err != nil {
183+
return err
184+
}
182185

183186
stopCh := make(chan struct{})
184187

pkg/agent/agentclient/client.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,17 @@ type AgentClient struct {
3737
}
3838

3939
// NewAgentClient creates an AgentClient
40-
func NewAgentClient(address string, opts ...grpc.DialOption) *AgentClient {
40+
func NewAgentClient(address string, opts ...grpc.DialOption) (*AgentClient, error) {
41+
stream, err := NewRedialableAgentClient(address, opts...)
42+
if err != nil {
43+
return nil, err
44+
}
45+
4146
a := &AgentClient{
4247
connContext: make(map[int64]*connContext),
43-
stream: NewRedialableAgentClient(address, opts...),
48+
stream: stream,
4449
}
45-
46-
return a
50+
return a, nil
4751
}
4852

4953
// connContext tracks a connection from agent to node network.

pkg/agent/agentclient/client_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import (
1616

1717
func TestServeData_HTTP(t *testing.T) {
1818
var stream agent.AgentService_ConnectClient
19-
client := NewAgentClient("")
20-
client.stream, stream = pipe()
19+
client := &AgentClient{
20+
connContext: make(map[int64]*connContext),
21+
}
22+
client.stream, stream = pipe2()
2123
stopCh := make(chan struct{})
2224

2325
// Start agent
@@ -100,8 +102,10 @@ func TestServeData_HTTP(t *testing.T) {
100102

101103
func TestClose_Client(t *testing.T) {
102104
var stream agent.AgentService_ConnectClient
103-
client := NewAgentClient("")
104-
client.stream, stream = pipe()
105+
client := &AgentClient{
106+
connContext: make(map[int64]*connContext),
107+
}
108+
client.stream, stream = pipe2()
105109
stopCh := make(chan struct{})
106110

107111
// Start agent
@@ -193,6 +197,11 @@ func pipe() (agent.AgentService_ConnectClient, agent.AgentService_ConnectClient)
193197
return s1, s2
194198
}
195199

200+
func pipe2() (*RedialableAgentClient, agent.AgentService_ConnectClient) {
201+
s1, s2 := pipe()
202+
return &RedialableAgentClient{stream: s1}, s2
203+
}
204+
196205
func (s *fakeStream) Send(packet *agent.Packet) error {
197206
klog.Infof("[DEBUG] send packet %+v", packet)
198207
s.w <- packet

pkg/agent/agentclient/stream.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ type RedialableAgentClient struct {
4040
opts []grpc.DialOption
4141
conn *grpc.ClientConn
4242
stopCh chan struct{}
43-
reconnTrigger chan error
43+
reconnOngoing bool
44+
reconnWaiters []chan error
4445

4546
// locks
4647
sendLock sync.Mutex
@@ -54,7 +55,7 @@ type RedialableAgentClient struct {
5455
Interval time.Duration
5556
}
5657

57-
func NewRedialableAgentClient(address string, opts ...grpc.DialOption) *RedialableAgentClient {
58+
func NewRedialableAgentClient(address string, opts ...grpc.DialOption) (*RedialableAgentClient, error) {
5859
c := &RedialableAgentClient{
5960
address: address,
6061
opts: opts,
@@ -63,11 +64,7 @@ func NewRedialableAgentClient(address string, opts ...grpc.DialOption) *Redialab
6364
stopCh: make(chan struct{}),
6465
}
6566

66-
_ = <-c.triggerReconnect()
67-
68-
go c.probe()
69-
70-
return c
67+
return c, c.Connect()
7168
}
7269

7370
func (c *RedialableAgentClient) probe() {
@@ -129,22 +126,26 @@ func (c *RedialableAgentClient) triggerReconnect() <-chan error {
129126
c.reconnLock.Lock()
130127
defer c.reconnLock.Unlock()
131128

132-
if c.reconnTrigger != nil {
133-
return c.reconnTrigger
134-
}
129+
errch := make(chan error)
130+
c.reconnWaiters = append(c.reconnWaiters, errch)
135131

136-
c.reconnTrigger = make(chan error)
137-
go c.reconnect()
132+
if !c.reconnOngoing {
133+
go c.reconnect()
134+
c.reconnOngoing = true
135+
}
138136

139-
return c.reconnTrigger
137+
return errch
140138
}
141139

142140
func (c *RedialableAgentClient) doneReconnect(err error) {
143141
c.reconnLock.Lock()
144142
defer c.reconnLock.Unlock()
145143

146-
c.reconnTrigger <- err
147-
c.reconnTrigger = nil
144+
for _, ch := range c.reconnWaiters {
145+
ch <- err
146+
}
147+
c.reconnOngoing = false
148+
c.reconnWaiters = nil
148149
}
149150

150151
func (c *RedialableAgentClient) Recv() (*agent.Packet, error) {
@@ -164,6 +165,15 @@ func (c *RedialableAgentClient) Recv() (*agent.Packet, error) {
164165
}
165166
}
166167

168+
func (c *RedialableAgentClient) Connect() error {
169+
if err := c.tryConnect(); err != nil {
170+
return err
171+
}
172+
173+
go c.probe()
174+
return nil
175+
}
176+
167177
func (c *RedialableAgentClient) reconnect() {
168178
klog.Info("start to connect...")
169179

pkg/agent/agentclient/stream_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package agentclient
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net"
7+
"testing"
8+
"time"
9+
10+
"google.golang.org/grpc"
11+
"k8s.io/klog"
12+
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
13+
)
14+
15+
func TestReconnectExits(t *testing.T) {
16+
server := newTestServer("localhost:8899") // random addr
17+
server.Start()
18+
defer server.Stop()
19+
20+
time.Sleep(time.Millisecond)
21+
22+
client, err := NewRedialableAgentClient("localhost:8899", grpc.WithInsecure())
23+
if err != nil {
24+
t.Fatal(err)
25+
}
26+
27+
err = client.Send(&agent.Packet{
28+
Type: agent.PacketType_DIAL_REQ,
29+
})
30+
if err != nil {
31+
t.Error(err)
32+
}
33+
34+
client1 := make(chan bool)
35+
go func() {
36+
_, err := client.Recv()
37+
if err != nil {
38+
if err2, ok := err.(*ReconnectError); ok {
39+
err2.Wait()
40+
client1 <- true
41+
}
42+
}
43+
}()
44+
45+
client2 := make(chan bool)
46+
go func() {
47+
_, err := client.Recv()
48+
if err != nil {
49+
if err2, ok := err.(*ReconnectError); ok {
50+
err2.Wait()
51+
client2 <- true
52+
}
53+
}
54+
}()
55+
56+
client.interrupt()
57+
58+
var got1 bool
59+
var got2 bool
60+
select {
61+
case got1 = <-client1:
62+
case <-time.After(time.Second):
63+
}
64+
select {
65+
case got2 = <-client2:
66+
case <-time.After(time.Second):
67+
}
68+
69+
if !got1 || !got2 {
70+
t.Errorf("expect both clients get unblocked; not they don't (%t %t)", got1, got2)
71+
}
72+
}
73+
74+
type testServer struct {
75+
addr string
76+
grpcServer *grpc.Server
77+
}
78+
79+
func newTestServer(addr string) *testServer {
80+
return &testServer{addr: addr}
81+
}
82+
83+
func (s *testServer) Connect(stream agent.AgentService_ConnectServer) error {
84+
stopCh := make(chan error)
85+
86+
// Recv only
87+
go func() {
88+
for {
89+
_, err := stream.Recv()
90+
if err == io.EOF {
91+
close(stopCh)
92+
return
93+
}
94+
if err != nil {
95+
klog.Warningf(">>> Stream read from frontend error: %v", err)
96+
close(stopCh)
97+
return
98+
}
99+
}
100+
}()
101+
102+
return <-stopCh
103+
}
104+
105+
func (s *testServer) Start() error {
106+
s.grpcServer = grpc.NewServer()
107+
agent.RegisterAgentServiceServer(s.grpcServer, s)
108+
lis, err := net.Listen("tcp", s.addr)
109+
if err != nil {
110+
return fmt.Errorf("failed to listen on %s: %v", s.addr, err)
111+
}
112+
go s.grpcServer.Serve(lis)
113+
return nil
114+
}
115+
116+
func (s *testServer) Stop() {
117+
if s.grpcServer != nil {
118+
s.grpcServer.Stop()
119+
}
120+
}
121+
122+
func (s *testServer) Addr() string {
123+
return s.addr
124+
}

tests/proxy_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ func TestBasicProxy_GRPC(t *testing.T) {
3737
proxy, cleanup, err := runGRPCProxyServer()
3838
defer cleanup()
3939

40-
runAgent(proxy.agent, stopCh)
40+
if err := runAgent(proxy.agent, stopCh); err != nil {
41+
t.Fatal(err)
42+
}
4143

4244
// Wait for agent to register on proxy server
4345
time.Sleep(time.Second)
@@ -79,7 +81,9 @@ func TestBasicProxy_HTTPCONN(t *testing.T) {
7981
proxy, cleanup, err := runHTTPConnProxyServer()
8082
defer cleanup()
8183

82-
runAgent(proxy.agent, stopCh)
84+
if err := runAgent(proxy.agent, stopCh); err != nil {
85+
t.Fatal(err)
86+
}
8387

8488
// Wait for agent to register on proxy server
8589
time.Sleep(time.Second)
@@ -226,8 +230,13 @@ func runHTTPConnProxyServer() (proxy, func(), error) {
226230
return proxy, cleanup, nil
227231
}
228232

229-
func runAgent(addr string, stopCh <-chan struct{}) {
230-
client := agentclient.NewAgentClient(addr, grpc.WithInsecure())
233+
func runAgent(addr string, stopCh <-chan struct{}) error {
234+
client, err := agentclient.NewAgentClient(addr, grpc.WithInsecure())
235+
if err != nil {
236+
return err
237+
}
231238

232239
go client.Serve(stopCh)
240+
241+
return nil
233242
}

0 commit comments

Comments
 (0)