Skip to content

Commit 72862c0

Browse files
authored
Merge pull request #31 from anfernee/agent
Agent supports reconnect
2 parents 8ebdad5 + ae0fa93 commit 72862c0

File tree

10 files changed

+637
-48
lines changed

10 files changed

+637
-48
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
/bin
2+
/certs/
3+
/cfssl
4+
/cfssljson
5+
/easy-rsa-master/
6+
/easy-rsa.tar.gz
7+

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
.PHONY: gen clean certs build docker/proxy-server docker/proxy-agent push-images
15+
.PHONY: gen clean certs build docker/proxy-server docker/proxy-agent push-images test
1616
proto/agent/agent.pb.go: proto/agent/agent.proto
1717
protoc -I proto proto/agent/agent.proto --go_out=plugins=grpc:proto
1818

@@ -104,3 +104,6 @@ push-images: docker/proxy-agent docker/proxy-server
104104

105105
clean:
106106
rm -rf proto/agent/agent.pb.go proto/proxy.pb.go easy-rsa.tar.gz easy-rsa-master cfssl cfssljson certs bin
107+
108+
test:
109+
go test ./...

cmd/agent/main.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ import (
3030
"github.com/spf13/pflag"
3131
"google.golang.org/grpc"
3232
"google.golang.org/grpc/credentials"
33+
"k8s.io/klog"
3334
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/agentclient"
3435
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
35-
"k8s.io/klog"
3636
)
3737

3838
func main() {
@@ -113,9 +113,9 @@ func (o *GrpcProxyAgentOptions) Validate() error {
113113

114114
func newGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
115115
o := GrpcProxyAgentOptions{
116-
agentCert: "",
117-
agentKey: "",
118-
caCert: "",
116+
agentCert: "",
117+
agentKey: "",
118+
caCert: "",
119119
proxyServerHost: "127.0.0.1",
120120
proxyServerPort: 8091,
121121
}
@@ -178,10 +178,9 @@ func (p *Agent) runProxyConnection(o *GrpcProxyAgentOptions) error {
178178
RootCAs: certPool,
179179
})
180180
dialOption := grpc.WithTransportCredentials(transportCreds)
181-
client := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort))
182-
183-
if err := client.Connect(dialOption); err != nil {
184-
return fmt.Errorf("failed to connect to proxy-server: %v", err)
181+
client, err := agentclient.NewAgentClient(fmt.Sprintf("%s:%d", o.proxyServerHost, o.proxyServerPort), dialOption)
182+
if err != nil {
183+
return err
185184
}
186185

187186
stopCh := make(chan struct{})

pkg/agent/agentclient/client.go

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package agentclient
1818

1919
import (
20-
"context"
2120
"io"
2221
"net"
2322
"sync"
@@ -33,19 +32,22 @@ import (
3332
type AgentClient struct {
3433
nextConnID int64
3534
connContext map[int64]*connContext
36-
address string
3735

38-
stream agent.AgentService_ConnectClient
36+
stream *RedialableAgentClient
3937
}
4038

4139
// NewAgentClient creates an AgentClient
42-
func NewAgentClient(address string) *AgentClient {
40+
func NewAgentClient(address string, opts ...grpc.DialOption) (*AgentClient, error) {
41+
stream, err := NewRedialableAgentClient(address, opts...)
42+
if err != nil {
43+
return nil, err
44+
}
45+
4346
a := &AgentClient{
4447
connContext: make(map[int64]*connContext),
45-
address: address,
48+
stream: stream,
4649
}
47-
48-
return a
50+
return a, nil
4951
}
5052

5153
// connContext tracks a connection from agent to node network.
@@ -68,21 +70,6 @@ func (c *connContext) cleanup() {
6870
//
6971
// The caller needs to call Serve to start serving proxy requests
7072
// coming from proxy server.
71-
func (a *AgentClient) Connect(opts ...grpc.DialOption) error {
72-
c, err := grpc.Dial(a.address, opts...)
73-
if err != nil {
74-
return err
75-
}
76-
77-
client := agent.NewAgentServiceClient(c)
78-
79-
a.stream, err = client.Connect(context.Background())
80-
if err != nil {
81-
return err
82-
}
83-
84-
return nil
85-
}
8673

8774
// Serve starts to serve proxied requests from proxy server over the
8875
// gRPC stream. Successful Connect is required before Serve. The
@@ -98,10 +85,16 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
9885
}
9986

10087
pkt, err := a.stream.Recv()
101-
if err == io.EOF {
102-
klog.Info("received EOF, exit")
103-
return
88+
if err != nil {
89+
if err2, ok := err.(*ReconnectError); ok {
90+
err = err2.Wait()
91+
continue
92+
} else if err == io.EOF {
93+
klog.Info("received EOF, exit")
94+
return
95+
}
10496
}
97+
10598
if err != nil {
10699
klog.Warningf("stream read error: %v", err)
107100
return
@@ -123,7 +116,7 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
123116
conn, err := net.Dial(dialReq.Protocol, dialReq.Address)
124117
if err != nil {
125118
resp.GetDialResponse().Error = err.Error()
126-
if err := a.stream.Send(resp); err != nil {
119+
if err := a.stream.RetrySend(resp); err != nil {
127120
klog.Warningf("stream send error: %v", err)
128121
}
129122
continue
@@ -147,7 +140,7 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
147140
resp.GetCloseResponse().Error = err.Error()
148141
}
149142

150-
if err := a.stream.Send(resp); err != nil {
143+
if err := a.stream.RetrySend(resp); err != nil {
151144
klog.Warningf("close response send error: %v", err)
152145
}
153146

@@ -157,7 +150,7 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
157150
}
158151

159152
resp.GetDialResponse().ConnectID = connID
160-
if err := a.stream.Send(resp); err != nil {
153+
if err := a.stream.RetrySend(resp); err != nil {
161154
klog.Warningf("stream send error: %v", err)
162155
continue
163156
}
@@ -230,7 +223,7 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
230223
Data: buf[:n],
231224
ConnectID: connID,
232225
}}
233-
if err := a.stream.Send(resp); err != nil {
226+
if err := a.stream.RetrySend(resp); err != nil {
234227
klog.Warningf("stream send error: %v", err)
235228
}
236229
}

pkg/agent/agentclient/client_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import (
1616

1717
func TestServeData_HTTP(t *testing.T) {
1818
var stream agent.AgentService_ConnectClient
19-
client := NewAgentClient("")
20-
client.stream, stream = pipe()
19+
client := &AgentClient{
20+
connContext: make(map[int64]*connContext),
21+
}
22+
client.stream, stream = pipe2()
2123
stopCh := make(chan struct{})
2224

2325
// Start agent
@@ -100,8 +102,10 @@ func TestServeData_HTTP(t *testing.T) {
100102

101103
func TestClose_Client(t *testing.T) {
102104
var stream agent.AgentService_ConnectClient
103-
client := NewAgentClient("")
104-
client.stream, stream = pipe()
105+
client := &AgentClient{
106+
connContext: make(map[int64]*connContext),
107+
}
108+
client.stream, stream = pipe2()
105109
stopCh := make(chan struct{})
106110

107111
// Start agent
@@ -193,6 +197,11 @@ func pipe() (agent.AgentService_ConnectClient, agent.AgentService_ConnectClient)
193197
return s1, s2
194198
}
195199

200+
func pipe2() (*RedialableAgentClient, agent.AgentService_ConnectClient) {
201+
s1, s2 := pipe()
202+
return &RedialableAgentClient{stream: s1}, s2
203+
}
204+
196205
func (s *fakeStream) Send(packet *agent.Packet) error {
197206
klog.Infof("[DEBUG] send packet %+v", packet)
198207
s.w <- packet

0 commit comments

Comments
 (0)