Skip to content

CP/DP Split: track agent connections #2970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent"
agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
ngxcfg "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/clientsettings"
Expand Down Expand Up @@ -180,14 +181,14 @@

nginxUpdater := agent.NewNginxUpdater(cfg.Logger.WithName("nginxUpdater"), cfg.Plus)

grpcServer := &agent.GRPCServer{
Logger: cfg.Logger.WithName("agentGRPCServer"),
RegisterServices: []func(*grpc.Server){
grpcServer := agentgrpc.NewServer(
cfg.Logger.WithName("agentGRPCServer"),
grpcServerPort,
[]func(*grpc.Server){

Check warning on line 187 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L184-L187

Added lines #L184 - L187 were not covered by tests
nginxUpdater.CommandService.Register,
nginxUpdater.FileService.Register,
},
Port: grpcServerPort,
}
)

Check warning on line 191 in internal/mode/static/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/manager.go#L191

Added line #L191 was not covered by tests

if err = mgr.Add(&runnables.LeaderOrNonLeader{Runnable: grpcServer}); err != nil {
return fmt.Errorf("cannot register grpc server: %w", err)
Expand Down
19 changes: 10 additions & 9 deletions internal/mode/static/nginx/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@
type NginxUpdaterImpl struct {
CommandService *commandService
FileService *fileService
Logger logr.Logger
Plus bool
logger logr.Logger
plus bool
}

// NewNginxUpdater returns a new NginxUpdaterImpl instance.
func NewNginxUpdater(logger logr.Logger, plus bool) *NginxUpdaterImpl {
return &NginxUpdaterImpl{
Logger: logger,
Plus: plus,
CommandService: newCommandService(),
FileService: newFileService(),
logger: logger,
plus: plus,
CommandService: newCommandService(logger.WithName("commandService")),
FileService: newFileService(logger.WithName("fileService")),

Check warning on line 31 in internal/mode/static/nginx/agent/agent.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/agent.go#L28-L31

Added lines #L28 - L31 were not covered by tests
}
}

// UpdateConfig sends the nginx configuration to the agent.
func (n *NginxUpdaterImpl) UpdateConfig(files int) {
n.Logger.Info("Sending nginx configuration to agent", "numFiles", files)
n.logger.Info("Sending nginx configuration to agent", "numFiles", files)

Check warning on line 37 in internal/mode/static/nginx/agent/agent.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/agent.go#L37

Added line #L37 was not covered by tests
}

// UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API.
// Only applicable when using NGINX Plus.
func (n *NginxUpdaterImpl) UpdateUpstreamServers() {
if !n.Plus {
if !n.plus {

Check warning on line 43 in internal/mode/static/nginx/agent/agent.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/agent.go#L43

Added line #L43 was not covered by tests
return
}

n.Logger.Info("Updating upstream servers using NGINX Plus API")
n.logger.Info("Updating upstream servers using NGINX Plus API")

Check warning on line 47 in internal/mode/static/nginx/agent/agent.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/agent.go#L47

Added line #L47 was not covered by tests
}
120 changes: 94 additions & 26 deletions internal/mode/static/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,51 @@
"fmt"
"time"

"github.com/go-logr/logr"
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"google.golang.org/grpc"

agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
)

// commandService handles the connection and subscription to the agent.
// commandService handles the connection and subscription to the data plane agent.
type commandService struct {
pb.CommandServiceServer
connTracker *agentgrpc.ConnectionsTracker
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
logger logr.Logger
}

func newCommandService() *commandService {
return &commandService{}
func newCommandService(logger logr.Logger) *commandService {
return &commandService{
logger: logger,
connTracker: agentgrpc.NewConnectionsTracker(),
}

Check warning on line 29 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L25-L29

Added lines #L25 - L29 were not covered by tests
}

func (cs *commandService) Register(server *grpc.Server) {
pb.RegisterCommandServiceServer(server, cs)
}

// CreateConnection registers a data plane agent with the control plane.
func (cs *commandService) CreateConnection(
_ context.Context,
ctx context.Context,
req *pb.CreateConnectionRequest,
) (*pb.CreateConnectionResponse, error) {
if req == nil {
return nil, errors.New("empty connection request")
}

fmt.Printf("Creating connection for nginx pod: %s\n", req.GetResource().GetContainerInfo().GetHostname())
gi, ok := grpcContext.GrpcInfoFromContext(ctx)
if !ok {
return nil, agentgrpc.ErrStatusInvalidConnection
}

Check warning on line 48 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L45-L48

Added lines #L45 - L48 were not covered by tests

podName := req.GetResource().GetContainerInfo().GetHostname()

cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", podName))
cs.connTracker.Track(gi.IPAddress, podName)

Check warning on line 53 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L50-L53

Added lines #L50 - L53 were not covered by tests

return &pb.CreateConnectionResponse{
Response: &pb.CommandResponse{
Expand All @@ -40,50 +59,99 @@
}, nil
}

// Subscribe is a decoupled communication mechanism between the data plane agent and control plane.
func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error {
fmt.Println("Received subscribe request")

ctx := in.Context()

gi, ok := grpcContext.GrpcInfoFromContext(ctx)
if !ok {
return agentgrpc.ErrStatusInvalidConnection
}

Check warning on line 69 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L66-L69

Added lines #L66 - L69 were not covered by tests

cs.logger.Info(fmt.Sprintf("Received subscribe request from %q", gi.IPAddress))

go cs.listenForDataPlaneResponse(ctx, in)

// wait for the agent to report itself
podName, err := cs.waitForConnection(ctx, gi)
if err != nil {
cs.logger.Error(err, "error waiting for connection")
return err
}

Check warning on line 80 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L71-L80

Added lines #L71 - L80 were not covered by tests

cs.logger.Info(fmt.Sprintf("Handling subscription for %s/%s", podName, gi.IPAddress))

Check warning on line 82 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L82

Added line #L82 was not covered by tests
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Minute):
dummyRequest := &pb.ManagementPlaneRequest{
Request: &pb.ManagementPlaneRequest_StatusRequest{
StatusRequest: &pb.StatusRequest{},
Request: &pb.ManagementPlaneRequest_HealthRequest{
HealthRequest: &pb.HealthRequest{},

Check warning on line 90 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L89-L90

Added lines #L89 - L90 were not covered by tests
},
}
if err := in.Send(dummyRequest); err != nil { // will likely need retry logic
fmt.Printf("ERROR: %v\n", err)
if err := in.Send(dummyRequest); err != nil { // TODO(sberman): will likely need retry logic
cs.logger.Error(err, "error sending request to agent")

Check warning on line 94 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L93-L94

Added lines #L93 - L94 were not covered by tests
}
}
}
}

func (cs *commandService) UpdateDataPlaneStatus(
_ context.Context,
req *pb.UpdateDataPlaneStatusRequest,
) (*pb.UpdateDataPlaneStatusResponse, error) {
fmt.Println("Updating data plane status")
// TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call,
// so this fails.
func (cs *commandService) waitForConnection(ctx context.Context, gi grpcContext.GrpcInfo) (string, error) {
var podName string
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

Check warning on line 105 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L102-L105

Added lines #L102 - L105 were not covered by tests

if req == nil {
return nil, errors.New("empty update data plane status request")
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-timer.C:
return "", errors.New("timed out waiting for agent connection")
case <-ticker.C:
if podName = cs.connTracker.GetConnection(gi.IPAddress); podName != "" {
return podName, nil
}

Check warning on line 119 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L107-L119

Added lines #L107 - L119 were not covered by tests
}
}
}

return &pb.UpdateDataPlaneStatusResponse{}, nil
func (cs *commandService) listenForDataPlaneResponse(ctx context.Context, in pb.CommandService_SubscribeServer) {
for {
select {
case <-ctx.Done():
return
default:
dataPlaneResponse, err := in.Recv()
cs.logger.Info(fmt.Sprintf("Received data plane response: %v", dataPlaneResponse))
if err != nil {
cs.logger.Error(err, "failed to receive data plane response")
return
}

Check warning on line 135 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L124-L135

Added lines #L124 - L135 were not covered by tests
}
}
}

// UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent.
// TODO(sberman): Is health monitoring the data planes something useful for us to do?
func (cs *commandService) UpdateDataPlaneHealth(
_ context.Context,
req *pb.UpdateDataPlaneHealthRequest,
_ *pb.UpdateDataPlaneHealthRequest,
) (*pb.UpdateDataPlaneHealthResponse, error) {
fmt.Println("Updating data plane health")

if req == nil {
return nil, errors.New("empty update dataplane health request")
}

return &pb.UpdateDataPlaneHealthResponse{}, nil
}

// UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata,
// instance metadata, or configurations. Since directly changing nginx configuration on the instance
// is not supported, this is a no-op for NGF.
func (cs *commandService) UpdateDataPlaneStatus(
_ context.Context,
_ *pb.UpdateDataPlaneStatusRequest,
) (*pb.UpdateDataPlaneStatusResponse, error) {
return &pb.UpdateDataPlaneStatusResponse{}, nil

Check warning on line 156 in internal/mode/static/nginx/agent/command.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/command.go#L155-L156

Added lines #L155 - L156 were not covered by tests
}
38 changes: 22 additions & 16 deletions internal/mode/static/nginx/agent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,65 @@
"context"
"fmt"

"github.com/go-logr/logr"
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"google.golang.org/grpc"
)

// fileService handles file management between the control plane and the agent.
type fileService struct {
pb.FileServiceServer
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
logger logr.Logger
}

func newFileService() *fileService {
return &fileService{}
func newFileService(logger logr.Logger) *fileService {
return &fileService{logger: logger}

Check warning on line 20 in internal/mode/static/nginx/agent/file.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/file.go#L19-L20

Added lines #L19 - L20 were not covered by tests
}

func (fs *fileService) Register(server *grpc.Server) {
pb.RegisterFileServiceServer(server, fs)
}

// GetOverview gets the overview of files for a particular configuration version of an instance.
// Agent calls this if it's missing an overview when a ConfigApplyRequest is called by the control plane.
func (fs *fileService) GetOverview(
_ context.Context,
_ *pb.GetOverviewRequest,
) (*pb.GetOverviewResponse, error) {
fmt.Println("Get overview request")
fs.logger.Info("Get overview request")

Check warning on line 33 in internal/mode/static/nginx/agent/file.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/file.go#L33

Added line #L33 was not covered by tests

return &pb.GetOverviewResponse{
Overview: &pb.FileOverview{},
}, nil
}

func (fs *fileService) UpdateOverview(
_ context.Context,
_ *pb.UpdateOverviewRequest,
) (*pb.UpdateOverviewResponse, error) {
fmt.Println("Update overview request")

return &pb.UpdateOverviewResponse{}, nil
}

// GetFile is called by the agent when it needs to download a file for a ConfigApplyRequest.
func (fs *fileService) GetFile(
_ context.Context,
req *pb.GetFileRequest,
) (*pb.GetFileResponse, error) {
filename := req.GetFileMeta().GetName()
hash := req.GetFileMeta().GetHash()
fmt.Printf("Getting file: %s, %s\n", filename, hash)
fs.logger.Info(fmt.Sprintf("Getting file: %s, %s", filename, hash))

Check warning on line 47 in internal/mode/static/nginx/agent/file.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/file.go#L47

Added line #L47 was not covered by tests

return &pb.GetFileResponse{}, nil
}

// UpdateOverview is called by agent on startup and whenever any files change on the instance.
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
func (fs *fileService) UpdateOverview(
_ context.Context,
_ *pb.UpdateOverviewRequest,
) (*pb.UpdateOverviewResponse, error) {
return &pb.UpdateOverviewResponse{}, nil

Check warning on line 58 in internal/mode/static/nginx/agent/file.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/nginx/agent/file.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

// UpdateFile is called by agent whenever any files change on the instance.
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
func (fs *fileService) UpdateFile(
_ context.Context,
req *pb.UpdateFileRequest,
_ *pb.UpdateFileRequest,
) (*pb.UpdateFileResponse, error) {
fmt.Println("Update file request for: ", req.GetFile().GetFileMeta().GetName())

return &pb.UpdateFileResponse{}, nil
}
59 changes: 0 additions & 59 deletions internal/mode/static/nginx/agent/grpc.go

This file was deleted.

Loading
Loading