Skip to content

CP/DP Split: track agent connections #2970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent"
agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
ngxcfg "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/clientsettings"
Expand Down Expand Up @@ -180,14 +181,14 @@ func StartManager(cfg config.Config) error {

nginxUpdater := agent.NewNginxUpdater(cfg.Logger.WithName("nginxUpdater"), cfg.Plus)

grpcServer := &agent.GRPCServer{
Logger: cfg.Logger.WithName("agentGRPCServer"),
RegisterServices: []func(*grpc.Server){
grpcServer := agentgrpc.NewServer(
cfg.Logger.WithName("agentGRPCServer"),
grpcServerPort,
[]func(*grpc.Server){
nginxUpdater.CommandService.Register,
nginxUpdater.FileService.Register,
},
Port: grpcServerPort,
}
)

if err = mgr.Add(&runnables.LeaderOrNonLeader{Runnable: grpcServer}); err != nil {
return fmt.Errorf("cannot register grpc server: %w", err)
Expand Down
19 changes: 10 additions & 9 deletions internal/mode/static/nginx/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@ type NginxUpdater interface {
type NginxUpdaterImpl struct {
CommandService *commandService
FileService *fileService
Logger logr.Logger
Plus bool
logger logr.Logger
plus bool
}

// NewNginxUpdater returns a new NginxUpdaterImpl instance.
func NewNginxUpdater(logger logr.Logger, plus bool) *NginxUpdaterImpl {
return &NginxUpdaterImpl{
Logger: logger,
Plus: plus,
CommandService: newCommandService(),
FileService: newFileService(),
logger: logger,
plus: plus,
CommandService: newCommandService(logger.WithName("commandService")),
FileService: newFileService(logger.WithName("fileService")),
}
}

// UpdateConfig sends the nginx configuration to the agent.
func (n *NginxUpdaterImpl) UpdateConfig(files int) {
n.Logger.Info("Sending nginx configuration to agent", "numFiles", files)
n.logger.Info("Sending nginx configuration to agent", "numFiles", files)
}

// UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API.
// Only applicable when using NGINX Plus.
func (n *NginxUpdaterImpl) UpdateUpstreamServers() {
if !n.Plus {
if !n.plus {
return
}

n.Logger.Info("Updating upstream servers using NGINX Plus API")
n.logger.Info("Updating upstream servers using NGINX Plus API")
}
120 changes: 94 additions & 26 deletions internal/mode/static/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,51 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"google.golang.org/grpc"

agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
)

// commandService handles the connection and subscription to the agent.
// commandService handles the connection and subscription to the data plane agent.
type commandService struct {
pb.CommandServiceServer
connTracker *agentgrpc.ConnectionsTracker
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
logger logr.Logger
}

func newCommandService() *commandService {
return &commandService{}
func newCommandService(logger logr.Logger) *commandService {
return &commandService{
logger: logger,
connTracker: agentgrpc.NewConnectionsTracker(),
}
}

func (cs *commandService) Register(server *grpc.Server) {
pb.RegisterCommandServiceServer(server, cs)
}

// CreateConnection registers a data plane agent with the control plane.
func (cs *commandService) CreateConnection(
_ context.Context,
ctx context.Context,
req *pb.CreateConnectionRequest,
) (*pb.CreateConnectionResponse, error) {
if req == nil {
return nil, errors.New("empty connection request")
}

fmt.Printf("Creating connection for nginx pod: %s\n", req.GetResource().GetContainerInfo().GetHostname())
gi, ok := grpcContext.GrpcInfoFromContext(ctx)
if !ok {
return nil, agentgrpc.ErrStatusInvalidConnection
}

podName := req.GetResource().GetContainerInfo().GetHostname()

cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", podName))
cs.connTracker.Track(gi.IPAddress, podName)

return &pb.CreateConnectionResponse{
Response: &pb.CommandResponse{
Expand All @@ -40,50 +59,99 @@ func (cs *commandService) CreateConnection(
}, nil
}

// Subscribe is a decoupled communication mechanism between the data plane agent and control plane.
func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error {
fmt.Println("Received subscribe request")

ctx := in.Context()

gi, ok := grpcContext.GrpcInfoFromContext(ctx)
if !ok {
return agentgrpc.ErrStatusInvalidConnection
}

cs.logger.Info(fmt.Sprintf("Received subscribe request from %q", gi.IPAddress))

go cs.listenForDataPlaneResponse(ctx, in)

// wait for the agent to report itself
podName, err := cs.waitForConnection(ctx, gi)
if err != nil {
cs.logger.Error(err, "error waiting for connection")
return err
}

cs.logger.Info(fmt.Sprintf("Handling subscription for %s/%s", podName, gi.IPAddress))
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Minute):
dummyRequest := &pb.ManagementPlaneRequest{
Request: &pb.ManagementPlaneRequest_StatusRequest{
StatusRequest: &pb.StatusRequest{},
Request: &pb.ManagementPlaneRequest_HealthRequest{
HealthRequest: &pb.HealthRequest{},
},
}
if err := in.Send(dummyRequest); err != nil { // will likely need retry logic
fmt.Printf("ERROR: %v\n", err)
if err := in.Send(dummyRequest); err != nil { // TODO(sberman): will likely need retry logic
cs.logger.Error(err, "error sending request to agent")
}
}
}
}

func (cs *commandService) UpdateDataPlaneStatus(
_ context.Context,
req *pb.UpdateDataPlaneStatusRequest,
) (*pb.UpdateDataPlaneStatusResponse, error) {
fmt.Println("Updating data plane status")
// TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call,
// so this fails.
func (cs *commandService) waitForConnection(ctx context.Context, gi grpcContext.GrpcInfo) (string, error) {
var podName string
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

if req == nil {
return nil, errors.New("empty update data plane status request")
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-timer.C:
return "", errors.New("timed out waiting for agent connection")
case <-ticker.C:
if podName = cs.connTracker.GetConnection(gi.IPAddress); podName != "" {
return podName, nil
}
}
}
}

return &pb.UpdateDataPlaneStatusResponse{}, nil
func (cs *commandService) listenForDataPlaneResponse(ctx context.Context, in pb.CommandService_SubscribeServer) {
for {
select {
case <-ctx.Done():
return
default:
dataPlaneResponse, err := in.Recv()
cs.logger.Info(fmt.Sprintf("Received data plane response: %v", dataPlaneResponse))
if err != nil {
cs.logger.Error(err, "failed to receive data plane response")
return
}
}
}
}

// UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent.
// TODO(sberman): Is health monitoring the data planes something useful for us to do?
func (cs *commandService) UpdateDataPlaneHealth(
_ context.Context,
req *pb.UpdateDataPlaneHealthRequest,
_ *pb.UpdateDataPlaneHealthRequest,
) (*pb.UpdateDataPlaneHealthResponse, error) {
fmt.Println("Updating data plane health")

if req == nil {
return nil, errors.New("empty update dataplane health request")
}

return &pb.UpdateDataPlaneHealthResponse{}, nil
}

// UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata,
// instance metadata, or configurations. Since directly changing nginx configuration on the instance
// is not supported, this is a no-op for NGF.
func (cs *commandService) UpdateDataPlaneStatus(
_ context.Context,
_ *pb.UpdateDataPlaneStatusRequest,
) (*pb.UpdateDataPlaneStatusResponse, error) {
return &pb.UpdateDataPlaneStatusResponse{}, nil
}
38 changes: 22 additions & 16 deletions internal/mode/static/nginx/agent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,65 @@ import (
"context"
"fmt"

"github.com/go-logr/logr"
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"google.golang.org/grpc"
)

// fileService handles file management between the control plane and the agent.
type fileService struct {
pb.FileServiceServer
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
logger logr.Logger
}

func newFileService() *fileService {
return &fileService{}
func newFileService(logger logr.Logger) *fileService {
return &fileService{logger: logger}
}

func (fs *fileService) Register(server *grpc.Server) {
pb.RegisterFileServiceServer(server, fs)
}

// GetOverview gets the overview of files for a particular configuration version of an instance.
// Agent calls this if it's missing an overview when a ConfigApplyRequest is called by the control plane.
func (fs *fileService) GetOverview(
_ context.Context,
_ *pb.GetOverviewRequest,
) (*pb.GetOverviewResponse, error) {
fmt.Println("Get overview request")
fs.logger.Info("Get overview request")

return &pb.GetOverviewResponse{
Overview: &pb.FileOverview{},
}, nil
}

func (fs *fileService) UpdateOverview(
_ context.Context,
_ *pb.UpdateOverviewRequest,
) (*pb.UpdateOverviewResponse, error) {
fmt.Println("Update overview request")

return &pb.UpdateOverviewResponse{}, nil
}

// GetFile is called by the agent when it needs to download a file for a ConfigApplyRequest.
func (fs *fileService) GetFile(
_ context.Context,
req *pb.GetFileRequest,
) (*pb.GetFileResponse, error) {
filename := req.GetFileMeta().GetName()
hash := req.GetFileMeta().GetHash()
fmt.Printf("Getting file: %s, %s\n", filename, hash)
fs.logger.Info(fmt.Sprintf("Getting file: %s, %s", filename, hash))

return &pb.GetFileResponse{}, nil
}

// UpdateOverview is called by agent on startup and whenever any files change on the instance.
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
func (fs *fileService) UpdateOverview(
_ context.Context,
_ *pb.UpdateOverviewRequest,
) (*pb.UpdateOverviewResponse, error) {
return &pb.UpdateOverviewResponse{}, nil
}

// UpdateFile is called by agent whenever any files change on the instance.
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
func (fs *fileService) UpdateFile(
_ context.Context,
req *pb.UpdateFileRequest,
_ *pb.UpdateFileRequest,
) (*pb.UpdateFileResponse, error) {
fmt.Println("Update file request for: ", req.GetFile().GetFileMeta().GetName())

return &pb.UpdateFileResponse{}, nil
}
59 changes: 0 additions & 59 deletions internal/mode/static/nginx/agent/grpc.go

This file was deleted.

Loading
Loading