Skip to content

Commit 2fe19c3

Browse files
kate-osbornKate Osborn
authored and
Kate Osborn
committed
Add skeleton for gRPC Commander Service (#408)
1 parent 065b611 commit 2fe19c3

File tree

9 files changed

+344
-9
lines changed

9 files changed

+344
-9
lines changed

build/nginx-with-agent/nginx-agent.conf

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ dataplane:
2828
metrics:
2929
# specify the size of a buffer to build before sending metrics
3030
bulk_size: 20
31-
# specify metrics poll interval
32-
report_interval: 1m
31+
# setting the report interval to 1 year because our control plane doesn't implement the metrics server
32+
# so the agent spams the logs with error messages and retries. Ideally, we'll be able to disable the metrics
33+
# client altogether in the future.
34+
report_interval: 8760h
3335
collection_interval: 15s
3436
mode: aggregated
3537

@@ -40,3 +42,12 @@ config_dirs: "/etc/nginx"
4042
api:
4143
# default port for Agent API, this is for the server configuration of the REST API
4244
port: 8081
45+
46+
server:
47+
host: 127.0.0.1
48+
grpcPort: 54789
49+
50+
# TLS is temporarily disabled. Once we fully separate the data plane from the control plane TLS will be enabled.
51+
tls:
52+
enable: false
53+
skip_verify: true

build/nginx-with-agent/nginx-with-agent.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ spec:
2020
imagePullPolicy: IfNotPresent
2121
name: nginx-with-agent
2222
securityContext:
23-
allowPrivilegeEscalation: true
2423
runAsNonRoot: true
2524
runAsUser: 101 #nginx
2625
capabilities:

deploy/manifests/nginx-gateway.yaml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ spec:
116116
args:
117117
- --gateway-ctlr-name=k8s-gateway.nginx.org/nginx-gateway-controller
118118
- --gatewayclass=nginx
119-
- image: nginx:1.23
119+
- image: nginx:1.23 # I will remove the nginx container once the control plane can push config to nginx-with-agent.
120120
imagePullPolicy: IfNotPresent
121121
name: nginx
122122
ports:
@@ -131,3 +131,17 @@ spec:
131131
mountPath: /var/lib/nginx
132132
- name: njs-modules
133133
mountPath: /usr/lib/nginx/modules/njs
134+
- name: nginx-with-agent
135+
image: docker.io/nginx-kubernetes-gateway/nginx-with-agent:edge
136+
imagePullPolicy: IfNotPresent
137+
securityContext:
138+
runAsNonRoot: true
139+
runAsUser: 101 #nginx
140+
capabilities:
141+
drop:
142+
- ALL
143+
ports:
144+
- name: http
145+
containerPort: 8080
146+
- name: https
147+
containerPort: 8443

go.mod

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ require (
66
github.com/go-logr/logr v1.2.3
77
github.com/google/go-cmp v0.5.9
88
github.com/maxbrunsfeld/counterfeiter/v6 v6.6.1
9+
github.com/nginx/agent/sdk/v2 v2.22.1
910
github.com/onsi/ginkgo v1.16.5
1011
github.com/onsi/ginkgo/v2 v2.8.4
1112
github.com/onsi/gomega v1.27.1
1213
github.com/spf13/pflag v1.0.5
14+
google.golang.org/grpc v1.51.0
1315
k8s.io/api v0.26.1
1416
k8s.io/apimachinery v0.26.1
1517
k8s.io/client-go v0.26.1
@@ -19,6 +21,7 @@ require (
1921

2022
require (
2123
github.com/beorn7/perks v1.0.1 // indirect
24+
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
2225
github.com/cespare/xxhash/v2 v2.1.2 // indirect
2326
github.com/davecgh/go-spew v1.1.1 // indirect
2427
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
@@ -37,6 +40,7 @@ require (
3740
github.com/google/gofuzz v1.1.0 // indirect
3841
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
3942
github.com/google/uuid v1.1.2 // indirect
43+
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
4044
github.com/imdario/mergo v0.3.12 // indirect
4145
github.com/josharian/intern v1.0.0 // indirect
4246
github.com/json-iterator/go v1.1.12 // indirect
@@ -45,12 +49,15 @@ require (
4549
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4650
github.com/modern-go/reflect2 v1.0.2 // indirect
4751
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
52+
github.com/nginxinc/nginx-go-crossplane v0.4.1 // indirect
4853
github.com/nxadm/tail v1.4.8 // indirect
4954
github.com/pkg/errors v0.9.1 // indirect
5055
github.com/prometheus/client_golang v1.14.0 // indirect
5156
github.com/prometheus/client_model v0.3.0 // indirect
5257
github.com/prometheus/common v0.37.0 // indirect
5358
github.com/prometheus/procfs v0.8.0 // indirect
59+
github.com/rogpeppe/go-internal v1.9.0 // indirect
60+
github.com/sirupsen/logrus v1.9.0 // indirect
5461
go.uber.org/atomic v1.9.0 // indirect
5562
go.uber.org/multierr v1.7.0 // indirect
5663
go.uber.org/zap v1.24.0 // indirect
@@ -64,7 +71,8 @@ require (
6471
golang.org/x/tools v0.6.0 // indirect
6572
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
6673
google.golang.org/appengine v1.6.7 // indirect
67-
google.golang.org/protobuf v1.28.1 // indirect
74+
google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 // indirect
75+
google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 // indirect
6876
gopkg.in/inf.v0 v0.9.1 // indirect
6977
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
7078
gopkg.in/yaml.v2 v2.4.0 // indirect

go.sum

Lines changed: 45 additions & 3 deletions
Large diffs are not rendered by default.

internal/grpc/server.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"net"
6+
7+
"github.com/go-logr/logr"
8+
sdkGrpc "github.com/nginx/agent/sdk/v2/grpc"
9+
"github.com/nginx/agent/sdk/v2/proto"
10+
"google.golang.org/grpc"
11+
)
12+
13+
const protocol = "tcp"
14+
15+
// Server is the gRPC server that handles requests from nginx agents.
16+
type Server struct {
17+
listener net.Listener
18+
server *grpc.Server
19+
logger logr.Logger
20+
}
21+
22+
// NewServer accepts a logger, address, and CommandServer implementation. It creates a gRPC server listening on the
23+
// given address and registers the CommandServer implementation with the gRPC server.
24+
func NewServer(logger logr.Logger, address string, commander proto.CommanderServer) (*Server, error) {
25+
listener, err := net.Listen(protocol, address)
26+
if err != nil {
27+
return nil, err
28+
}
29+
30+
grpcServer := grpc.NewServer(sdkGrpc.DefaultServerDialOptions...)
31+
32+
proto.RegisterCommanderServer(grpcServer, commander)
33+
34+
s := &Server{
35+
logger: logger,
36+
listener: listener,
37+
server: grpcServer,
38+
}
39+
40+
return s, nil
41+
}
42+
43+
// Addr returns the address that the server is listening on.
44+
func (s *Server) Addr() string {
45+
return s.listener.Addr().String()
46+
}
47+
48+
// Start starts the gRPC server. If the context is canceled, the server is stopped.
49+
func (s *Server) Start(ctx context.Context) error {
50+
go func() {
51+
<-ctx.Done()
52+
53+
s.server.GracefulStop()
54+
s.logger.Info("gRPC server stopped")
55+
}()
56+
57+
s.logger.Info("Starting gRPC Server", "addr", s.listener.Addr().String())
58+
return s.server.Serve(s.listener)
59+
}

internal/grpc/server_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package grpc_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/nginx/agent/sdk/v2/client"
8+
. "github.com/onsi/gomega"
9+
"github.com/onsi/gomega/gbytes"
10+
goGrpc "google.golang.org/grpc"
11+
"google.golang.org/grpc/credentials/insecure"
12+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
13+
14+
"github.com/nginxinc/nginx-kubernetes-gateway/internal/grpc"
15+
"github.com/nginxinc/nginx-kubernetes-gateway/internal/grpc/service"
16+
)
17+
18+
// This test is pretty simple at the moment. We are only verifying that the server can be started, stopped,
19+
// and that the Commander implementation is registered with the server.
20+
// Once we add more functionality this test may become more meaningful.
21+
func TestServer(t *testing.T) {
22+
g := NewGomegaWithT(t)
23+
24+
buf := gbytes.NewBuffer()
25+
logger := zap.New(zap.WriteTo(buf))
26+
27+
server, err := grpc.NewServer(logger, "localhost:0", service.NewCommander(logger))
28+
g.Expect(err).To(BeNil())
29+
g.Expect(server).ToNot(BeNil())
30+
31+
ctx, cancel := context.WithCancel(context.Background())
32+
33+
go func() {
34+
g.Expect(server.Start(ctx)).To(Succeed())
35+
}()
36+
37+
commanderClient := client.NewCommanderClient()
38+
commanderClient.WithServer(server.Addr())
39+
commanderClient.WithDialOptions(goGrpc.WithTransportCredentials(insecure.NewCredentials()))
40+
41+
err = commanderClient.Connect(ctx)
42+
g.Expect(err).To(BeNil())
43+
44+
g.Eventually(buf).Should(gbytes.Say("Commander CommandChannel"))
45+
46+
cancel()
47+
g.Eventually(buf).Should(gbytes.Say("gRPC server stopped"))
48+
}

internal/grpc/service/commander.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package service
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
8+
"github.com/go-logr/logr"
9+
"github.com/nginx/agent/sdk/v2/grpc"
10+
"github.com/nginx/agent/sdk/v2/proto"
11+
)
12+
13+
// Commander implements the proto.CommanderServer interface.
14+
// This code is for demo purposes only. It's the least amount of code I could write to demonstrate that the agent and
15+
// control plane can communicate with each other. It is not the final version, so it isn't tested or commented.
16+
// Code is a version of https://github.com/nginx/agent/blob/main/sdk/examples/services/command_service.go
17+
type Commander struct {
18+
toClient chan *proto.Command
19+
logger logr.Logger
20+
}
21+
22+
func NewCommander(logger logr.Logger) *Commander {
23+
return &Commander{
24+
toClient: make(chan *proto.Command),
25+
logger: logger,
26+
}
27+
}
28+
29+
func (c *Commander) CommandChannel(stream proto.Commander_CommandChannelServer) error {
30+
c.logger.Info("Commander CommandChannel")
31+
32+
go c.handleReceive(stream)
33+
34+
for {
35+
select {
36+
case out := <-c.toClient:
37+
err := stream.Send(out)
38+
if errors.Is(err, io.EOF) {
39+
c.logger.Info("CommandChannel EOF")
40+
return nil
41+
}
42+
if err != nil {
43+
c.logger.Error(err, "failed to send outgoing command")
44+
continue
45+
}
46+
case <-stream.Context().Done():
47+
c.logger.Info("CommandChannel complete")
48+
return nil
49+
}
50+
}
51+
}
52+
53+
func (c *Commander) Download(request *proto.DownloadRequest, _ proto.Commander_DownloadServer) error {
54+
c.logger.Info("Commander Download requested", "request", request.GetMeta())
55+
56+
return nil
57+
}
58+
59+
func (c *Commander) Upload(upload proto.Commander_UploadServer) error {
60+
c.logger.Info("Commander Upload requested")
61+
62+
for {
63+
chunk, err := upload.Recv()
64+
65+
if err != nil && !errors.Is(err, io.EOF) {
66+
c.logger.Error(err, "upload receive error")
67+
return err
68+
}
69+
70+
c.logger.Info("Received chunk from upload channel", "chunk", chunk)
71+
72+
if errors.Is(err, io.EOF) {
73+
c.logger.Info("Commander Upload completed")
74+
return upload.SendAndClose(&proto.UploadStatus{Status: proto.UploadStatus_OK})
75+
}
76+
}
77+
}
78+
79+
func (c *Commander) handleReceive(server proto.Commander_CommandChannelServer) {
80+
for {
81+
cmd, err := server.Recv()
82+
if err != nil {
83+
c.logger.Error(err, "failed to receive command from CommandChannelServer")
84+
return
85+
}
86+
87+
c.handleCommand(cmd)
88+
}
89+
}
90+
91+
func (c *Commander) handleCommand(cmd *proto.Command) {
92+
if cmd != nil {
93+
switch commandData := cmd.Data.(type) {
94+
// The only command we care about right now is the AgentConnectRequest.
95+
case *proto.Command_AgentConnectRequest:
96+
c.logger.Info("Received a connection request from an agent", "data", commandData.AgentConnectRequest.GetMeta())
97+
c.sendAgentConnectResponse(cmd)
98+
default:
99+
c.logger.Info("ignoring command", "command data type", fmt.Sprintf("%T", cmd.Data))
100+
}
101+
}
102+
}
103+
104+
func (c *Commander) sendAgentConnectResponse(cmd *proto.Command) {
105+
// get first nginx id for example
106+
nginxID := "0"
107+
if len(cmd.GetAgentConnectRequest().GetDetails()) > 0 {
108+
nginxID = cmd.GetAgentConnectRequest().GetDetails()[0].GetNginxId()
109+
}
110+
response := &proto.Command{
111+
Data: &proto.Command_AgentConnectResponse{
112+
AgentConnectResponse: &proto.AgentConnectResponse{
113+
AgentConfig: &proto.AgentConfig{
114+
Configs: &proto.ConfigReport{
115+
Meta: grpc.NewMessageMeta(cmd.Meta.MessageId),
116+
Configs: []*proto.ConfigDescriptor{
117+
{
118+
Checksum: "",
119+
NginxId: nginxID,
120+
SystemId: cmd.GetAgentConnectRequest().GetMeta().GetSystemUid(),
121+
},
122+
},
123+
},
124+
},
125+
Status: &proto.AgentConnectStatus{
126+
StatusCode: proto.AgentConnectStatus_CONNECT_OK,
127+
Message: "Connected",
128+
},
129+
},
130+
},
131+
Meta: grpc.NewMessageMeta(cmd.Meta.MessageId),
132+
Type: proto.Command_NORMAL,
133+
}
134+
135+
c.toClient <- response
136+
}

0 commit comments

Comments
 (0)