Skip to content

Add skeleton for gRPC Commander Service #408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions build/nginx-with-agent/nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ dataplane:
metrics:
# specify the size of a buffer to build before sending metrics
bulk_size: 20
# specify metrics poll interval
report_interval: 1m
# setting the report interval to 1 year because our control plane doesn't implement the metrics server
# so the agent spams the logs with error messages and retries. Ideally, we'll be able to disable the metrics
# client altogether in the future.
report_interval: 8760h
collection_interval: 15s
mode: aggregated

Expand All @@ -40,3 +42,12 @@ config_dirs: "/etc/nginx"
api:
# default port for Agent API, this is for the server configuration of the REST API
port: 8081

server:
host: 127.0.0.1
grpcPort: 54789

# TLS is temporarily disabled. Once we fully separate the data plane from the control plane TLS will be enabled.
tls:
enable: false
skip_verify: true
1 change: 0 additions & 1 deletion build/nginx-with-agent/nginx-with-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ spec:
imagePullPolicy: IfNotPresent
name: nginx-with-agent
securityContext:
allowPrivilegeEscalation: true
runAsNonRoot: true
runAsUser: 101 #nginx
capabilities:
Expand Down
16 changes: 15 additions & 1 deletion deploy/manifests/nginx-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ spec:
args:
- --gateway-ctlr-name=k8s-gateway.nginx.org/nginx-gateway-controller
- --gatewayclass=nginx
- image: nginx:1.23
- image: nginx:1.23 # I will remove the nginx container once the control plane can push config to nginx-with-agent.
imagePullPolicy: IfNotPresent
name: nginx
ports:
Expand All @@ -131,3 +131,17 @@ spec:
mountPath: /var/lib/nginx
- name: njs-modules
mountPath: /usr/lib/nginx/modules/njs
- name: nginx-with-agent
image: docker.io/nginx-kubernetes-gateway/nginx-with-agent:edge
imagePullPolicy: IfNotPresent
securityContext:
runAsNonRoot: true
runAsUser: 101 #nginx
capabilities:
drop:
- ALL
ports:
- name: http
containerPort: 8080
- name: https
containerPort: 8443
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ require (
github.com/go-logr/logr v1.2.3
github.com/google/go-cmp v0.5.9
github.com/maxbrunsfeld/counterfeiter/v6 v6.5.0
github.com/nginx/agent/sdk/v2 v2.22.1
github.com/onsi/ginkgo v1.16.5
github.com/onsi/ginkgo/v2 v2.7.0
github.com/onsi/gomega v1.26.0
github.com/spf13/pflag v1.0.5
google.golang.org/grpc v1.51.0
k8s.io/api v0.26.1
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
Expand All @@ -19,6 +21,7 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
Expand All @@ -35,6 +38,7 @@ require (
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -43,12 +47,15 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nginxinc/nginx-go-crossplane v0.4.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.24.0 // indirect
Expand All @@ -62,7 +69,8 @@ require (
golang.org/x/tools v0.4.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 // indirect
google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
47 changes: 44 additions & 3 deletions go.sum

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions internal/grpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package grpc

import (
"context"
"net"

"github.com/go-logr/logr"
sdkGrpc "github.com/nginx/agent/sdk/v2/grpc"
"github.com/nginx/agent/sdk/v2/proto"
"google.golang.org/grpc"
)

const protocol = "tcp"

// Server is the gRPC server that handles requests from nginx agents.
type Server struct {
listener net.Listener
server *grpc.Server
logger logr.Logger
}

// NewServer accepts a logger, address, and CommandServer implementation. It creates a gRPC server listening on the
// given address and registers the CommandServer implementation with the gRPC server.
func NewServer(logger logr.Logger, address string, commander proto.CommanderServer) (*Server, error) {
listener, err := net.Listen(protocol, address)
if err != nil {
return nil, err
}

grpcServer := grpc.NewServer(sdkGrpc.DefaultServerDialOptions...)

proto.RegisterCommanderServer(grpcServer, commander)

s := &Server{
logger: logger,
listener: listener,
server: grpcServer,
}

return s, nil
}

// Addr returns the address that the server is listening on.
func (s *Server) Addr() string {
return s.listener.Addr().String()
}

// Start starts the gRPC server. If the context is canceled, the server is stopped.
func (s *Server) Start(ctx context.Context) error {
go func() {
<-ctx.Done()

s.server.GracefulStop()
s.logger.Info("gRPC server stopped")
}()

s.logger.Info("Starting gRPC Server", "addr", s.listener.Addr().String())
return s.server.Serve(s.listener)
}
48 changes: 48 additions & 0 deletions internal/grpc/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package grpc_test

import (
"context"
"testing"

"github.com/nginx/agent/sdk/v2/client"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
goGrpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/nginxinc/nginx-kubernetes-gateway/internal/grpc"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/grpc/service"
)

// This test is pretty simple at the moment. We are only verifying that the server can be started, stopped,
// and that the Commander implementation is registered with the server.
// Once we add more functionality this test may become more meaningful.
func TestServer(t *testing.T) {
g := NewGomegaWithT(t)

buf := gbytes.NewBuffer()
logger := zap.New(zap.WriteTo(buf))

server, err := grpc.NewServer(logger, "localhost:0", service.NewCommander(logger))
g.Expect(err).To(BeNil())
g.Expect(server).ToNot(BeNil())

ctx, cancel := context.WithCancel(context.Background())

go func() {
g.Expect(server.Start(ctx)).To(Succeed())
}()

commanderClient := client.NewCommanderClient()
commanderClient.WithServer(server.Addr())
commanderClient.WithDialOptions(goGrpc.WithTransportCredentials(insecure.NewCredentials()))

err = commanderClient.Connect(ctx)
g.Expect(err).To(BeNil())

g.Eventually(buf).Should(gbytes.Say("Commander CommandChannel"))

cancel()
g.Eventually(buf).Should(gbytes.Say("gRPC server stopped"))
}
136 changes: 136 additions & 0 deletions internal/grpc/service/commander.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package service

import (
"errors"
"fmt"
"io"

"github.com/go-logr/logr"
"github.com/nginx/agent/sdk/v2/grpc"
"github.com/nginx/agent/sdk/v2/proto"
)

// Commander implements the proto.CommanderServer interface.
// This code is for demo purposes only. It's the least amount of code I could write to demonstrate that the agent and
// control plane can communicate with each other. It is not the final version, so it isn't tested or commented.
// Code is a version of https://github.com/nginx/agent/blob/main/sdk/examples/services/command_service.go
type Commander struct {
toClient chan *proto.Command
logger logr.Logger
}

func NewCommander(logger logr.Logger) *Commander {
return &Commander{
toClient: make(chan *proto.Command),
logger: logger,
}
}

func (c *Commander) CommandChannel(stream proto.Commander_CommandChannelServer) error {
c.logger.Info("Commander CommandChannel")

go c.handleReceive(stream)

for {
select {
case out := <-c.toClient:
err := stream.Send(out)
if errors.Is(err, io.EOF) {
c.logger.Info("CommandChannel EOF")
return nil
}
if err != nil {
c.logger.Error(err, "failed to send outgoing command")
continue
}
case <-stream.Context().Done():
c.logger.Info("CommandChannel complete")
return nil
}
}
}

func (c *Commander) Download(request *proto.DownloadRequest, _ proto.Commander_DownloadServer) error {
c.logger.Info("Commander Download requested", "request", request.GetMeta())

return nil
}

func (c *Commander) Upload(upload proto.Commander_UploadServer) error {
c.logger.Info("Commander Upload requested")

for {
chunk, err := upload.Recv()

if err != nil && !errors.Is(err, io.EOF) {
c.logger.Error(err, "upload receive error")
return err
}

c.logger.Info("Received chunk from upload channel", "chunk", chunk)

if errors.Is(err, io.EOF) {
c.logger.Info("Commander Upload completed")
return upload.SendAndClose(&proto.UploadStatus{Status: proto.UploadStatus_OK})
}
}
}

func (c *Commander) handleReceive(server proto.Commander_CommandChannelServer) {
for {
cmd, err := server.Recv()
if err != nil {
c.logger.Error(err, "failed to receive command from CommandChannelServer")
return
}

c.handleCommand(cmd)
}
}

func (c *Commander) handleCommand(cmd *proto.Command) {
if cmd != nil {
switch commandData := cmd.Data.(type) {
// The only command we care about right now is the AgentConnectRequest.
case *proto.Command_AgentConnectRequest:
c.logger.Info("Received a connection request from an agent", "data", commandData.AgentConnectRequest.GetMeta())
c.sendAgentConnectResponse(cmd)
default:
c.logger.Info("ignoring command", "command data type", fmt.Sprintf("%T", cmd.Data))
}
}
}

func (c *Commander) sendAgentConnectResponse(cmd *proto.Command) {
// get first nginx id for example
nginxID := "0"
if len(cmd.GetAgentConnectRequest().GetDetails()) > 0 {
nginxID = cmd.GetAgentConnectRequest().GetDetails()[0].GetNginxId()
}
response := &proto.Command{
Data: &proto.Command_AgentConnectResponse{
AgentConnectResponse: &proto.AgentConnectResponse{
AgentConfig: &proto.AgentConfig{
Configs: &proto.ConfigReport{
Meta: grpc.NewMessageMeta(cmd.Meta.MessageId),
Configs: []*proto.ConfigDescriptor{
{
Checksum: "",
NginxId: nginxID,
SystemId: cmd.GetAgentConnectRequest().GetMeta().GetSystemUid(),
},
},
},
},
Status: &proto.AgentConnectStatus{
StatusCode: proto.AgentConnectStatus_CONNECT_OK,
Message: "Connected",
},
},
},
Meta: grpc.NewMessageMeta(cmd.Meta.MessageId),
Type: proto.Command_NORMAL,
}

c.toClient <- response
}
Loading