From af9de7f81ed1bdcfdef7d8012df53d900520fb81 Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Thu, 2 Jan 2025 10:34:39 -0700 Subject: [PATCH 1/4] CP/DP Split: track agent connections Added the following: - middleware to extract IP address of agent and store it in the grpc context - link the agent's hostname to its IP address when connecting and track it - use this linkage to pause the Subscription until the agent registers itself, then proceeding This logic is subject to change as we enhance this (like tracking auth token instead of IP address). --- internal/mode/static/manager.go | 11 +- internal/mode/static/nginx/agent/agent.go | 19 +-- internal/mode/static/nginx/agent/command.go | 120 ++++++++++++++---- internal/mode/static/nginx/agent/file.go | 38 +++--- internal/mode/static/nginx/agent/grpc.go | 59 --------- .../static/nginx/agent/grpc/connections.go | 54 ++++++++ .../nginx/agent/grpc/context/context.go | 24 ++++ .../static/nginx/agent/grpc/context/doc.go | 4 + internal/mode/static/nginx/agent/grpc/doc.go | 4 + internal/mode/static/nginx/agent/grpc/grpc.go | 91 +++++++++++++ .../nginx/agent/grpc/interceptor/doc.go | 4 + .../agent/grpc/interceptor/interceptor.go | 83 ++++++++++++ 12 files changed, 396 insertions(+), 115 deletions(-) delete mode 100644 internal/mode/static/nginx/agent/grpc.go create mode 100644 internal/mode/static/nginx/agent/grpc/connections.go create mode 100644 internal/mode/static/nginx/agent/grpc/context/context.go create mode 100644 internal/mode/static/nginx/agent/grpc/context/doc.go create mode 100644 internal/mode/static/nginx/agent/grpc/doc.go create mode 100644 internal/mode/static/nginx/agent/grpc/grpc.go create mode 100644 internal/mode/static/nginx/agent/grpc/interceptor/doc.go create mode 100644 internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 0c72745345..b6da949c52 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -50,6 +50,7 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent" + agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" ngxcfg "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/clientsettings" @@ -180,14 +181,14 @@ func StartManager(cfg config.Config) error { nginxUpdater := agent.NewNginxUpdater(cfg.Logger.WithName("nginxUpdater"), cfg.Plus) - grpcServer := &agent.GRPCServer{ - Logger: cfg.Logger.WithName("agentGRPCServer"), - RegisterServices: []func(*grpc.Server){ + grpcServer := agentgrpc.NewServer( + cfg.Logger.WithName("agentGRPCServer"), + grpcServerPort, + []func(*grpc.Server){ nginxUpdater.CommandService.Register, nginxUpdater.FileService.Register, }, - Port: grpcServerPort, - } + ) if err = mgr.Add(&runnables.LeaderOrNonLeader{Runnable: grpcServer}); err != nil { return fmt.Errorf("cannot register grpc server: %w", err) diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go index c6955040cb..1ce5d21b0b 100644 --- a/internal/mode/static/nginx/agent/agent.go +++ b/internal/mode/static/nginx/agent/agent.go @@ -18,30 +18,31 @@ type NginxUpdater interface { type NginxUpdaterImpl struct { CommandService *commandService FileService *fileService - Logger logr.Logger - Plus bool + logger logr.Logger + plus bool } +// NewNginxUpdater returns a new NginxUpdaterImpl instance. func NewNginxUpdater(logger logr.Logger, plus bool) *NginxUpdaterImpl { return &NginxUpdaterImpl{ - Logger: logger, - Plus: plus, - CommandService: newCommandService(), - FileService: newFileService(), + logger: logger, + plus: plus, + CommandService: newCommandService(logger.WithName("commandService")), + FileService: newFileService(logger.WithName("fileService")), } } // UpdateConfig sends the nginx configuration to the agent. func (n *NginxUpdaterImpl) UpdateConfig(files int) { - n.Logger.Info("Sending nginx configuration to agent", "numFiles", files) + n.logger.Info("Sending nginx configuration to agent", "numFiles", files) } // UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API. // Only applicable when using NGINX Plus. func (n *NginxUpdaterImpl) UpdateUpstreamServers() { - if !n.Plus { + if !n.plus { return } - n.Logger.Info("Updating upstream servers using NGINX Plus API") + n.logger.Info("Updating upstream servers using NGINX Plus API") } diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index 3cdf6ce101..1050226b3f 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -6,32 +6,51 @@ import ( "fmt" "time" + "github.com/go-logr/logr" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" "google.golang.org/grpc" + + agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" + grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" ) -// commandService handles the connection and subscription to the agent. +// commandService handles the connection and subscription to the data plane agent. type commandService struct { pb.CommandServiceServer + connTracker *agentgrpc.ConnectionsTracker + // TODO(sberman): all logs are at Info level right now. Adjust appropriately. + logger logr.Logger } -func newCommandService() *commandService { - return &commandService{} +func newCommandService(logger logr.Logger) *commandService { + return &commandService{ + logger: logger, + connTracker: agentgrpc.NewConnectionsTracker(), + } } func (cs *commandService) Register(server *grpc.Server) { pb.RegisterCommandServiceServer(server, cs) } +// CreateConnection registers a data plane agent with the control plane. func (cs *commandService) CreateConnection( - _ context.Context, + ctx context.Context, req *pb.CreateConnectionRequest, ) (*pb.CreateConnectionResponse, error) { if req == nil { return nil, errors.New("empty connection request") } - fmt.Printf("Creating connection for nginx pod: %s\n", req.GetResource().GetContainerInfo().GetHostname()) + gi, ok := grpcContext.GrpcInfoFromContext(ctx) + if !ok { + return nil, agentgrpc.ErrStatusInvalidConnection + } + + hostname := req.GetResource().GetContainerInfo().GetHostname() + + cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", hostname)) + cs.connTracker.Track(gi.IPAddress, hostname) return &pb.CreateConnectionResponse{ Response: &pb.CommandResponse{ @@ -40,50 +59,99 @@ func (cs *commandService) CreateConnection( }, nil } +// Subscribe is a decoupled communication mechanism between the data plane agent and control plane. func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error { - fmt.Println("Received subscribe request") - ctx := in.Context() + gi, ok := grpcContext.GrpcInfoFromContext(ctx) + if !ok { + return agentgrpc.ErrStatusInvalidConnection + } + + cs.logger.Info(fmt.Sprintf("Received subscribe request from %q", gi.IPAddress)) + + go cs.listenForDataPlaneResponse(ctx, in) + + // wait for connection to be established + podName, err := cs.waitForConnection(ctx, gi) + if err != nil { + cs.logger.Error(err, "error waiting for connection") + return err + } + + cs.logger.Info(fmt.Sprintf("Handling subscription for %s/%s", podName, gi.IPAddress)) for { select { case <-ctx.Done(): return ctx.Err() case <-time.After(1 * time.Minute): dummyRequest := &pb.ManagementPlaneRequest{ - Request: &pb.ManagementPlaneRequest_StatusRequest{ - StatusRequest: &pb.StatusRequest{}, + Request: &pb.ManagementPlaneRequest_HealthRequest{ + HealthRequest: &pb.HealthRequest{}, }, } - if err := in.Send(dummyRequest); err != nil { // will likely need retry logic - fmt.Printf("ERROR: %v\n", err) + if err := in.Send(dummyRequest); err != nil { // TODO(sberman): will likely need retry logic + cs.logger.Error(err, "error sending request to agent") } } } } -func (cs *commandService) UpdateDataPlaneStatus( - _ context.Context, - req *pb.UpdateDataPlaneStatusRequest, -) (*pb.UpdateDataPlaneStatusResponse, error) { - fmt.Println("Updating data plane status") +// TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call, +// so this fails. +func (cs *commandService) waitForConnection(ctx context.Context, gi grpcContext.GrpcInfo) (string, error) { + var podName string + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - if req == nil { - return nil, errors.New("empty update data plane status request") + timer := time.NewTimer(30 * time.Second) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-timer.C: + return "", errors.New("timed out waiting for agent connection") + case <-ticker.C: + if podName = cs.connTracker.GetConnection(gi.IPAddress); podName != "" { + return podName, nil + } + } } +} - return &pb.UpdateDataPlaneStatusResponse{}, nil +func (cs *commandService) listenForDataPlaneResponse(ctx context.Context, in pb.CommandService_SubscribeServer) { + for { + select { + case <-ctx.Done(): + return + default: + dataPlaneResponse, err := in.Recv() + cs.logger.Info(fmt.Sprintf("Received data plane response: %v", dataPlaneResponse)) + if err != nil { + cs.logger.Error(err, "failed to receive data plane response") + return + } + } + } } +// UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent. +// TODO(sberman): Is health monitoring the data planes something useful for us to do? func (cs *commandService) UpdateDataPlaneHealth( _ context.Context, - req *pb.UpdateDataPlaneHealthRequest, + _ *pb.UpdateDataPlaneHealthRequest, ) (*pb.UpdateDataPlaneHealthResponse, error) { - fmt.Println("Updating data plane health") - - if req == nil { - return nil, errors.New("empty update dataplane health request") - } - return &pb.UpdateDataPlaneHealthResponse{}, nil } + +// UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata, +// instance metadata, or configurations. Since directly changing nginx configuration on the instance +// is not supported, this is a no-op for NGF. +func (cs *commandService) UpdateDataPlaneStatus( + _ context.Context, + _ *pb.UpdateDataPlaneStatusRequest, +) (*pb.UpdateDataPlaneStatusResponse, error) { + return &pb.UpdateDataPlaneStatusResponse{}, nil +} diff --git a/internal/mode/static/nginx/agent/file.go b/internal/mode/static/nginx/agent/file.go index 9a3df38c4e..296e1705ee 100644 --- a/internal/mode/static/nginx/agent/file.go +++ b/internal/mode/static/nginx/agent/file.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/go-logr/logr" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" "google.golang.org/grpc" ) @@ -11,52 +12,57 @@ import ( // fileService handles file management between the control plane and the agent. type fileService struct { pb.FileServiceServer + // TODO(sberman): all logs are at Info level right now. Adjust appropriately. + logger logr.Logger } -func newFileService() *fileService { - return &fileService{} +func newFileService(logger logr.Logger) *fileService { + return &fileService{logger: logger} } func (fs *fileService) Register(server *grpc.Server) { pb.RegisterFileServiceServer(server, fs) } +// GetOverview gets the overview of files for a particular configuration version of an instance. +// Agent calls this if it's missing an overview when a ConfigApplyRequest is called by the control plane. func (fs *fileService) GetOverview( _ context.Context, _ *pb.GetOverviewRequest, ) (*pb.GetOverviewResponse, error) { - fmt.Println("Get overview request") + fs.logger.Info("Get overview request") return &pb.GetOverviewResponse{ Overview: &pb.FileOverview{}, }, nil } -func (fs *fileService) UpdateOverview( - _ context.Context, - _ *pb.UpdateOverviewRequest, -) (*pb.UpdateOverviewResponse, error) { - fmt.Println("Update overview request") - - return &pb.UpdateOverviewResponse{}, nil -} - +// GetFile is called by the agent when it needs to download a file for a ConfigApplyRequest. func (fs *fileService) GetFile( _ context.Context, req *pb.GetFileRequest, ) (*pb.GetFileResponse, error) { filename := req.GetFileMeta().GetName() hash := req.GetFileMeta().GetHash() - fmt.Printf("Getting file: %s, %s\n", filename, hash) + fs.logger.Info(fmt.Sprintf("Getting file: %s, %s", filename, hash)) return &pb.GetFileResponse{}, nil } +// UpdateOverview is called by agent on startup and whenever any files change on the instance. +// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF. +func (fs *fileService) UpdateOverview( + _ context.Context, + _ *pb.UpdateOverviewRequest, +) (*pb.UpdateOverviewResponse, error) { + return &pb.UpdateOverviewResponse{}, nil +} + +// UpdateFile is called by agent whenever any files change on the instance. +// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF. func (fs *fileService) UpdateFile( _ context.Context, - req *pb.UpdateFileRequest, + _ *pb.UpdateFileRequest, ) (*pb.UpdateFileResponse, error) { - fmt.Println("Update file request for: ", req.GetFile().GetFileMeta().GetName()) - return &pb.UpdateFileResponse{}, nil } diff --git a/internal/mode/static/nginx/agent/grpc.go b/internal/mode/static/nginx/agent/grpc.go deleted file mode 100644 index 6c558da2f3..0000000000 --- a/internal/mode/static/nginx/agent/grpc.go +++ /dev/null @@ -1,59 +0,0 @@ -package agent - -import ( - "context" - "fmt" - "net" - "time" - - "github.com/go-logr/logr" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - "sigs.k8s.io/controller-runtime/pkg/manager" -) - -const ( - keepAliveTime = 1 * time.Minute - keepAliveTimeout = 15 * time.Second -) - -// GRPCServer is a gRPC server for communicating with the nginx agent. -type GRPCServer struct { - Logger logr.Logger - // RegisterServices is a list of functions to register gRPC services to the gRPC server. - RegisterServices []func(*grpc.Server) - // Port is the port that the server is listening on. - // Must be exposed in the control plane deployment/service. - Port int -} - -// Start is a runnable that starts the gRPC server for communicating with the nginx agent. -func (g *GRPCServer) Start(ctx context.Context) error { - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", g.Port)) - if err != nil { - return err - } - - server := grpc.NewServer( - grpc.KeepaliveParams( - keepalive.ServerParameters{ - Time: keepAliveTime, - Timeout: keepAliveTimeout, - }, - ), - ) - - for _, registerSvc := range g.RegisterServices { - registerSvc(server) - } - - go func() { - <-ctx.Done() - g.Logger.Info("Shutting down GRPC Server") - server.GracefulStop() - }() - - return server.Serve(listener) -} - -var _ manager.Runnable = &GRPCServer{} diff --git a/internal/mode/static/nginx/agent/grpc/connections.go b/internal/mode/static/nginx/agent/grpc/connections.go new file mode 100644 index 0000000000..e69392ac55 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/connections.go @@ -0,0 +1,54 @@ +package grpc + +import ( + "sync" +) + +// ConnectionsTracker keeps track of all connections between the control plane and nginx agents. +type ConnectionsTracker struct { + // connections contains a map of all IP addresses that have connected and their associated pod names. + // TODO(sberman): we'll likely need to create a channel for each connection that can be stored in this map. + // Then the Subscription listens on the channel for its connection, while the nginxUpdater sends the config + // for the pod over that channel. + connections map[string]string + + lock sync.Mutex +} + +// NewConnectionsTracker returns a new ConnectionsTracker instance. +func NewConnectionsTracker() *ConnectionsTracker { + return &ConnectionsTracker{ + connections: make(map[string]string), + } +} + +// Track adds a connection to the tracking map. +// TODO(sberman): we need to handle the case when the token expires (once we support the token). +// This likely involves setting a callback to cancel a context when the token expires, which triggers +// the connection to be removed from the tracking list. +func (c *ConnectionsTracker) Track(address, hostname string) { + c.lock.Lock() + defer c.lock.Unlock() + + c.connections[address] = hostname +} + +// GetConnections returns all connections that are currently tracked. +func (c *ConnectionsTracker) GetConnections() map[string]string { + c.lock.Lock() + defer c.lock.Unlock() + + return c.connections +} + +// GetConnection returns the hostname of the requested connection. +func (c *ConnectionsTracker) GetConnection(address string) string { + c.lock.Lock() + defer c.lock.Unlock() + + if val, ok := c.connections[address]; ok { + return val + } + + return "" +} diff --git a/internal/mode/static/nginx/agent/grpc/context/context.go b/internal/mode/static/nginx/agent/grpc/context/context.go new file mode 100644 index 0000000000..f8daf457eb --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/context/context.go @@ -0,0 +1,24 @@ +package context + +import ( + "context" +) + +// GrpcInfo for storing identity information for the gRPC client. +type GrpcInfo struct { + IPAddress string `json:"ip_address"` // ip address of the agent +} + +type contextGRPCKey struct{} + +// NewGrpcContext returns a new context.Context that has the provided GrpcInfo attached. +func NewGrpcContext(ctx context.Context, r GrpcInfo) context.Context { + return context.WithValue(ctx, contextGRPCKey{}, r) +} + +// GrpcInfoFromContext returns the GrpcInfo saved in ctx if it exists. +// Returns false if there's no GrpcInfo in the context. +func GrpcInfoFromContext(ctx context.Context) (GrpcInfo, bool) { + v, ok := ctx.Value(contextGRPCKey{}).(GrpcInfo) + return v, ok +} diff --git a/internal/mode/static/nginx/agent/grpc/context/doc.go b/internal/mode/static/nginx/agent/grpc/context/doc.go new file mode 100644 index 0000000000..689a126cf7 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/context/doc.go @@ -0,0 +1,4 @@ +/* +Package context contains the functions for storing extra information in the gRPC context. +*/ +package context diff --git a/internal/mode/static/nginx/agent/grpc/doc.go b/internal/mode/static/nginx/agent/grpc/doc.go new file mode 100644 index 0000000000..b98f0af8b6 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/doc.go @@ -0,0 +1,4 @@ +/* +Package grpc contains the functionality for the gRPC server for communicating with the nginx agent. +*/ +package grpc diff --git a/internal/mode/static/nginx/agent/grpc/grpc.go b/internal/mode/static/nginx/agent/grpc/grpc.go new file mode 100644 index 0000000000..0bac99f1b8 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/grpc.go @@ -0,0 +1,91 @@ +package grpc + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/go-logr/logr" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/interceptor" +) + +const ( + keepAliveTime = 10 * time.Second + keepAliveTimeout = 10 * time.Second +) + +var ErrStatusInvalidConnection = status.Error(codes.Unauthenticated, "invalid connection") + +// Interceptor provides hooks to intercept the execution of an RPC on the server. +type Interceptor interface { + Stream() grpc.StreamServerInterceptor + Unary() grpc.UnaryServerInterceptor +} + +// Server is a gRPC server for communicating with the nginx agent. +type Server struct { + // Interceptor provides hooks to intercept the execution of an RPC on the server. + interceptor Interceptor + + logger logr.Logger + // RegisterServices is a list of functions to register gRPC services to the gRPC server. + registerServices []func(*grpc.Server) + // Port is the port that the server is listening on. + // Must be exposed in the control plane deployment/service. + port int +} + +func NewServer(logger logr.Logger, port int, registerSvcs []func(*grpc.Server)) *Server { + return &Server{ + logger: logger, + port: port, + registerServices: registerSvcs, + interceptor: interceptor.NewContextSetter(), + } +} + +// Start is a runnable that starts the gRPC server for communicating with the nginx agent. +func (g *Server) Start(ctx context.Context) error { + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", g.port)) + if err != nil { + return err + } + + server := grpc.NewServer( + grpc.KeepaliveParams( + keepalive.ServerParameters{ + Time: keepAliveTime, + Timeout: keepAliveTimeout, + }, + ), + grpc.KeepaliveEnforcementPolicy( + keepalive.EnforcementPolicy{ + MinTime: keepAliveTime, + PermitWithoutStream: true, + }, + ), + grpc.ChainStreamInterceptor(g.interceptor.Stream()), + grpc.ChainUnaryInterceptor(g.interceptor.Unary()), + ) + + for _, registerSvc := range g.registerServices { + registerSvc(server) + } + + go func() { + <-ctx.Done() + g.logger.Info("Shutting down GRPC Server") + server.GracefulStop() + }() + + return server.Serve(listener) +} + +var _ manager.Runnable = &Server{} diff --git a/internal/mode/static/nginx/agent/grpc/interceptor/doc.go b/internal/mode/static/nginx/agent/grpc/interceptor/doc.go new file mode 100644 index 0000000000..e5175664b9 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/interceptor/doc.go @@ -0,0 +1,4 @@ +/* +Package interceptor contains the middleware for intercepting an RPC call. +*/ +package interceptor diff --git a/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go b/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go new file mode 100644 index 0000000000..74819a83ba --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go @@ -0,0 +1,83 @@ +package interceptor + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + + grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" +) + +type ContextSetter struct{} + +func NewContextSetter() *ContextSetter { + return &ContextSetter{} +} + +// streamHandler is a struct that implements StreamHandler, allowing the interceptor to replace the context. +type streamHandler struct { + grpc.ServerStream + ctx context.Context +} + +func (sh *streamHandler) Context() context.Context { + return sh.ctx +} + +func (c *ContextSetter) Stream() grpc.StreamServerInterceptor { + return func( + srv interface{}, + ss grpc.ServerStream, + _ *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + ctx, err := setContext(ss.Context()) + if err != nil { + return err + } + return handler(srv, &streamHandler{ + ServerStream: ss, + ctx: ctx, + }) + } +} + +func (c *ContextSetter) Unary() grpc.UnaryServerInterceptor { + return func( + ctx context.Context, + req interface{}, + _ *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (resp interface{}, err error) { + if ctx, err = setContext(ctx); err != nil { + return nil, err + } + return handler(ctx, req) + } +} + +// TODO(sberman): for now, we'll just use the IP address of the agent to link a Connection +// to a Subscription by setting it in the context. Once we support auth, we can likely change this +// interceptor to instead set the uuid. +func setContext(ctx context.Context) (context.Context, error) { + p, ok := peer.FromContext(ctx) + if !ok { + return nil, status.Error(codes.InvalidArgument, "no peer data") + } + + addr, ok := p.Addr.(*net.TCPAddr) + if !ok { + panic(fmt.Sprintf("address %q was not of type net.TCPAddr", p.Addr.String())) + } + + gi := &grpcContext.GrpcInfo{ + IPAddress: addr.IP.String(), + } + + return grpcContext.NewGrpcContext(ctx, *gi), nil +} From c0a4342ee26b9ef6a81d53cf8f14f3ab858b6466 Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Fri, 3 Jan 2025 13:16:52 -0700 Subject: [PATCH 2/4] Code review; rename, reorg, dereference --- internal/mode/static/nginx/agent/command.go | 8 ++++---- .../mode/static/nginx/agent/grpc/connections.go | 6 +----- .../nginx/agent/grpc/interceptor/interceptor.go | 16 ++++++++-------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index 1050226b3f..9eabd8680e 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -47,10 +47,10 @@ func (cs *commandService) CreateConnection( return nil, agentgrpc.ErrStatusInvalidConnection } - hostname := req.GetResource().GetContainerInfo().GetHostname() + podName := req.GetResource().GetContainerInfo().GetHostname() - cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", hostname)) - cs.connTracker.Track(gi.IPAddress, hostname) + cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", podName)) + cs.connTracker.Track(gi.IPAddress, podName) return &pb.CreateConnectionResponse{ Response: &pb.CommandResponse{ @@ -72,7 +72,7 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error go cs.listenForDataPlaneResponse(ctx, in) - // wait for connection to be established + // wait for the agent to report itself podName, err := cs.waitForConnection(ctx, gi) if err != nil { cs.logger.Error(err, "error waiting for connection") diff --git a/internal/mode/static/nginx/agent/grpc/connections.go b/internal/mode/static/nginx/agent/grpc/connections.go index e69392ac55..af99b84002 100644 --- a/internal/mode/static/nginx/agent/grpc/connections.go +++ b/internal/mode/static/nginx/agent/grpc/connections.go @@ -46,9 +46,5 @@ func (c *ConnectionsTracker) GetConnection(address string) string { c.lock.Lock() defer c.lock.Unlock() - if val, ok := c.connections[address]; ok { - return val - } - - return "" + return c.connections[address] } diff --git a/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go b/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go index 74819a83ba..3b36c23eef 100644 --- a/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go +++ b/internal/mode/static/nginx/agent/grpc/interceptor/interceptor.go @@ -13,12 +13,6 @@ import ( grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" ) -type ContextSetter struct{} - -func NewContextSetter() *ContextSetter { - return &ContextSetter{} -} - // streamHandler is a struct that implements StreamHandler, allowing the interceptor to replace the context. type streamHandler struct { grpc.ServerStream @@ -29,7 +23,13 @@ func (sh *streamHandler) Context() context.Context { return sh.ctx } -func (c *ContextSetter) Stream() grpc.StreamServerInterceptor { +type ContextSetter struct{} + +func NewContextSetter() ContextSetter { + return ContextSetter{} +} + +func (c ContextSetter) Stream() grpc.StreamServerInterceptor { return func( srv interface{}, ss grpc.ServerStream, @@ -47,7 +47,7 @@ func (c *ContextSetter) Stream() grpc.StreamServerInterceptor { } } -func (c *ContextSetter) Unary() grpc.UnaryServerInterceptor { +func (c ContextSetter) Unary() grpc.UnaryServerInterceptor { return func( ctx context.Context, req interface{}, From 470021c99567b2677218106c27f5f12fe874ddf0 Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Mon, 6 Jan 2025 13:12:12 -0700 Subject: [PATCH 3/4] Update internal/mode/static/nginx/agent/command.go Co-authored-by: bjee19 <139261241+bjee19@users.noreply.github.com> --- internal/mode/static/nginx/agent/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index 9eabd8680e..4f010e3dee 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -18,7 +18,7 @@ import ( type commandService struct { pb.CommandServiceServer connTracker *agentgrpc.ConnectionsTracker - // TODO(sberman): all logs are at Info level right now. Adjust appropriately. + // TODO(sjberman): all logs are at Info level right now. Adjust appropriately. logger logr.Logger } From 4e415c49bf75d05173173bdb4d106037d7341a5c Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Mon, 6 Jan 2025 13:18:46 -0700 Subject: [PATCH 4/4] Revert name change --- internal/mode/static/nginx/agent/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index 4f010e3dee..9eabd8680e 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -18,7 +18,7 @@ import ( type commandService struct { pb.CommandServiceServer connTracker *agentgrpc.ConnectionsTracker - // TODO(sjberman): all logs are at Info level right now. Adjust appropriately. + // TODO(sberman): all logs are at Info level right now. Adjust appropriately. logger logr.Logger }