Skip to content

Commit 5fb9353

Browse files
authored
Merge pull request #2985 from balajiv113/grpc-pw
Revamp GRPC Port forwarding tunnels to use existing proxy
2 parents 5148ffd + 17b2d58 commit 5fb9353

File tree

2 files changed

+117
-131
lines changed

2 files changed

+117
-131
lines changed

pkg/portfwd/client.go

+62-89
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,18 @@ package portfwd
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"io"
86
"net"
7+
"time"
98

9+
"github.com/containers/gvisor-tap-vsock/pkg/services/forwarder"
10+
"github.com/lima-vm/lima/pkg/bicopy"
1011
"github.com/lima-vm/lima/pkg/guestagent/api"
1112
guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client"
1213
"github.com/sirupsen/logrus"
13-
"golang.org/x/sync/errgroup"
1414
)
1515

1616
func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.Conn, guestAddr string) {
17-
defer conn.Close()
18-
1917
id := fmt.Sprintf("tcp-%s-%s", conn.LocalAddr().String(), conn.RemoteAddr().String())
2018

2119
stream, err := client.Tunnel(ctx)
@@ -24,26 +22,17 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen
2422
return
2523
}
2624

27-
g, _ := errgroup.WithContext(ctx)
28-
29-
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr}
30-
g.Go(func() error {
31-
_, err := io.Copy(rw, conn)
32-
return err
33-
})
34-
g.Go(func() error {
35-
_, err := io.Copy(conn, rw)
36-
return err
37-
})
38-
39-
if err := g.Wait(); err != nil {
40-
logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err)
25+
// Handshake message to start tunnel
26+
if err := stream.Send(&api.TunnelMessage{Id: id, Protocol: "tcp", GuestAddr: guestAddr}); err != nil {
27+
logrus.Errorf("could not start tcp tunnel for id: %s error:%v", id, err)
28+
return
4129
}
30+
31+
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "tcp"}
32+
bicopy.Bicopy(rw, conn, nil)
4233
}
4334

4435
func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) {
45-
defer conn.Close()
46-
4736
id := fmt.Sprintf("udp-%s", conn.LocalAddr().String())
4837

4938
stream, err := client.Tunnel(ctx)
@@ -52,98 +41,82 @@ func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgen
5241
return
5342
}
5443

55-
g, _ := errgroup.WithContext(ctx)
56-
57-
g.Go(func() error {
58-
buf := make([]byte, 65507)
59-
for {
60-
n, addr, err := conn.ReadFrom(buf)
61-
// We must handle n > 0 bytes before considering the error.
62-
// https://pkg.go.dev/net#PacketConn
63-
if n > 0 {
64-
msg := &api.TunnelMessage{
65-
Id: id + "-" + addr.String(),
66-
Protocol: "udp",
67-
GuestAddr: guestAddr,
68-
Data: buf[:n],
69-
UdpTargetAddr: addr.String(),
70-
}
71-
if err := stream.Send(msg); err != nil {
72-
return err
73-
}
74-
}
75-
if err != nil {
76-
// https://pkg.go.dev/net#PacketConn does not mention io.EOF semantics.
77-
if errors.Is(err, io.EOF) {
78-
return nil
79-
}
80-
return err
81-
}
82-
}
83-
})
44+
// Handshake message to start tunnel
45+
if err := stream.Send(&api.TunnelMessage{Id: id, Protocol: "udp", GuestAddr: guestAddr}); err != nil {
46+
logrus.Errorf("could not start udp tunnel for id: %s error:%v", id, err)
47+
return
48+
}
8449

85-
g.Go(func() error {
86-
for {
87-
// Not documented: when err != nil, in is always nil.
88-
in, err := stream.Recv()
89-
if err != nil {
90-
if errors.Is(err, io.EOF) {
91-
return nil
92-
}
93-
return err
94-
}
95-
addr, err := net.ResolveUDPAddr("udp", in.UdpTargetAddr)
96-
if err != nil {
97-
return err
98-
}
99-
_, err = conn.WriteTo(in.Data, addr)
100-
if err != nil {
101-
return err
102-
}
103-
}
50+
proxy, err := forwarder.NewUDPProxy(conn, func() (net.Conn, error) {
51+
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "udp"}
52+
return rw, nil
10453
})
105-
106-
if err := g.Wait(); err != nil {
107-
logrus.Debugf("error in udp tunnel for id: %s error:%v", id, err)
54+
if err != nil {
55+
logrus.Errorf("error in udp tunnel proxy for id: %s error:%v", id, err)
56+
return
10857
}
58+
59+
defer func() {
60+
err := proxy.Close()
61+
if err != nil {
62+
logrus.Errorf("error in closing udp tunnel proxy for id: %s error:%v", id, err)
63+
}
64+
}()
65+
proxy.Run()
10966
}
11067

11168
type GrpcClientRW struct {
112-
id string
113-
addr string
114-
stream api.GuestService_TunnelClient
69+
id string
70+
addr string
71+
72+
protocol string
73+
stream api.GuestService_TunnelClient
11574
}
11675

117-
var _ io.ReadWriter = (*GrpcClientRW)(nil)
76+
var _ net.Conn = (*GrpcClientRW)(nil)
11877

119-
func (g GrpcClientRW) Write(p []byte) (n int, err error) {
120-
if len(p) == 0 {
121-
return 0, nil
122-
}
78+
func (g *GrpcClientRW) Write(p []byte) (n int, err error) {
12379
err = g.stream.Send(&api.TunnelMessage{
12480
Id: g.id,
12581
GuestAddr: g.addr,
12682
Data: p,
127-
Protocol: "tcp",
83+
Protocol: g.protocol,
12884
})
12985
if err != nil {
13086
return 0, err
13187
}
13288
return len(p), nil
13389
}
13490

135-
func (g GrpcClientRW) Read(p []byte) (n int, err error) {
136-
// Not documented: when err != nil, in is always nil.
91+
func (g *GrpcClientRW) Read(p []byte) (n int, err error) {
13792
in, err := g.stream.Recv()
13893
if err != nil {
139-
if errors.Is(err, io.EOF) {
140-
return 0, nil
141-
}
14294
return 0, err
14395
}
144-
if len(in.Data) == 0 {
145-
return 0, nil
146-
}
14796
copy(p, in.Data)
14897
return len(in.Data), nil
14998
}
99+
100+
func (g *GrpcClientRW) Close() error {
101+
return g.stream.CloseSend()
102+
}
103+
104+
func (g *GrpcClientRW) LocalAddr() net.Addr {
105+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
106+
}
107+
108+
func (g *GrpcClientRW) RemoteAddr() net.Addr {
109+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
110+
}
111+
112+
func (g *GrpcClientRW) SetDeadline(_ time.Time) error {
113+
return nil
114+
}
115+
116+
func (g *GrpcClientRW) SetReadDeadline(_ time.Time) error {
117+
return nil
118+
}
119+
120+
func (g *GrpcClientRW) SetWriteDeadline(_ time.Time) error {
121+
return nil
122+
}

pkg/portfwdserver/server.go

+55-42
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,79 @@ import (
44
"errors"
55
"io"
66
"net"
7+
"time"
78

9+
"github.com/lima-vm/lima/pkg/bicopy"
810
"github.com/lima-vm/lima/pkg/guestagent/api"
911
)
1012

11-
type TunnelServer struct {
12-
Conns map[string]net.Conn
13-
}
13+
type TunnelServer struct{}
1414

1515
func NewTunnelServer() *TunnelServer {
16-
return &TunnelServer{
17-
Conns: make(map[string]net.Conn),
18-
}
16+
return &TunnelServer{}
1917
}
2018

2119
func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error {
22-
for {
23-
in, err := stream.Recv()
20+
// Receive the handshake message to start tunnel
21+
in, err := stream.Recv()
22+
if err != nil {
2423
if errors.Is(err, io.EOF) {
2524
return nil
2625
}
27-
if err != nil {
28-
return err
29-
}
30-
if len(in.Data) == 0 {
31-
continue
32-
}
26+
return err
27+
}
3328

34-
conn, ok := s.Conns[in.Id]
35-
if !ok {
36-
conn, err = net.Dial(in.Protocol, in.GuestAddr)
37-
if err != nil {
38-
return err
39-
}
40-
s.Conns[in.Id] = conn
41-
42-
writer := &GRPCServerWriter{id: in.Id, udpAddr: in.UdpTargetAddr, stream: stream}
43-
go func() {
44-
_, _ = io.Copy(writer, conn)
45-
delete(s.Conns, writer.id)
46-
}()
47-
}
48-
_, err = conn.Write(in.Data)
49-
if err != nil {
50-
return err
51-
}
29+
// We simply forward data form GRPC stream to net.Conn for both tcp and udp. So simple proxy is sufficient
30+
conn, err := net.Dial(in.Protocol, in.GuestAddr)
31+
if err != nil {
32+
return err
5233
}
34+
rw := &GRPCServerRW{stream: stream, id: in.Id}
35+
bicopy.Bicopy(rw, conn, nil)
36+
return nil
5337
}
5438

55-
type GRPCServerWriter struct {
56-
id string
57-
udpAddr string
58-
stream api.GuestService_TunnelServer
39+
type GRPCServerRW struct {
40+
id string
41+
stream api.GuestService_TunnelServer
5942
}
6043

61-
var _ io.Writer = (*GRPCServerWriter)(nil)
44+
var _ net.Conn = (*GRPCServerRW)(nil)
6245

63-
func (g GRPCServerWriter) Write(p []byte) (n int, err error) {
64-
if len(p) == 0 {
65-
return 0, nil
66-
}
67-
err = g.stream.Send(&api.TunnelMessage{Id: g.id, Data: p, UdpTargetAddr: g.udpAddr})
46+
func (g *GRPCServerRW) Write(p []byte) (n int, err error) {
47+
err = g.stream.Send(&api.TunnelMessage{Id: g.id, Data: p})
6848
return len(p), err
6949
}
50+
51+
func (g *GRPCServerRW) Read(p []byte) (n int, err error) {
52+
in, err := g.stream.Recv()
53+
if err != nil {
54+
return 0, err
55+
}
56+
copy(p, in.Data)
57+
return len(in.Data), nil
58+
}
59+
60+
func (g *GRPCServerRW) Close() error {
61+
return nil
62+
}
63+
64+
func (g *GRPCServerRW) LocalAddr() net.Addr {
65+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
66+
}
67+
68+
func (g *GRPCServerRW) RemoteAddr() net.Addr {
69+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
70+
}
71+
72+
func (g *GRPCServerRW) SetDeadline(_ time.Time) error {
73+
return nil
74+
}
75+
76+
func (g *GRPCServerRW) SetReadDeadline(_ time.Time) error {
77+
return nil
78+
}
79+
80+
func (g *GRPCServerRW) SetWriteDeadline(_ time.Time) error {
81+
return nil
82+
}

0 commit comments

Comments
 (0)