diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index af510dadfc..3114bea746 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -109,23 +109,21 @@ type objectFilter struct { // (3) Updating control plane configuration. // (4) Tracks the NGINX Plus usage reporting Secret (if applicable). type eventHandlerImpl struct { - // latestConfiguration is the latest Configuration generation. - latestConfiguration *dataplane.Configuration + // latestConfigurations are the latest Configuration generation for each Gateway tree. + latestConfigurations map[types.NamespacedName]*dataplane.Configuration // objectFilters contains all created objectFilters, with the key being a filterKey objectFilters map[filterKey]objectFilter cfg eventHandlerConfig lock sync.Mutex - - // version is the current version number of the nginx config. - version int } // newEventHandlerImpl creates a new eventHandlerImpl. func newEventHandlerImpl(cfg eventHandlerConfig) *eventHandlerImpl { handler := &eventHandlerImpl{ - cfg: cfg, + cfg: cfg, + latestConfigurations: make(map[types.NamespacedName]*dataplane.Configuration), } handler.objectFilters = map[filterKey]objectFilter{ @@ -158,28 +156,23 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log h.parseAndCaptureEvent(ctx, logger, event) } - changeType, gr := h.cfg.processor.Process() + gr := h.cfg.processor.Process() // Once we've processed resources on startup and built our first graph, mark the Pod as ready. if !h.cfg.graphBuiltHealthChecker.ready { h.cfg.graphBuiltHealthChecker.setAsReady() } - h.sendNginxConfig(ctx, logger, gr, changeType) + h.sendNginxConfig(ctx, logger, gr) } // enable is called when the pod becomes leader to ensure the provisioner has // the latest configuration. func (h *eventHandlerImpl) enable(ctx context.Context) { - h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph(), state.ClusterStateChange) + h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph()) } -func (h *eventHandlerImpl) sendNginxConfig( - ctx context.Context, - logger logr.Logger, - gr *graph.Graph, - changeType state.ChangeType, -) { +func (h *eventHandlerImpl) sendNginxConfig(ctx context.Context, logger logr.Logger, gr *graph.Graph) { if gr == nil { return } @@ -215,68 +208,30 @@ func (h *eventHandlerImpl) sendNginxConfig( panic("expected deployment, got nil") } - configApplied := h.processStateAndBuildConfig(ctx, logger, gr, gw, changeType, deployment) - - configErr := deployment.GetLatestConfigError() - upstreamErr := deployment.GetLatestUpstreamError() - err := errors.Join(configErr, upstreamErr) - - if configApplied || err != nil { - obj := &status.QueueObject{ - UpdateType: status.UpdateAll, - Error: err, - Deployment: gw.DeploymentName, - } - h.cfg.statusQueue.Enqueue(obj) - } - } -} - -func (h *eventHandlerImpl) processStateAndBuildConfig( - ctx context.Context, - logger logr.Logger, - gr *graph.Graph, - currentGateway *graph.Gateway, - changeType state.ChangeType, - deployment *agent.Deployment, -) bool { - var configApplied bool - switch changeType { - case state.EndpointsOnlyChange: - h.version++ - cfg := dataplane.BuildConfiguration(ctx, gr, currentGateway, h.cfg.serviceResolver, h.version, h.cfg.plus) + cfg := dataplane.BuildConfiguration(ctx, gr, gw, h.cfg.serviceResolver, h.cfg.plus) depCtx, getErr := h.getDeploymentContext(ctx) if getErr != nil { logger.Error(getErr, "error getting deployment context for usage reporting") } cfg.DeploymentContext = depCtx - h.setLatestConfiguration(&cfg) + h.setLatestConfiguration(gw, &cfg) deployment.FileLock.Lock() - if h.cfg.plus { - configApplied = h.cfg.nginxUpdater.UpdateUpstreamServers(deployment, cfg) - } else { - configApplied = h.updateNginxConf(deployment, cfg) - } + h.updateNginxConf(deployment, cfg) deployment.FileLock.Unlock() - case state.ClusterStateChange: - h.version++ - cfg := dataplane.BuildConfiguration(ctx, gr, currentGateway, h.cfg.serviceResolver, h.version, h.cfg.plus) - depCtx, getErr := h.getDeploymentContext(ctx) - if getErr != nil { - logger.Error(getErr, "error getting deployment context for usage reporting") - } - cfg.DeploymentContext = depCtx - h.setLatestConfiguration(&cfg) + configErr := deployment.GetLatestConfigError() + upstreamErr := deployment.GetLatestUpstreamError() + err := errors.Join(configErr, upstreamErr) - deployment.FileLock.Lock() - configApplied = h.updateNginxConf(deployment, cfg) - deployment.FileLock.Unlock() + obj := &status.QueueObject{ + UpdateType: status.UpdateAll, + Error: err, + Deployment: gw.DeploymentName, + } + h.cfg.statusQueue.Enqueue(obj) } - - return configApplied } func (h *eventHandlerImpl) waitForStatusUpdates(ctx context.Context) { @@ -451,16 +406,14 @@ func (h *eventHandlerImpl) parseAndCaptureEvent(ctx context.Context, logger logr func (h *eventHandlerImpl) updateNginxConf( deployment *agent.Deployment, conf dataplane.Configuration, -) bool { +) { files := h.cfg.generator.Generate(conf) - applied := h.cfg.nginxUpdater.UpdateConfig(deployment, files) + h.cfg.nginxUpdater.UpdateConfig(deployment, files) // If using NGINX Plus, update upstream servers using the API. if h.cfg.plus { h.cfg.nginxUpdater.UpdateUpstreamServers(deployment, conf) } - - return applied } // updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status @@ -570,21 +523,28 @@ func (h *eventHandlerImpl) getDeploymentContext(ctx context.Context) (dataplane. } // GetLatestConfiguration gets the latest configuration. -func (h *eventHandlerImpl) GetLatestConfiguration() *dataplane.Configuration { +func (h *eventHandlerImpl) GetLatestConfiguration() []*dataplane.Configuration { h.lock.Lock() defer h.lock.Unlock() - return h.latestConfiguration + configs := make([]*dataplane.Configuration, 0, len(h.latestConfigurations)) + for _, cfg := range h.latestConfigurations { + configs = append(configs, cfg) + } + + return configs } // setLatestConfiguration sets the latest configuration. -// TODO(sberman): once we support multiple Gateways, this will likely have to be a map -// of all configurations. -func (h *eventHandlerImpl) setLatestConfiguration(cfg *dataplane.Configuration) { +func (h *eventHandlerImpl) setLatestConfiguration(gateway *graph.Gateway, cfg *dataplane.Configuration) { + if gateway == nil || gateway.Source == nil { + return + } + h.lock.Lock() defer h.lock.Unlock() - h.latestConfiguration = cfg + h.latestConfigurations[client.ObjectKeyFromObject(gateway.Source)] = cfg } func objectFilterKey(obj client.Object, nsName types.NamespacedName) filterKey { diff --git a/internal/mode/static/handler_test.go b/internal/mode/static/handler_test.go index ec5bfa437d..df5ee9d70a 100644 --- a/internal/mode/static/handler_test.go +++ b/internal/mode/static/handler_test.go @@ -31,7 +31,6 @@ import ( agentgrpcfakes "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/grpcfakes" "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/config/configfakes" "github.com/nginx/nginx-gateway-fabric/internal/mode/static/provisioner/provisionerfakes" - "github.com/nginx/nginx-gateway-fabric/internal/mode/static/state" "github.com/nginx/nginx-gateway-fabric/internal/mode/static/state/dataplane" "github.com/nginx/nginx-gateway-fabric/internal/mode/static/state/graph" "github.com/nginx/nginx-gateway-fabric/internal/mode/static/state/statefakes" @@ -99,11 +98,10 @@ var _ = Describe("eventHandler", func() { } fakeProcessor = &statefakes.FakeChangeProcessor{} - fakeProcessor.ProcessReturns(state.NoChange, &graph.Graph{}) + fakeProcessor.ProcessReturns(&graph.Graph{}) fakeProcessor.GetLatestGraphReturns(baseGraph) fakeGenerator = &configfakes.FakeGenerator{} fakeNginxUpdater = &agentfakes.FakeNginxUpdater{} - fakeNginxUpdater.UpdateConfigReturns(true) fakeProvisioner = &provisionerfakes.FakeProvisioner{} fakeProvisioner.RegisterGatewayReturns(nil) fakeStatusUpdater = &statusfakes.FakeGroupUpdater{} @@ -163,7 +161,7 @@ var _ = Describe("eventHandler", func() { } BeforeEach(func() { - fakeProcessor.ProcessReturns(state.ClusterStateChange, baseGraph) + fakeProcessor.ProcessReturns(baseGraph) fakeGenerator.GenerateReturns(fakeCfgFiles) }) @@ -178,11 +176,13 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1, &graph.Gateway{}) + dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, &graph.Gateway{}) checkUpsertEventExpectations(e) expectReconfig(dcfg, fakeCfgFiles) - Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) + config := handler.GetLatestConfiguration() + Expect(config).To(HaveLen(1)) + Expect(helpers.Diff(config[0], &dcfg)).To(BeEmpty()) }) It("should process Delete", func() { e := &events.DeleteEvent{ @@ -193,15 +193,17 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1, &graph.Gateway{}) + dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, &graph.Gateway{}) checkDeleteEventExpectations(e) expectReconfig(dcfg, fakeCfgFiles) - Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) + config := handler.GetLatestConfiguration() + Expect(config).To(HaveLen(1)) + Expect(helpers.Diff(config[0], &dcfg)).To(BeEmpty()) }) It("should not build anything if Gateway isn't set", func() { - fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{}) + fakeProcessor.ProcessReturns(&graph.Graph{}) e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}} batch := []interface{}{e} @@ -218,7 +220,7 @@ var _ = Describe("eventHandler", func() { }).Should(Equal(1)) }) It("should not build anything if graph is nil", func() { - fakeProcessor.ProcessReturns(state.ClusterStateChange, nil) + fakeProcessor.ProcessReturns(nil) e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}} batch := []interface{}{e} @@ -235,7 +237,7 @@ var _ = Describe("eventHandler", func() { }).Should(Equal(0)) }) It("should update gateway class even if gateway is invalid", func() { - fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{ + fakeProcessor.ProcessReturns(&graph.Graph{ Gateways: map[types.NamespacedName]*graph.Gateway{ {Namespace: "test", Name: "gateway"}: { Valid: false, @@ -273,8 +275,11 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 2, &graph.Gateway{}) - Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) + dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, &graph.Gateway{}) + + config := handler.GetLatestConfiguration() + Expect(config).To(HaveLen(1)) + Expect(helpers.Diff(config[0], &dcfg)).To(BeEmpty()) }) }) }) @@ -298,7 +303,7 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevelError)}} handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(handler.GetLatestConfiguration()).To(BeEmpty()) Eventually( func() int { @@ -317,7 +322,7 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevel("invalid"))}} handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(handler.GetLatestConfiguration()).To(BeEmpty()) Eventually( func() int { @@ -349,7 +354,7 @@ var _ = Describe("eventHandler", func() { } handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(handler.GetLatestConfiguration()).To(BeEmpty()) Eventually( func() int { @@ -367,7 +372,7 @@ var _ = Describe("eventHandler", func() { }) }) - When("receiving an EndpointsOnlyChange update", func() { + Context("NGINX Plus API calls", func() { e := &events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: "nginx-gateway", @@ -377,9 +382,17 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{e} BeforeEach(func() { - fakeProcessor.ProcessReturns(state.EndpointsOnlyChange, &graph.Graph{ + fakeProcessor.ProcessReturns(&graph.Graph{ Gateways: map[types.NamespacedName]*graph.Gateway{ - {}: {Valid: true}, + {}: { + Source: &gatewayv1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "gateway", + }, + }, + Valid: true, + }, }, }) }) @@ -390,11 +403,14 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1, &graph.Gateway{}) + dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, &graph.Gateway{}) dcfg.NginxPlus = dataplane.NginxPlus{AllowedAddresses: []string{"127.0.0.1"}} - Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) - Expect(fakeGenerator.GenerateCallCount()).To(Equal(0)) + config := handler.GetLatestConfiguration() + Expect(config).To(HaveLen(1)) + Expect(helpers.Diff(config[0], &dcfg)).To(BeEmpty()) + + Expect(fakeGenerator.GenerateCallCount()).To(Equal(1)) Expect(fakeNginxUpdater.UpdateUpstreamServersCallCount()).To(Equal(1)) }) }) @@ -403,8 +419,11 @@ var _ = Describe("eventHandler", func() { It("should not call the NGINX Plus API", func() { handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1, &graph.Gateway{}) - Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) + dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, &graph.Gateway{}) + + config := handler.GetLatestConfiguration() + Expect(config).To(HaveLen(1)) + Expect(helpers.Diff(config[0], &dcfg)).To(BeEmpty()) Expect(fakeGenerator.GenerateCallCount()).To(Equal(1)) Expect(fakeNginxUpdater.UpdateConfigCallCount()).To(Equal(1)) @@ -456,17 +475,27 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{e} readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh() - fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{ + fakeProcessor.ProcessReturns(&graph.Graph{ Gateways: map[types.NamespacedName]*graph.Gateway{ - {}: {Valid: true}, + {}: { + Source: &gatewayv1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "gateway", + }, + }, + Valid: true, + }, }, }) Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed()) handler.HandleEventBatch(context.Background(), logr.Discard(), batch) - dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1, &graph.Gateway{}) - Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) + dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, &graph.Gateway{}) + config := handler.GetLatestConfiguration() + Expect(config).To(HaveLen(1)) + Expect(helpers.Diff(config[0], &dcfg)).To(BeEmpty()) Expect(readyChannel).To(BeClosed()) @@ -483,7 +512,7 @@ var _ = Describe("eventHandler", func() { Expect(handle).Should(Panic()) - Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(handler.GetLatestConfiguration()).To(BeEmpty()) }) }) diff --git a/internal/mode/static/nginx/agent/action.go b/internal/mode/static/nginx/agent/action.go new file mode 100644 index 0000000000..575cbf055b --- /dev/null +++ b/internal/mode/static/nginx/agent/action.go @@ -0,0 +1,92 @@ +package agent + +import ( + pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "google.golang.org/protobuf/types/known/structpb" +) + +func actionsEqual(a, b []*pb.NGINXPlusAction) bool { + if len(a) != len(b) { + return false + } + + for i := range a { + switch actionA := a[i].Action.(type) { + case *pb.NGINXPlusAction_UpdateHttpUpstreamServers: + actionB, ok := b[i].Action.(*pb.NGINXPlusAction_UpdateHttpUpstreamServers) + if !ok || !httpUpstreamsEqual(actionA.UpdateHttpUpstreamServers, actionB.UpdateHttpUpstreamServers) { + return false + } + case *pb.NGINXPlusAction_UpdateStreamServers: + actionB, ok := b[i].Action.(*pb.NGINXPlusAction_UpdateStreamServers) + if !ok || !streamUpstreamsEqual(actionA.UpdateStreamServers, actionB.UpdateStreamServers) { + return false + } + default: + return false + } + } + + return true +} + +func httpUpstreamsEqual(a, b *pb.UpdateHTTPUpstreamServers) bool { + if a.HttpUpstreamName != b.HttpUpstreamName { + return false + } + + if len(a.Servers) != len(b.Servers) { + return false + } + + for i := range a.Servers { + if !structsEqual(a.Servers[i], b.Servers[i]) { + return false + } + } + + return true +} + +func streamUpstreamsEqual(a, b *pb.UpdateStreamServers) bool { + if a.UpstreamStreamName != b.UpstreamStreamName { + return false + } + + if len(a.Servers) != len(b.Servers) { + return false + } + + for i := range a.Servers { + if !structsEqual(a.Servers[i], b.Servers[i]) { + return false + } + } + + return true +} + +func structsEqual(a, b *structpb.Struct) bool { + if len(a.Fields) != len(b.Fields) { + return false + } + + for key, valueA := range a.Fields { + valueB, exists := b.Fields[key] + if !exists || !valuesEqual(valueA, valueB) { + return false + } + } + + return true +} + +func valuesEqual(a, b *structpb.Value) bool { + switch valueA := a.Kind.(type) { + case *structpb.Value_StringValue: + valueB, ok := b.Kind.(*structpb.Value_StringValue) + return ok && valueA.StringValue == valueB.StringValue + default: + return false + } +} diff --git a/internal/mode/static/nginx/agent/action_test.go b/internal/mode/static/nginx/agent/action_test.go new file mode 100644 index 0000000000..491dbc0dd8 --- /dev/null +++ b/internal/mode/static/nginx/agent/action_test.go @@ -0,0 +1,347 @@ +package agent + +import ( + "testing" + + pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" + . "github.com/onsi/gomega" + "google.golang.org/protobuf/types/known/structpb" +) + +func TestActionsEqual(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + actionA []*pb.NGINXPlusAction + actionB []*pb.NGINXPlusAction + expected bool + }{ + { + name: "Actions are equal", + actionA: []*pb.NGINXPlusAction{ + { + Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ + UpdateHttpUpstreamServers: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + }, + }, + }, + actionB: []*pb.NGINXPlusAction{ + { + Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ + UpdateHttpUpstreamServers: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + }, + }, + }, + expected: true, + }, + { + name: "Actions have different types", + actionA: []*pb.NGINXPlusAction{ + { + Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ + UpdateHttpUpstreamServers: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + }, + }, + }, + }, + actionB: []*pb.NGINXPlusAction{ + { + Action: &pb.NGINXPlusAction_UpdateStreamServers{ + UpdateStreamServers: &pb.UpdateStreamServers{ + UpstreamStreamName: "upstream1", + }, + }, + }, + }, + expected: false, + }, + { + name: "Actions have different values", + actionA: []*pb.NGINXPlusAction{ + { + Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ + UpdateHttpUpstreamServers: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value1"}}}}, + }, + }, + }, + }, + }, + actionB: []*pb.NGINXPlusAction{ + { + Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ + UpdateHttpUpstreamServers: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value2"}}}}, + }, + }, + }, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + g.Expect(actionsEqual(tt.actionA, tt.actionB)).To(Equal(tt.expected)) + }) + } +} + +func TestHttpUpstreamsEqual(t *testing.T) { + t.Parallel() + + tests := []struct { + upstreamA *pb.UpdateHTTPUpstreamServers + upstreamB *pb.UpdateHTTPUpstreamServers + name string + expected bool + }{ + { + name: "HTTP upstreams are equal", + upstreamA: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + upstreamB: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + expected: true, + }, + { + name: "HTTP upstreams have different upstream names", + upstreamA: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + }, + upstreamB: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream2", + }, + expected: false, + }, + { + name: "HTTP upstreams have different server lengths", + upstreamA: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + upstreamB: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + {Fields: map[string]*structpb.Value{"key2": {Kind: &structpb.Value_StringValue{StringValue: "value2"}}}}, + }, + }, + expected: false, + }, + { + name: "HTTP upstreams have different server contents", + upstreamA: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value1"}}}}, + }, + }, + upstreamB: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "upstream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value2"}}}}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + g.Expect(httpUpstreamsEqual(tt.upstreamA, tt.upstreamB)).To(Equal(tt.expected)) + }) + } +} + +func TestStreamUpstreamsEqual(t *testing.T) { + t.Parallel() + + tests := []struct { + upstreamA *pb.UpdateStreamServers + upstreamB *pb.UpdateStreamServers + name string + expected bool + }{ + { + name: "Stream upstreams are equal", + upstreamA: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + upstreamB: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + expected: true, + }, + { + name: "Stream have different upstream names", + upstreamA: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream1", + }, + upstreamB: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream2", + }, + expected: false, + }, + { + name: "Stream upstreams have different server lengths", + upstreamA: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + }, + }, + upstreamB: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}}, + {Fields: map[string]*structpb.Value{"key2": {Kind: &structpb.Value_StringValue{StringValue: "value2"}}}}, + }, + }, + expected: false, + }, + { + name: "Stream upstreams have different server contents", + upstreamA: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value1"}}}}, + }, + }, + upstreamB: &pb.UpdateStreamServers{ + UpstreamStreamName: "stream1", + Servers: []*structpb.Struct{ + {Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value2"}}}}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + g.Expect(streamUpstreamsEqual(tt.upstreamA, tt.upstreamB)).To(Equal(tt.expected)) + }) + } +} + +func TestStructsEqual(t *testing.T) { + t.Parallel() + + tests := []struct { + structA *structpb.Struct + structB *structpb.Struct + name string + expected bool + }{ + { + name: "Structs are equal", + structA: &structpb.Struct{ + Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}, + }, + structB: &structpb.Struct{ + Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}, + }, + expected: true, + }, + { + name: "Structs have different values", + structA: &structpb.Struct{ + Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}, + }, + structB: &structpb.Struct{ + Fields: map[string]*structpb.Value{"key": {Kind: &structpb.Value_StringValue{StringValue: "different"}}}, + }, + expected: false, + }, + { + name: "Structs have different keys", + structA: &structpb.Struct{ + Fields: map[string]*structpb.Value{"key1": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}, + }, + structB: &structpb.Struct{ + Fields: map[string]*structpb.Value{"key2": {Kind: &structpb.Value_StringValue{StringValue: "value"}}}, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + g.Expect(structsEqual(tt.structA, tt.structB)).To(Equal(tt.expected)) + }) + } +} + +func TestValuesEqual(t *testing.T) { + t.Parallel() + + tests := []struct { + valueA *structpb.Value + valueB *structpb.Value + name string + expected bool + }{ + { + name: "Values are equal", + valueA: &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "value"}}, + valueB: &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "value"}}, + expected: true, + }, + { + name: "Values are not equal", + valueA: &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "value"}}, + valueB: &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "different"}}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + g.Expect(valuesEqual(tt.valueA, tt.valueB)).To(Equal(tt.expected)) + }) + } +} diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go index 7bc0818214..dbe49deb0c 100644 --- a/internal/mode/static/nginx/agent/agent.go +++ b/internal/mode/static/nginx/agent/agent.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sort" "time" "github.com/go-logr/logr" @@ -27,8 +28,8 @@ const retryUpstreamTimeout = 5 * time.Second // NginxUpdater is an interface for updating NGINX using the NGINX agent. type NginxUpdater interface { - UpdateConfig(deployment *Deployment, files []File) bool - UpdateUpstreamServers(deployment *Deployment, conf dataplane.Configuration) bool + UpdateConfig(deployment *Deployment, files []File) + UpdateUpstreamServers(deployment *Deployment, conf dataplane.Configuration) } // NginxUpdaterImpl implements the NginxUpdater interface. @@ -73,7 +74,6 @@ func NewNginxUpdater( } // UpdateConfig sends the nginx configuration to the agent. -// Returns whether the configuration was sent to any agents. // // The flow of events is as follows: // - Set the configuration files on the deployment. @@ -86,27 +86,28 @@ func NewNginxUpdater( func (n *NginxUpdaterImpl) UpdateConfig( deployment *Deployment, files []File, -) bool { +) { msg := deployment.SetFiles(files) - applied := deployment.GetBroadcaster().Send(msg) + if msg == nil { + return + } + + applied := deployment.GetBroadcaster().Send(*msg) if applied { n.logger.Info("Sent nginx configuration to agent") } deployment.SetLatestConfigError(deployment.GetConfigurationStatus()) - - return applied } // UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API. // Only applicable when using NGINX Plus. -// Returns whether the configuration was sent to any agents. func (n *NginxUpdaterImpl) UpdateUpstreamServers( deployment *Deployment, conf dataplane.Configuration, -) bool { +) { if !n.plus { - return false + return } broadcaster := deployment.GetBroadcaster() @@ -114,12 +115,6 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers( // reset the latest error to nil now that we're applying new config deployment.SetLatestUpstreamError(nil) - // TODO(sberman): optimize this by only sending updates that are necessary. - // Call GetUpstreams first (will need Subscribers to send responses back), and - // then determine which upstreams actually need to be updated. - // OR we can possibly just use the most recent NGINXPlusActions to see what the last state - // of upstreams were, and only update the diff. - var errs []error var applied bool actions := make([]*pb.NGINXPlusAction, 0, len(conf.Upstreams)+len(conf.StreamUpstreams)) @@ -141,6 +136,10 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers( actions = append(actions, action) } + if actionsEqual(deployment.GetNGINXPlusActions(), actions) { + return + } + for _, action := range actions { msg := broadcast.NginxAgentMessage{ Type: broadcast.APIRequest, @@ -163,8 +162,6 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers( // Store the most recent actions on the deployment so any new subscribers can apply them when first connecting. deployment.SetNGINXPlusActions(actions) - - return applied } func buildHTTPUpstreamServers(upstream dataplane.Upstream) *pb.UpdateHTTPUpstreamServers { @@ -197,6 +194,11 @@ func buildUpstreamServers(upstream dataplane.Upstream) []*structpb.Struct { servers = append(servers, server) } + // sort the servers to avoid unnecessary reloads + sort.Slice(servers, func(i, j int) bool { + return servers[i].Fields["server"].GetStringValue() < servers[j].Fields["server"].GetStringValue() + }) + return servers } diff --git a/internal/mode/static/nginx/agent/agent_test.go b/internal/mode/static/nginx/agent/agent_test.go index 3266003981..b0147d4d96 100644 --- a/internal/mode/static/nginx/agent/agent_test.go +++ b/internal/mode/static/nginx/agent/agent_test.go @@ -21,24 +21,16 @@ func TestUpdateConfig(t *testing.T) { t.Parallel() tests := []struct { - name string - configApplied bool - expErr bool + name string + expErr bool }{ { - name: "success", - configApplied: true, - expErr: false, + name: "success", + expErr: false, }, { - name: "error returned from agent", - configApplied: true, - expErr: true, - }, - { - name: "configuration not applied", - configApplied: false, - expErr: false, + name: "error returned from agent", + expErr: true, }, } @@ -48,7 +40,7 @@ func TestUpdateConfig(t *testing.T) { g := NewWithT(t) fakeBroadcaster := &broadcastfakes.FakeBroadcaster{} - fakeBroadcaster.SendReturns(test.configApplied) + fakeBroadcaster.SendReturns(true) plus := false updater := NewNginxUpdater(logr.Discard(), fake.NewFakeClient(), &status.Queue{}, nil, plus) @@ -70,15 +62,16 @@ func TestUpdateConfig(t *testing.T) { deployment.SetPodErrorStatus("pod1", testErr) } - applied := updater.UpdateConfig(deployment, []File{file}) + updater.UpdateConfig(deployment, []File{file}) - g.Expect(applied).To(Equal(test.configApplied)) + g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(1)) g.Expect(deployment.GetFile(file.Meta.Name, file.Meta.Hash)).To(Equal(file.Contents)) if test.expErr { g.Expect(deployment.GetLatestConfigError()).To(Equal(testErr)) // ensure that the error is cleared after the next config is applied deployment.SetPodErrorStatus("pod1", nil) + file.Meta.Hash = "5678" updater.UpdateConfig(deployment, []File{file}) g.Expect(deployment.GetLatestConfigError()).ToNot(HaveOccurred()) } else { @@ -88,6 +81,37 @@ func TestUpdateConfig(t *testing.T) { } } +func TestUpdateConfig_NoChange(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + fakeBroadcaster := &broadcastfakes.FakeBroadcaster{} + + updater := NewNginxUpdater(logr.Discard(), fake.NewFakeClient(), &status.Queue{}, nil, false) + + deployment := &Deployment{ + broadcaster: fakeBroadcaster, + podStatuses: make(map[string]error), + } + + file := File{ + Meta: &pb.FileMeta{ + Name: "test.conf", + Hash: "12345", + }, + Contents: []byte("test content"), + } + + // Set the initial files on the deployment + deployment.SetFiles([]File{file}) + + // Call UpdateConfig with the same files + updater.UpdateConfig(deployment, []File{file}) + + // Verify that no new configuration was sent + g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(0)) +} + func TestUpdateUpstreamServers(t *testing.T) { t.Parallel() @@ -95,43 +119,31 @@ func TestUpdateUpstreamServers(t *testing.T) { name string buildUpstreams bool plus bool - configApplied bool expErr bool }{ { name: "success", plus: true, buildUpstreams: true, - configApplied: true, expErr: false, }, { name: "no upstreams to apply", plus: true, buildUpstreams: false, - configApplied: false, expErr: false, }, { - name: "not running nginx plus", - plus: false, - configApplied: false, - expErr: false, + name: "not running nginx plus", + plus: false, + expErr: false, }, { name: "error returned from agent", plus: true, buildUpstreams: true, - configApplied: true, expErr: true, }, - { - name: "configuration not applied", - plus: true, - buildUpstreams: true, - configApplied: false, - expErr: false, - }, } for _, test := range tests { @@ -140,7 +152,6 @@ func TestUpdateUpstreamServers(t *testing.T) { g := NewWithT(t) fakeBroadcaster := &broadcastfakes.FakeBroadcaster{} - fakeBroadcaster.SendReturns(test.configApplied) updater := NewNginxUpdater(logr.Discard(), fake.NewFakeClient(), &status.Queue{}, nil, test.plus) updater.retryTimeout = 0 @@ -182,8 +193,7 @@ func TestUpdateUpstreamServers(t *testing.T) { } } - applied := updater.UpdateUpstreamServers(deployment, conf) - g.Expect(applied).To(Equal(test.configApplied)) + updater.UpdateUpstreamServers(deployment, conf) expActions := make([]*pb.NGINXPlusAction, 0) if test.buildUpstreams { @@ -221,8 +231,10 @@ func TestUpdateUpstreamServers(t *testing.T) { if !test.plus { g.Expect(deployment.GetNGINXPlusActions()).To(BeNil()) - } else { + g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(0)) + } else if test.buildUpstreams { g.Expect(deployment.GetNGINXPlusActions()).To(Equal(expActions)) + g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(2)) } if test.expErr { @@ -243,6 +255,83 @@ func TestUpdateUpstreamServers(t *testing.T) { } } +func TestUpdateUpstreamServers_NoChange(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + fakeBroadcaster := &broadcastfakes.FakeBroadcaster{} + + updater := NewNginxUpdater(logr.Discard(), fake.NewFakeClient(), &status.Queue{}, nil, true) + updater.retryTimeout = 0 + + deployment := &Deployment{ + broadcaster: fakeBroadcaster, + podStatuses: make(map[string]error), + } + + conf := dataplane.Configuration{ + Upstreams: []dataplane.Upstream{ + { + Name: "test-upstream", + Endpoints: []resolver.Endpoint{ + { + Address: "1.2.3.4", + Port: 8080, + }, + }, + }, + }, + StreamUpstreams: []dataplane.Upstream{ + { + Name: "test-stream-upstream", + Endpoints: []resolver.Endpoint{ + { + Address: "5.6.7.8", + }, + }, + }, + }, + } + + initialActions := []*pb.NGINXPlusAction{ + { + Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ + UpdateHttpUpstreamServers: &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: "test-upstream", + Servers: []*structpb.Struct{ + { + Fields: map[string]*structpb.Value{ + "server": structpb.NewStringValue("1.2.3.4:8080"), + }, + }, + }, + }, + }, + }, + { + Action: &pb.NGINXPlusAction_UpdateStreamServers{ + UpdateStreamServers: &pb.UpdateStreamServers{ + UpstreamStreamName: "test-stream-upstream", + Servers: []*structpb.Struct{ + { + Fields: map[string]*structpb.Value{ + "server": structpb.NewStringValue("5.6.7.8"), + }, + }, + }, + }, + }, + }, + } + deployment.SetNGINXPlusActions(initialActions) + + // Call UpdateUpstreamServers with the same configuration + updater.UpdateUpstreamServers(deployment, conf) + + // Verify that no new actions were sent + g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(0)) +} + func TestGetPortAndIPFormat(t *testing.T) { t.Parallel() diff --git a/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go b/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go index 6c3165e5b6..f69009ce04 100644 --- a/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go +++ b/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go @@ -9,57 +9,39 @@ import ( ) type FakeNginxUpdater struct { - UpdateConfigStub func(*agent.Deployment, []agent.File) bool + UpdateConfigStub func(*agent.Deployment, []agent.File) updateConfigMutex sync.RWMutex updateConfigArgsForCall []struct { arg1 *agent.Deployment arg2 []agent.File } - updateConfigReturns struct { - result1 bool - } - updateConfigReturnsOnCall map[int]struct { - result1 bool - } - UpdateUpstreamServersStub func(*agent.Deployment, dataplane.Configuration) bool + UpdateUpstreamServersStub func(*agent.Deployment, dataplane.Configuration) updateUpstreamServersMutex sync.RWMutex updateUpstreamServersArgsForCall []struct { arg1 *agent.Deployment arg2 dataplane.Configuration } - updateUpstreamServersReturns struct { - result1 bool - } - updateUpstreamServersReturnsOnCall map[int]struct { - result1 bool - } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeNginxUpdater) UpdateConfig(arg1 *agent.Deployment, arg2 []agent.File) bool { +func (fake *FakeNginxUpdater) UpdateConfig(arg1 *agent.Deployment, arg2 []agent.File) { var arg2Copy []agent.File if arg2 != nil { arg2Copy = make([]agent.File, len(arg2)) copy(arg2Copy, arg2) } fake.updateConfigMutex.Lock() - ret, specificReturn := fake.updateConfigReturnsOnCall[len(fake.updateConfigArgsForCall)] fake.updateConfigArgsForCall = append(fake.updateConfigArgsForCall, struct { arg1 *agent.Deployment arg2 []agent.File }{arg1, arg2Copy}) stub := fake.UpdateConfigStub - fakeReturns := fake.updateConfigReturns fake.recordInvocation("UpdateConfig", []interface{}{arg1, arg2Copy}) fake.updateConfigMutex.Unlock() if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 + fake.UpdateConfigStub(arg1, arg2) } - return fakeReturns.result1 } func (fake *FakeNginxUpdater) UpdateConfigCallCount() int { @@ -68,7 +50,7 @@ func (fake *FakeNginxUpdater) UpdateConfigCallCount() int { return len(fake.updateConfigArgsForCall) } -func (fake *FakeNginxUpdater) UpdateConfigCalls(stub func(*agent.Deployment, []agent.File) bool) { +func (fake *FakeNginxUpdater) UpdateConfigCalls(stub func(*agent.Deployment, []agent.File)) { fake.updateConfigMutex.Lock() defer fake.updateConfigMutex.Unlock() fake.UpdateConfigStub = stub @@ -81,47 +63,18 @@ func (fake *FakeNginxUpdater) UpdateConfigArgsForCall(i int) (*agent.Deployment, return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeNginxUpdater) UpdateConfigReturns(result1 bool) { - fake.updateConfigMutex.Lock() - defer fake.updateConfigMutex.Unlock() - fake.UpdateConfigStub = nil - fake.updateConfigReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeNginxUpdater) UpdateConfigReturnsOnCall(i int, result1 bool) { - fake.updateConfigMutex.Lock() - defer fake.updateConfigMutex.Unlock() - fake.UpdateConfigStub = nil - if fake.updateConfigReturnsOnCall == nil { - fake.updateConfigReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.updateConfigReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - -func (fake *FakeNginxUpdater) UpdateUpstreamServers(arg1 *agent.Deployment, arg2 dataplane.Configuration) bool { +func (fake *FakeNginxUpdater) UpdateUpstreamServers(arg1 *agent.Deployment, arg2 dataplane.Configuration) { fake.updateUpstreamServersMutex.Lock() - ret, specificReturn := fake.updateUpstreamServersReturnsOnCall[len(fake.updateUpstreamServersArgsForCall)] fake.updateUpstreamServersArgsForCall = append(fake.updateUpstreamServersArgsForCall, struct { arg1 *agent.Deployment arg2 dataplane.Configuration }{arg1, arg2}) stub := fake.UpdateUpstreamServersStub - fakeReturns := fake.updateUpstreamServersReturns fake.recordInvocation("UpdateUpstreamServers", []interface{}{arg1, arg2}) fake.updateUpstreamServersMutex.Unlock() if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 + fake.UpdateUpstreamServersStub(arg1, arg2) } - return fakeReturns.result1 } func (fake *FakeNginxUpdater) UpdateUpstreamServersCallCount() int { @@ -130,7 +83,7 @@ func (fake *FakeNginxUpdater) UpdateUpstreamServersCallCount() int { return len(fake.updateUpstreamServersArgsForCall) } -func (fake *FakeNginxUpdater) UpdateUpstreamServersCalls(stub func(*agent.Deployment, dataplane.Configuration) bool) { +func (fake *FakeNginxUpdater) UpdateUpstreamServersCalls(stub func(*agent.Deployment, dataplane.Configuration)) { fake.updateUpstreamServersMutex.Lock() defer fake.updateUpstreamServersMutex.Unlock() fake.UpdateUpstreamServersStub = stub @@ -143,29 +96,6 @@ func (fake *FakeNginxUpdater) UpdateUpstreamServersArgsForCall(i int) (*agent.De return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeNginxUpdater) UpdateUpstreamServersReturns(result1 bool) { - fake.updateUpstreamServersMutex.Lock() - defer fake.updateUpstreamServersMutex.Unlock() - fake.UpdateUpstreamServersStub = nil - fake.updateUpstreamServersReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeNginxUpdater) UpdateUpstreamServersReturnsOnCall(i int, result1 bool) { - fake.updateUpstreamServersMutex.Lock() - defer fake.updateUpstreamServersMutex.Unlock() - fake.UpdateUpstreamServersStub = nil - if fake.updateUpstreamServersReturnsOnCall == nil { - fake.updateUpstreamServersReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.updateUpstreamServersReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - func (fake *FakeNginxUpdater) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() diff --git a/internal/mode/static/nginx/agent/deployment.go b/internal/mode/static/nginx/agent/deployment.go index 5da82c7fd9..3aa8d80b6b 100644 --- a/internal/mode/static/nginx/agent/deployment.go +++ b/internal/mode/static/nginx/agent/deployment.go @@ -173,7 +173,7 @@ func (d *Deployment) GetFile(name, hash string) []byte { // SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send. // The deployment FileLock MUST already be locked before calling this function. -func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage { +func (d *Deployment) SetFiles(files []File) *broadcast.NginxAgentMessage { d.files = files fileOverviews := make([]*pb.File, 0, len(files)) @@ -194,10 +194,16 @@ func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage { }) } - d.configVersion = filesHelper.GenerateConfigVersion(fileOverviews) + newConfigVersion := filesHelper.GenerateConfigVersion(fileOverviews) + if d.configVersion == newConfigVersion { + // files have not changed, nothing to send + return nil + } + + d.configVersion = newConfigVersion d.fileOverviews = fileOverviews - return broadcast.NginxAgentMessage{ + return &broadcast.NginxAgentMessage{ Type: broadcast.ConfigApplyRequest, FileOverviews: fileOverviews, ConfigVersion: d.configVersion, diff --git a/internal/mode/static/nginx/agent/deployment_test.go b/internal/mode/static/nginx/agent/deployment_test.go index 3c6dc4c859..57d9510588 100644 --- a/internal/mode/static/nginx/agent/deployment_test.go +++ b/internal/mode/static/nginx/agent/deployment_test.go @@ -57,6 +57,13 @@ func TestSetAndGetFiles(t *testing.T) { g.Expect(deployment.GetFile("invalid", "12345")).To(BeNil()) g.Expect(deployment.GetFile("test.conf", "invalid")).To(BeNil()) + + // Set the same files again + msg = deployment.SetFiles(files) + g.Expect(msg).To(BeNil()) + + newFileOverviews, _ := deployment.GetFileOverviews() + g.Expect(newFileOverviews).To(Equal(fileOverviews)) } func TestSetNGINXPlusActions(t *testing.T) { diff --git a/internal/mode/static/provisioner/templates.go b/internal/mode/static/provisioner/templates.go index e58ee3abec..326cac7478 100644 --- a/internal/mode/static/provisioner/templates.go +++ b/internal/mode/static/provisioner/templates.go @@ -61,15 +61,15 @@ log: {{- if .EnableMetrics }} collector: receivers: - host_metrics: - collection_interval: 1m0s - initial_delay: 1s - scrapers: - cpu: {} - memory: {} - disk: {} - network: {} - filesystem: {} + host_metrics: + collection_interval: 1m0s + initial_delay: 1s + scrapers: + cpu: {} + memory: {} + disk: {} + network: {} + filesystem: {} processors: batch: {} exporters: diff --git a/internal/mode/static/state/change_processor.go b/internal/mode/static/state/change_processor.go index 1cd72f7612..1d136383b8 100644 --- a/internal/mode/static/state/change_processor.go +++ b/internal/mode/static/state/change_processor.go @@ -28,19 +28,6 @@ import ( //go:generate go tool counterfeiter -generate -// ChangeType is the type of change that occurred based on a k8s object event. -type ChangeType int - -const ( - // NoChange means that nothing changed. - NoChange ChangeType = iota - // EndpointsOnlyChange means that only the endpoints changed. - // If using NGINX Plus, this update can be done using the API without a reload. - EndpointsOnlyChange - // ClusterStateChange means that something other than endpoints changed. This requires an NGINX reload. - ClusterStateChange -) - //counterfeiter:generate . ChangeProcessor // ChangeProcessor processes the changes to resources and produces a graph-like representation @@ -55,8 +42,8 @@ type ChangeProcessor interface { // this ChangeProcessor was created for. CaptureDeleteChange(resourceType ngftypes.ObjectType, nsname types.NamespacedName) // Process produces a graph-like representation of GatewayAPI resources. - // If no changes were captured, the changed return argument will be NoChange and graph will be empty. - Process() (changeType ChangeType, graphCfg *graph.Graph) + // If no changes were captured, the graph will be empty. + Process() (graphCfg *graph.Graph) // GetLatestGraph returns the latest Graph. GetLatestGraph() *graph.Graph } @@ -88,7 +75,7 @@ type ChangeProcessorImpl struct { // updater acts upon the cluster state. updater Updater // getAndResetClusterStateChanged tells if and how the cluster state has changed. - getAndResetClusterStateChanged func() ChangeType + getAndResetClusterStateChanged func() bool cfg ChangeProcessorConfig lock sync.Mutex @@ -268,13 +255,12 @@ func (c *ChangeProcessorImpl) CaptureDeleteChange(resourceType ngftypes.ObjectTy c.updater.Delete(resourceType, nsname) } -func (c *ChangeProcessorImpl) Process() (ChangeType, *graph.Graph) { +func (c *ChangeProcessorImpl) Process() *graph.Graph { c.lock.Lock() defer c.lock.Unlock() - changeType := c.getAndResetClusterStateChanged() - if changeType == NoChange { - return NoChange, nil + if !c.getAndResetClusterStateChanged() { + return nil } c.latestGraph = graph.BuildGraph( @@ -285,7 +271,7 @@ func (c *ChangeProcessorImpl) Process() (ChangeType, *graph.Graph) { c.cfg.Validators, ) - return changeType, c.latestGraph + return c.latestGraph } func (c *ChangeProcessorImpl) GetLatestGraph() *graph.Graph { diff --git a/internal/mode/static/state/change_processor_test.go b/internal/mode/static/state/change_processor_test.go index 76055298f0..4797710fa3 100644 --- a/internal/mode/static/state/change_processor_test.go +++ b/internal/mode/static/state/change_processor_test.go @@ -400,6 +400,26 @@ var _ = Describe("ChangeProcessor", func() { processor state.ChangeProcessor ) + testUpsertTriggersChange := func(obj client.Object) { + processor.CaptureUpsertChange(obj) + Expect(processor.Process()).ToNot(BeNil()) + } + + testUpsertDoesNotTriggerChange := func(obj client.Object) { + processor.CaptureUpsertChange(obj) + Expect(processor.Process()).To(BeNil()) + } + + testDeleteTriggersChange := func(obj client.Object, nsname types.NamespacedName) { + processor.CaptureDeleteChange(obj, nsname) + Expect(processor.Process()).ToNot(BeNil()) + } + + testDeleteDoesNotTriggerChange := func(obj client.Object, nsname types.NamespacedName) { + processor.CaptureDeleteChange(obj, nsname) + Expect(processor.Process()).To(BeNil()) + } + BeforeEach(OncePerOrdered, func() { processor = state.NewChangeProcessorImpl(state.ChangeProcessorConfig{ GatewayCtlrName: controllerName, @@ -432,8 +452,7 @@ var _ = Describe("ChangeProcessor", func() { ) processAndValidateGraph := func(expGraph *graph.Graph) { - changed, graphCfg := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graphCfg := processor.Process() Expect(helpers.Diff(expGraph, graphCfg)).To(BeEmpty()) Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) } @@ -1208,8 +1227,7 @@ var _ = Describe("ChangeProcessor", func() { }) When("no upsert has occurred", func() { It("returns nil graph", func() { - changed, graphCfg := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + graphCfg := processor.Process() Expect(graphCfg).To(BeNil()) Expect(processor.GetLatestGraph()).To(BeNil()) }) @@ -1248,8 +1266,7 @@ var _ = Describe("ChangeProcessor", func() { It("returns nil graph", func() { processor.CaptureUpsertChange(diffNsTLSSecret) - changed, graphCfg := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + graphCfg := processor.Process() Expect(graphCfg).To(BeNil()) Expect(helpers.Diff(&graph.Graph{}, processor.GetLatestGraph())).To(BeEmpty()) }) @@ -1562,8 +1579,7 @@ var _ = Describe("ChangeProcessor", func() { gatewayclass.SupportedVersion, ) - changed, graphCfg := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + graphCfg := processor.Process() Expect(graphCfg).To(BeNil()) Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) @@ -1680,8 +1696,7 @@ var _ = Describe("ChangeProcessor", func() { CertBundle: diffNsTLSCert, } - changed, graphCfg := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + graphCfg := processor.Process() Expect(graphCfg).To(BeNil()) Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) @@ -1695,8 +1710,7 @@ var _ = Describe("ChangeProcessor", func() { CertBundle: diffNsTLSCert, } - changed, graphCfg := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + graphCfg := processor.Process() Expect(graphCfg).To(BeNil()) Expect(helpers.Diff(expGraph, processor.GetLatestGraph())).To(BeEmpty()) }) @@ -2411,58 +2425,43 @@ var _ = Describe("ChangeProcessor", func() { gw = createGateway("gw", createHTTPListener()) processor.CaptureUpsertChange(gc) processor.CaptureUpsertChange(gw) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + gr := processor.Process() + Expect(gr).ToNot(BeNil()) }) - testProcessChangedVal := func(expChanged state.ChangeType) { - changed, _ := processor.Process() - Expect(changed).To(Equal(expChanged)) - } - - testUpsertTriggersChange := func(obj client.Object, expChanged state.ChangeType) { - processor.CaptureUpsertChange(obj) - testProcessChangedVal(expChanged) - } - - testDeleteTriggersChange := func(obj client.Object, nsname types.NamespacedName, expChanged state.ChangeType) { - processor.CaptureDeleteChange(obj, nsname) - testProcessChangedVal(expChanged) - } - When("hr1 is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr1, state.ClusterStateChange) + testUpsertTriggersChange(hr1) }) }) When("a hr1 service is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr1svc, state.ClusterStateChange) + testUpsertTriggersChange(hr1svc) }) }) When("a backendTLSPolicy is added for referenced service", func() { It("should trigger a change", func() { - testUpsertTriggersChange(btls, state.ClusterStateChange) + testUpsertTriggersChange(btls) }) }) When("an hr1 endpoint slice is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr1slice1, state.EndpointsOnlyChange) + testUpsertTriggersChange(hr1slice1) }) }) When("an hr1 service is updated", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr1svc, state.ClusterStateChange) + testUpsertTriggersChange(hr1svc) }) }) When("another hr1 endpoint slice is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr1slice2, state.EndpointsOnlyChange) + testUpsertTriggersChange(hr1slice2) }) }) When("an endpoint slice with a missing svc name label is added", func() { It("should not trigger a change", func() { - testUpsertTriggersChange(missingSvcNameSlice, state.NoChange) + testUpsertDoesNotTriggerChange(missingSvcNameSlice) }) }) When("an hr1 endpoint slice is deleted", func() { @@ -2470,7 +2469,6 @@ var _ = Describe("ChangeProcessor", func() { testDeleteTriggersChange( hr1slice1, types.NamespacedName{Namespace: hr1slice1.Namespace, Name: hr1slice1.Name}, - state.EndpointsOnlyChange, ) }) }) @@ -2479,13 +2477,12 @@ var _ = Describe("ChangeProcessor", func() { testDeleteTriggersChange( hr1slice2, types.NamespacedName{Namespace: hr1slice2.Namespace, Name: hr1slice2.Name}, - state.EndpointsOnlyChange, ) }) }) When("the second hr1 endpoint slice is recreated", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr1slice2, state.EndpointsOnlyChange) + testUpsertTriggersChange(hr1slice2) }) }) When("hr1 is deleted", func() { @@ -2493,41 +2490,38 @@ var _ = Describe("ChangeProcessor", func() { testDeleteTriggersChange( hr1, types.NamespacedName{Namespace: hr1.Namespace, Name: hr1.Name}, - state.ClusterStateChange, ) }) }) When("hr1 service is deleted", func() { It("should not trigger a change", func() { - testDeleteTriggersChange( + testDeleteDoesNotTriggerChange( hr1svc, types.NamespacedName{Namespace: hr1svc.Namespace, Name: hr1svc.Name}, - state.NoChange, ) }) }) When("the second hr1 endpoint slice is deleted", func() { It("should not trigger a change", func() { - testDeleteTriggersChange( + testDeleteDoesNotTriggerChange( hr1slice2, types.NamespacedName{Namespace: hr1slice2.Namespace, Name: hr1slice2.Name}, - state.NoChange, ) }) }) When("hr2 is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr2, state.ClusterStateChange) + testUpsertTriggersChange(hr2) }) }) When("a hr3, that shares a backend service with hr2, is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hr3, state.ClusterStateChange) + testUpsertTriggersChange(hr3) }) }) When("sharedSvc, a service referenced by both hr2 and hr3, is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(sharedSvc, state.ClusterStateChange) + testUpsertTriggersChange(sharedSvc) }) }) When("hr2 is deleted", func() { @@ -2535,7 +2529,6 @@ var _ = Describe("ChangeProcessor", func() { testDeleteTriggersChange( hr2, types.NamespacedName{Namespace: hr2.Namespace, Name: hr2.Name}, - state.ClusterStateChange, ) }) }) @@ -2544,13 +2537,12 @@ var _ = Describe("ChangeProcessor", func() { testDeleteTriggersChange( sharedSvc, types.NamespacedName{Namespace: sharedSvc.Namespace, Name: sharedSvc.Name}, - state.ClusterStateChange, ) }) }) When("sharedSvc is recreated", func() { It("should trigger a change", func() { - testUpsertTriggersChange(sharedSvc, state.ClusterStateChange) + testUpsertTriggersChange(sharedSvc) }) }) When("hr3 is deleted", func() { @@ -2558,62 +2550,59 @@ var _ = Describe("ChangeProcessor", func() { testDeleteTriggersChange( hr3, types.NamespacedName{Namespace: hr3.Namespace, Name: hr3.Name}, - state.ClusterStateChange, ) }) }) When("sharedSvc is deleted", func() { It("should not trigger a change", func() { - testDeleteTriggersChange( + testDeleteDoesNotTriggerChange( sharedSvc, types.NamespacedName{Namespace: sharedSvc.Namespace, Name: sharedSvc.Name}, - state.NoChange, ) }) }) When("a service that is not referenced by any route is added", func() { It("should not trigger a change", func() { - testUpsertTriggersChange(notRefSvc, state.NoChange) + testUpsertDoesNotTriggerChange(notRefSvc) }) }) When("a route with an invalid backend ref type is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hrInvalidBackendRef, state.ClusterStateChange) + testUpsertTriggersChange(hrInvalidBackendRef) }) }) When("a service with a namespace name that matches invalid backend ref is added", func() { It("should not trigger a change", func() { - testUpsertTriggersChange(invalidSvc, state.NoChange) + testUpsertDoesNotTriggerChange(invalidSvc) }) }) When("an endpoint slice that is not owned by a referenced service is added", func() { It("should not trigger a change", func() { - testUpsertTriggersChange(noRefSlice, state.NoChange) + testUpsertDoesNotTriggerChange(noRefSlice) }) }) When("an endpoint slice that is not owned by a referenced service is deleted", func() { It("should not trigger a change", func() { - testDeleteTriggersChange( + testDeleteDoesNotTriggerChange( noRefSlice, types.NamespacedName{Namespace: noRefSlice.Namespace, Name: noRefSlice.Name}, - state.NoChange, ) }) }) Context("processing a route with multiple rules and three unique backend services", func() { When("route is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(hrMultipleRules, state.ClusterStateChange) + testUpsertTriggersChange(hrMultipleRules) }) }) When("first referenced service is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(bazSvc1, state.ClusterStateChange) + testUpsertTriggersChange(bazSvc1) }) }) When("second referenced service is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(bazSvc2, state.ClusterStateChange) + testUpsertTriggersChange(bazSvc2) }) }) When("first referenced service is deleted", func() { @@ -2621,23 +2610,22 @@ var _ = Describe("ChangeProcessor", func() { testDeleteTriggersChange( bazSvc1, types.NamespacedName{Namespace: bazSvc1.Namespace, Name: bazSvc1.Name}, - state.ClusterStateChange, ) }) }) When("first referenced service is recreated", func() { It("should trigger a change", func() { - testUpsertTriggersChange(bazSvc1, state.ClusterStateChange) + testUpsertTriggersChange(bazSvc1) }) }) When("third referenced service is added", func() { It("should trigger a change", func() { - testUpsertTriggersChange(bazSvc3, state.ClusterStateChange) + testUpsertTriggersChange(bazSvc3) }) }) When("third referenced service is updated", func() { It("should trigger a change", func() { - testUpsertTriggersChange(bazSvc3, state.ClusterStateChange) + testUpsertTriggersChange(bazSvc3) }) }) When("route is deleted", func() { @@ -2648,34 +2636,30 @@ var _ = Describe("ChangeProcessor", func() { Namespace: hrMultipleRules.Namespace, Name: hrMultipleRules.Name, }, - state.ClusterStateChange, ) }) }) When("first referenced service is deleted", func() { It("should not trigger a change", func() { - testDeleteTriggersChange( + testDeleteDoesNotTriggerChange( bazSvc1, types.NamespacedName{Namespace: bazSvc1.Namespace, Name: bazSvc1.Name}, - state.NoChange, ) }) }) When("second referenced service is deleted", func() { It("should not trigger a change", func() { - testDeleteTriggersChange( + testDeleteDoesNotTriggerChange( bazSvc2, types.NamespacedName{Namespace: bazSvc2.Namespace, Name: bazSvc2.Name}, - state.NoChange, ) }) }) When("final referenced service is deleted", func() { It("should not trigger a change", func() { - testDeleteTriggersChange( + testDeleteDoesNotTriggerChange( bazSvc3, types.NamespacedName{Namespace: bazSvc3.Namespace, Name: bazSvc3.Name}, - state.NoChange, ) }) }) @@ -2748,44 +2732,31 @@ var _ = Describe("ChangeProcessor", func() { When("a namespace is created that is not linked to a listener", func() { It("does not trigger an update", func() { - processor.CaptureUpsertChange(nsNoLabels) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + testUpsertDoesNotTriggerChange(nsNoLabels) }) }) When("a namespace is created that is linked to a listener", func() { It("triggers an update", func() { - processor.CaptureUpsertChange(ns) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + testUpsertTriggersChange(ns) }) }) When("a namespace is deleted that is not linked to a listener", func() { It("does not trigger an update", func() { - processor.CaptureDeleteChange(nsNoLabels, types.NamespacedName{Name: "no-labels"}) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + testDeleteDoesNotTriggerChange(nsNoLabels, types.NamespacedName{Name: "no-labels"}) }) }) When("a namespace is deleted that is linked to a listener", func() { It("triggers an update", func() { - processor.CaptureDeleteChange(ns, types.NamespacedName{Name: "ns"}) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + testDeleteTriggersChange(ns, types.NamespacedName{Name: "ns"}) }) }) When("a namespace that is not linked to a listener has its labels changed to match a listener", func() { It("triggers an update", func() { - processor.CaptureUpsertChange(nsDifferentLabels) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) - + testUpsertDoesNotTriggerChange(nsDifferentLabels) nsDifferentLabels.Labels = map[string]string{ "app": "allowed", } - processor.CaptureUpsertChange(nsDifferentLabels) - changed, _ = processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + testUpsertTriggersChange(nsDifferentLabels) }) }) When( @@ -2795,9 +2766,7 @@ var _ = Describe("ChangeProcessor", func() { nsDifferentLabels.Labels = map[string]string{ "oranges": "bananas", } - processor.CaptureUpsertChange(nsDifferentLabels) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + testUpsertTriggersChange(nsDifferentLabels) }) }, ) @@ -2808,9 +2777,7 @@ var _ = Describe("ChangeProcessor", func() { "oranges": "bananas", } gwChangedLabel.Generation++ - processor.CaptureUpsertChange(gwChangedLabel) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + testUpsertTriggersChange(gwChangedLabel) // After changing the gateway's labels and generation, the processor should be marked to update // the nginx configuration and build a new graph. When processor.Process() gets called, @@ -2819,29 +2786,20 @@ var _ = Describe("ChangeProcessor", func() { // the new labels on the gateway, it would not trigger a change as the namespace would no longer // be in the updated referencedNamespaces and the labels no longer match the new labels on the // gateway. - processor.CaptureUpsertChange(ns) - changed, _ = processor.Process() - Expect(changed).To(Equal(state.NoChange)) - - processor.CaptureUpsertChange(nsDifferentLabels) - changed, _ = processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + testUpsertDoesNotTriggerChange(ns) + testUpsertTriggersChange(nsDifferentLabels) }) }) When("a namespace that is not linked to a listener has its labels removed", func() { It("does not trigger an update", func() { ns.Labels = nil - processor.CaptureUpsertChange(ns) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + testUpsertDoesNotTriggerChange(ns) }) }) When("a namespace that is linked to a listener has its labels removed", func() { It("triggers an update when labels are removed", func() { nsDifferentLabels.Labels = nil - processor.CaptureUpsertChange(nsDifferentLabels) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + testUpsertTriggersChange(nsDifferentLabels) }) }) }) @@ -2883,23 +2841,23 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(np) processor.CaptureUpsertChange(paramGC) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.GatewayClass.NginxProxy.Source).To(Equal(np)) }) It("captures changes for an NginxProxy", func() { processor.CaptureUpsertChange(npUpdated) processor.CaptureUpsertChange(paramGC) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.GatewayClass.NginxProxy.Source).To(Equal(npUpdated)) }) It("handles deletes for an NginxProxy", func() { processor.CaptureDeleteChange(np, client.ObjectKeyFromObject(np)) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.GatewayClass.NginxProxy).To(BeNil()) }) }) @@ -2957,25 +2915,25 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(np) processor.CaptureUpsertChange(paramGW) - changed, graph := processor.Process() + graph := processor.Process() + Expect(graph).ToNot(BeNil()) gw := graph.Gateways[types.NamespacedName{Namespace: "test", Name: "param-gw"}] - Expect(changed).To(Equal(state.ClusterStateChange)) Expect(gw.NginxProxy.Source).To(Equal(np)) }) It("captures changes for an NginxProxy", func() { processor.CaptureUpsertChange(npUpdated) processor.CaptureUpsertChange(paramGW) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) gw := graph.Gateways[types.NamespacedName{Namespace: "test", Name: "param-gw"}] Expect(gw.NginxProxy.Source).To(Equal(npUpdated)) }) It("handles deletes for an NginxProxy", func() { processor.CaptureDeleteChange(np, client.ObjectKeyFromObject(np)) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) gw := graph.Gateways[types.NamespacedName{Namespace: "test", Name: "param-gw"}] Expect(gw.NginxProxy).To(BeNil()) }) @@ -2995,8 +2953,8 @@ var _ = Describe("ChangeProcessor", func() { BeforeAll(func() { processor.CaptureUpsertChange(gc) - changed, newGraph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + newGraph := processor.Process() + Expect(newGraph).ToNot(BeNil()) Expect(newGraph.GatewayClass.Source).To(Equal(gc)) Expect(newGraph.NGFPolicies).To(BeEmpty()) @@ -3126,29 +3084,28 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(obs) processor.CaptureUpsertChange(usp) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + Expect(processor.Process()).To(BeNil()) }) }) When("the resource the policy references is created", func() { It("populates the graph with the policy", func() { processor.CaptureUpsertChange(gw) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.NGFPolicies).To(HaveKey(cspKey)) Expect(graph.NGFPolicies[cspKey].Source).To(Equal(csp)) Expect(graph.NGFPolicies).ToNot(HaveKey(obsKey)) processor.CaptureUpsertChange(route) - changed, graph = processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph = processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.NGFPolicies).To(HaveKey(obsKey)) Expect(graph.NGFPolicies[obsKey].Source).To(Equal(obs)) processor.CaptureUpsertChange(svc) - changed, graph = processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph = processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.NGFPolicies).To(HaveKey(uspKey)) Expect(graph.NGFPolicies[uspKey].Source).To(Equal(usp)) }) @@ -3159,8 +3116,8 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(obsUpdated) processor.CaptureUpsertChange(uspUpdated) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.NGFPolicies).To(HaveKey(cspKey)) Expect(graph.NGFPolicies[cspKey].Source).To(Equal(cspUpdated)) Expect(graph.NGFPolicies).To(HaveKey(obsKey)) @@ -3175,8 +3132,8 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureDeleteChange(&ngfAPIv1alpha2.ObservabilityPolicy{}, client.ObjectKeyFromObject(obs)) processor.CaptureDeleteChange(&ngfAPIv1alpha1.UpstreamSettingsPolicy{}, client.ObjectKeyFromObject(usp)) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.NGFPolicies).To(BeEmpty()) }) }) @@ -3224,8 +3181,8 @@ var _ = Describe("ChangeProcessor", func() { It("handles upserts for a SnippetsFilter", func() { processor.CaptureUpsertChange(sf) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) processedSf, exists := graph.SnippetsFilters[sfNsName] Expect(exists).To(BeTrue()) @@ -3235,8 +3192,8 @@ var _ = Describe("ChangeProcessor", func() { It("captures changes for a SnippetsFilter", func() { processor.CaptureUpsertChange(sfUpdated) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) processedSf, exists := graph.SnippetsFilters[sfNsName] Expect(exists).To(BeTrue()) @@ -3246,8 +3203,8 @@ var _ = Describe("ChangeProcessor", func() { It("handles deletes for a SnippetsFilter", func() { processor.CaptureDeleteChange(sfUpdated, sfNsName) - changed, graph := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + graph := processor.Process() + Expect(graph).ToNot(BeNil()) Expect(graph.SnippetsFilters).To(BeEmpty()) }) }) @@ -3582,7 +3539,7 @@ var _ = Describe("ChangeProcessor", func() { } npUpdated = np.DeepCopy() }) - // Changing change - a change that makes processor.Process() report changed + // Changing change - a change that makes processor.Process() return a built graph // Non-changing change - a change that doesn't do that // Related resource - a K8s resource that is related to a configured Gateway API resource // Unrelated resource - a K8s resource that is not related to a configured Gateway API resource @@ -3590,7 +3547,7 @@ var _ = Describe("ChangeProcessor", func() { // Note: in these tests, we deliberately don't fully inspect the returned configuration and statuses // -- this is done in 'Normal cases of processing changes' Describe("Multiple Gateway API resource changes", Ordered, func() { - It("should report changed after multiple Upserts", func() { + It("should build graph after multiple Upserts", func() { processor.CaptureUpsertChange(gc) processor.CaptureUpsertChange(gw1) processor.CaptureUpsertChange(testNs) @@ -3601,11 +3558,10 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(cm) processor.CaptureUpsertChange(np) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) When("a upsert of updated resources is followed by an upsert of the same generation", func() { - It("should report changed", func() { + It("should build graph", func() { // these are changing changes processor.CaptureUpsertChange(gcUpdated) processor.CaptureUpsertChange(gw1Updated) @@ -3626,22 +3582,20 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(cmUpdated) processor.CaptureUpsertChange(npUpdated) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) }) - It("should report changed after upserting new resources", func() { + It("should build graph after upserting new resources", func() { // we can't have a second GatewayClass, so we don't add it processor.CaptureUpsertChange(gw2) processor.CaptureUpsertChange(hr2) processor.CaptureUpsertChange(gr2) processor.CaptureUpsertChange(rg2) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) When("resources are deleted followed by upserts with the same generations", func() { - It("should report changed", func() { + It("should build graph", func() { // these are changing changes processor.CaptureDeleteChange(&v1.GatewayClass{}, gcNsName) processor.CaptureDeleteChange(&v1.Gateway{}, gwNsName) @@ -3658,20 +3612,18 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(gr2) processor.CaptureUpsertChange(rg2) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) }) - It("should report changed after deleting resources", func() { + It("should build graph after deleting resources", func() { processor.CaptureDeleteChange(&v1.HTTPRoute{}, hr2NsName) processor.CaptureDeleteChange(&v1.HTTPRoute{}, gr2NsName) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) }) Describe("Deleting non-existing Gateway API resource", func() { - It("should not report changed after deleting non-existing", func() { + It("should not build graph after deleting non-existing", func() { processor.CaptureDeleteChange(&v1.GatewayClass{}, gcNsName) processor.CaptureDeleteChange(&v1.Gateway{}, gwNsName) processor.CaptureDeleteChange(&v1.HTTPRoute{}, hrNsName) @@ -3680,8 +3632,7 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureDeleteChange(&v1.HTTPRoute{}, gr2NsName) processor.CaptureDeleteChange(&v1beta1.ReferenceGrant{}, rgNsName) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + Expect(processor.Process()).To(BeNil()) }) }) Describe("Multiple Kubernetes API resource changes", Ordered, func() { @@ -3695,31 +3646,28 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(secret) processor.CaptureUpsertChange(barSecret) processor.CaptureUpsertChange(cm) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) - It("should report changed after multiple Upserts of related resources", func() { + It("should build graph after multiple Upserts of related resources", func() { processor.CaptureUpsertChange(svc) processor.CaptureUpsertChange(slice) processor.CaptureUpsertChange(ns) processor.CaptureUpsertChange(secretUpdated) processor.CaptureUpsertChange(cmUpdated) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) - It("should report not changed after multiple Upserts of unrelated resources", func() { + It("should not build graph after multiple Upserts of unrelated resources", func() { processor.CaptureUpsertChange(unrelatedSvc) processor.CaptureUpsertChange(unrelatedSlice) processor.CaptureUpsertChange(unrelatedNS) processor.CaptureUpsertChange(unrelatedSecret) processor.CaptureUpsertChange(unrelatedCM) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + Expect(processor.Process()).To(BeNil()) }) When("upserts of related resources are followed by upserts of unrelated resources", func() { - It("should report changed", func() { + It("should build graph", func() { // these are changing changes processor.CaptureUpsertChange(barSvc) processor.CaptureUpsertChange(barSlice) @@ -3734,12 +3682,11 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(unrelatedSecret) processor.CaptureUpsertChange(unrelatedCM) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) }) When("deletes of related resources are followed by upserts of unrelated resources", func() { - It("should report changed", func() { + It("should build graph", func() { // these are changing changes processor.CaptureDeleteChange(&apiv1.Service{}, svcNsName) processor.CaptureDeleteChange(&discoveryV1.EndpointSlice{}, sliceNsName) @@ -3754,13 +3701,12 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(unrelatedSecret) processor.CaptureUpsertChange(unrelatedCM) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) }) }) Describe("Multiple Kubernetes API and Gateway API resource changes", Ordered, func() { - It("should report changed after multiple Upserts of new and related resources", func() { + It("should build graph after multiple Upserts of new and related resources", func() { // new Gateway API resources processor.CaptureUpsertChange(gc) processor.CaptureUpsertChange(gw1) @@ -3777,10 +3723,9 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(secret) processor.CaptureUpsertChange(cm) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }) - It("should report not changed after multiple Upserts of unrelated resources", func() { + It("should not build graph after multiple Upserts of unrelated resources", func() { // unrelated Kubernetes API resources processor.CaptureUpsertChange(unrelatedSvc) processor.CaptureUpsertChange(unrelatedSlice) @@ -3788,10 +3733,9 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(unrelatedSecret) processor.CaptureUpsertChange(unrelatedCM) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.NoChange)) + Expect(processor.Process()).To(BeNil()) }) - It("should report changed after upserting changed resources followed by upserting unrelated resources", + It("should build graph after upserting changed resources followed by upserting unrelated resources", func() { // these are changing changes processor.CaptureUpsertChange(gcUpdated) @@ -3808,8 +3752,7 @@ var _ = Describe("ChangeProcessor", func() { processor.CaptureUpsertChange(unrelatedSecret) processor.CaptureUpsertChange(unrelatedCM) - changed, _ := processor.Process() - Expect(changed).To(Equal(state.ClusterStateChange)) + Expect(processor.Process()).ToNot(BeNil()) }, ) }) diff --git a/internal/mode/static/state/dataplane/configuration.go b/internal/mode/static/state/dataplane/configuration.go index 0512876405..e0de92d14d 100644 --- a/internal/mode/static/state/dataplane/configuration.go +++ b/internal/mode/static/state/dataplane/configuration.go @@ -33,11 +33,10 @@ func BuildConfiguration( g *graph.Graph, gateway *graph.Gateway, serviceResolver resolver.ServiceResolver, - configVersion int, plus bool, ) Configuration { if g.GatewayClass == nil || !g.GatewayClass.Valid || gateway == nil { - config := GetDefaultConfiguration(g, configVersion, gateway) + config := GetDefaultConfiguration(g, gateway) if plus { config.NginxPlus = buildNginxPlus(gateway) } @@ -70,7 +69,6 @@ func BuildConfiguration( StreamUpstreams: buildStreamUpstreams(ctx, gateway, serviceResolver, baseHTTPConfig.IPFamily), BackendGroups: backendGroups, SSLKeyPairs: buildSSLKeyPairs(g.ReferencedSecrets, gateway.Listeners), - Version: configVersion, CertBundles: buildCertBundles( buildRefCertificateBundles(g.ReferencedSecrets, g.ReferencedCaCertConfigMaps), backendGroups, @@ -720,6 +718,12 @@ func buildUpstreams( for _, up := range uniqueUpstreams { upstreams = append(upstreams, up) } + + // Preserve order so that this doesn't trigger an unnecessary reload. + sort.Slice(upstreams, func(i, j int) bool { + return upstreams[i].Name < upstreams[j].Name + }) + return upstreams } @@ -1131,9 +1135,8 @@ func buildNginxPlus(gateway *graph.Gateway) NginxPlus { return nginxPlusSettings } -func GetDefaultConfiguration(g *graph.Graph, configVersion int, gateway *graph.Gateway) Configuration { +func GetDefaultConfiguration(g *graph.Graph, gateway *graph.Gateway) Configuration { return Configuration{ - Version: configVersion, Logging: buildLogging(gateway), NginxPlus: NginxPlus{}, AuxiliarySecrets: buildAuxiliarySecrets(g.PlusSecrets), diff --git a/internal/mode/static/state/dataplane/configuration_test.go b/internal/mode/static/state/dataplane/configuration_test.go index 7f3a50795d..78e4869c59 100644 --- a/internal/mode/static/state/dataplane/configuration_test.go +++ b/internal/mode/static/state/dataplane/configuration_test.go @@ -2531,7 +2531,6 @@ func TestBuildConfiguration(t *testing.T) { test.graph, test.graph.Gateways[gatewayNsName], fakeResolver, - 1, false, ) @@ -2541,7 +2540,6 @@ func TestBuildConfiguration(t *testing.T) { g.Expect(result.SSLServers).To(ConsistOf(test.expConf.SSLServers)) g.Expect(result.TLSPassthroughServers).To(ConsistOf(test.expConf.TLSPassthroughServers)) g.Expect(result.SSLKeyPairs).To(Equal(test.expConf.SSLKeyPairs)) - g.Expect(result.Version).To(Equal(1)) g.Expect(result.CertBundles).To(Equal(test.expConf.CertBundles)) g.Expect(result.Telemetry).To(Equal(test.expConf.Telemetry)) g.Expect(result.BaseHTTPConfig).To(Equal(test.expConf.BaseHTTPConfig)) @@ -2648,7 +2646,6 @@ func TestBuildConfiguration_Plus(t *testing.T) { test.graph, test.graph.Gateways[gatewayNsName], fakeResolver, - 1, true, ) @@ -2658,7 +2655,6 @@ func TestBuildConfiguration_Plus(t *testing.T) { g.Expect(result.SSLServers).To(ConsistOf(test.expConf.SSLServers)) g.Expect(result.TLSPassthroughServers).To(ConsistOf(test.expConf.TLSPassthroughServers)) g.Expect(result.SSLKeyPairs).To(Equal(test.expConf.SSLKeyPairs)) - g.Expect(result.Version).To(Equal(1)) g.Expect(result.CertBundles).To(Equal(test.expConf.CertBundles)) g.Expect(result.Telemetry).To(Equal(test.expConf.Telemetry)) g.Expect(result.BaseHTTPConfig).To(Equal(test.expConf.BaseHTTPConfig)) diff --git a/internal/mode/static/state/dataplane/types.go b/internal/mode/static/state/dataplane/types.go index 55e5476f51..975edb4c33 100644 --- a/internal/mode/static/state/dataplane/types.go +++ b/internal/mode/static/state/dataplane/types.go @@ -54,8 +54,6 @@ type Configuration struct { NginxPlus NginxPlus // BaseHTTPConfig holds the configuration options at the http context. BaseHTTPConfig BaseHTTPConfig - // Version represents the version of the generated configuration. - Version int } // SSLKeyPairID is a unique identifier for a SSLKeyPair. diff --git a/internal/mode/static/state/statefakes/fake_change_processor.go b/internal/mode/static/state/statefakes/fake_change_processor.go index b3de756b60..c88a31ce01 100644 --- a/internal/mode/static/state/statefakes/fake_change_processor.go +++ b/internal/mode/static/state/statefakes/fake_change_processor.go @@ -33,17 +33,15 @@ type FakeChangeProcessor struct { getLatestGraphReturnsOnCall map[int]struct { result1 *graph.Graph } - ProcessStub func() (state.ChangeType, *graph.Graph) + ProcessStub func() *graph.Graph processMutex sync.RWMutex processArgsForCall []struct { } processReturns struct { - result1 state.ChangeType - result2 *graph.Graph + result1 *graph.Graph } processReturnsOnCall map[int]struct { - result1 state.ChangeType - result2 *graph.Graph + result1 *graph.Graph } invocations map[string][][]interface{} invocationsMutex sync.RWMutex @@ -167,7 +165,7 @@ func (fake *FakeChangeProcessor) GetLatestGraphReturnsOnCall(i int, result1 *gra }{result1} } -func (fake *FakeChangeProcessor) Process() (state.ChangeType, *graph.Graph) { +func (fake *FakeChangeProcessor) Process() *graph.Graph { fake.processMutex.Lock() ret, specificReturn := fake.processReturnsOnCall[len(fake.processArgsForCall)] fake.processArgsForCall = append(fake.processArgsForCall, struct { @@ -180,9 +178,9 @@ func (fake *FakeChangeProcessor) Process() (state.ChangeType, *graph.Graph) { return stub() } if specificReturn { - return ret.result1, ret.result2 + return ret.result1 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1 } func (fake *FakeChangeProcessor) ProcessCallCount() int { @@ -191,36 +189,33 @@ func (fake *FakeChangeProcessor) ProcessCallCount() int { return len(fake.processArgsForCall) } -func (fake *FakeChangeProcessor) ProcessCalls(stub func() (state.ChangeType, *graph.Graph)) { +func (fake *FakeChangeProcessor) ProcessCalls(stub func() *graph.Graph) { fake.processMutex.Lock() defer fake.processMutex.Unlock() fake.ProcessStub = stub } -func (fake *FakeChangeProcessor) ProcessReturns(result1 state.ChangeType, result2 *graph.Graph) { +func (fake *FakeChangeProcessor) ProcessReturns(result1 *graph.Graph) { fake.processMutex.Lock() defer fake.processMutex.Unlock() fake.ProcessStub = nil fake.processReturns = struct { - result1 state.ChangeType - result2 *graph.Graph - }{result1, result2} + result1 *graph.Graph + }{result1} } -func (fake *FakeChangeProcessor) ProcessReturnsOnCall(i int, result1 state.ChangeType, result2 *graph.Graph) { +func (fake *FakeChangeProcessor) ProcessReturnsOnCall(i int, result1 *graph.Graph) { fake.processMutex.Lock() defer fake.processMutex.Unlock() fake.ProcessStub = nil if fake.processReturnsOnCall == nil { fake.processReturnsOnCall = make(map[int]struct { - result1 state.ChangeType - result2 *graph.Graph + result1 *graph.Graph }) } fake.processReturnsOnCall[i] = struct { - result1 state.ChangeType - result2 *graph.Graph - }{result1, result2} + result1 *graph.Graph + }{result1} } func (fake *FakeChangeProcessor) Invocations() map[string][][]interface{} { diff --git a/internal/mode/static/state/store.go b/internal/mode/static/state/store.go index 58bf28216a..910f257c90 100644 --- a/internal/mode/static/state/store.go +++ b/internal/mode/static/state/store.go @@ -3,7 +3,6 @@ package state import ( "fmt" - discoveryV1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -188,7 +187,7 @@ type changeTrackingUpdater struct { extractGVK kinds.MustExtractGVK supportedGVKs gvkList - changeType ChangeType + changed bool } func newChangeTrackingUpdater( @@ -221,7 +220,6 @@ func newChangeTrackingUpdater( extractGVK: extractGVK, supportedGVKs: supportedGVKs, stateChangedPredicates: stateChangedPredicates, - changeType: NoChange, } } @@ -255,7 +253,7 @@ func (s *changeTrackingUpdater) Upsert(obj client.Object) { changingUpsert := s.upsert(obj) - s.setChangeType(obj, changingUpsert) + s.changed = s.changed || changingUpsert } func (s *changeTrackingUpdater) delete(objType ngftypes.ObjectType, nsname types.NamespacedName) (changed bool) { @@ -282,28 +280,13 @@ func (s *changeTrackingUpdater) Delete(objType ngftypes.ObjectType, nsname types changingDelete := s.delete(objType, nsname) - s.setChangeType(objType, changingDelete) + s.changed = s.changed || changingDelete } -// getAndResetChangedStatus returns the type of change that occurred based on the previous updates (Upserts/Deletes). -// It also resets the changed status to NoChange. -func (s *changeTrackingUpdater) getAndResetChangedStatus() ChangeType { - changeType := s.changeType - s.changeType = NoChange - return changeType -} - -// setChangeType determines and sets the type of change that occurred. -// - if no change occurred on this object, then keep the changeType as-is (could've been set by another object event) -// - if changeType is already a ClusterStateChange, then we don't need to update the value -// - otherwise, if we are processing an Endpoint update, then this is an EndpointsOnlyChange changeType -// - otherwise, this is a different object, and is a ClusterStateChange changeType. -func (s *changeTrackingUpdater) setChangeType(obj client.Object, changed bool) { - if changed && s.changeType != ClusterStateChange { - if _, ok := obj.(*discoveryV1.EndpointSlice); ok { - s.changeType = EndpointsOnlyChange - } else { - s.changeType = ClusterStateChange - } - } +// getAndResetChangedStatus returns if a change occurred based on the previous updates (Upserts/Deletes). +// It also resets the changed status to false. +func (s *changeTrackingUpdater) getAndResetChangedStatus() bool { + changed := s.changed + s.changed = false + return changed } diff --git a/internal/mode/static/state/store_test.go b/internal/mode/static/state/store_test.go deleted file mode 100644 index 54e60264fa..0000000000 --- a/internal/mode/static/state/store_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package state - -import ( - "testing" - - . "github.com/onsi/gomega" - discoveryV1 "k8s.io/api/discovery/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - v1 "sigs.k8s.io/gateway-api/apis/v1" -) - -//nolint:paralleltest,tparallel // Order matters for these tests. -func TestSetChangeType(t *testing.T) { - t.Parallel() - ctu := newChangeTrackingUpdater(nil, nil) - - // Order matters for these cases. - tests := []struct { - obj client.Object - name string - exp ChangeType - changed bool - }{ - { - name: "no change", - exp: NoChange, - }, - { - name: "endpoint object", - obj: &discoveryV1.EndpointSlice{}, - changed: true, - exp: EndpointsOnlyChange, - }, - { - name: "non-endpoint object", - obj: &v1.HTTPRoute{}, - changed: true, - exp: ClusterStateChange, - }, - { - name: "changeType was previously set to ClusterStateChange", - obj: &discoveryV1.EndpointSlice{}, - changed: true, - exp: ClusterStateChange, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - g := NewWithT(t) - ctu.setChangeType(test.obj, test.changed) - g.Expect(ctu.changeType).To(Equal(test.exp)) - }) - } -} diff --git a/internal/mode/static/telemetry/collector.go b/internal/mode/static/telemetry/collector.go index cfa3614df7..8277515f56 100644 --- a/internal/mode/static/telemetry/collector.go +++ b/internal/mode/static/telemetry/collector.go @@ -34,7 +34,7 @@ type GraphGetter interface { // ConfigurationGetter gets the latest Configuration. type ConfigurationGetter interface { - GetLatestConfiguration() *dataplane.Configuration + GetLatestConfiguration() []*dataplane.Configuration } // Data is telemetry data. @@ -192,7 +192,7 @@ func collectGraphResourceCount( configurationGetter ConfigurationGetter, ) NGFResourceCounts { ngfResourceCounts := NGFResourceCounts{} - cfg := configurationGetter.GetLatestConfiguration() + configs := configurationGetter.GetLatestConfiguration() ngfResourceCounts.GatewayClassCount = int64(len(g.IgnoredGatewayClasses)) if g.GatewayClass != nil { @@ -209,7 +209,7 @@ func collectGraphResourceCount( ngfResourceCounts.SecretCount = int64(len(g.ReferencedSecrets)) ngfResourceCounts.ServiceCount = int64(len(g.ReferencedServices)) - if cfg != nil { + for _, cfg := range configs { for _, upstream := range cfg.Upstreams { if upstream.ErrorMsg == "" { ngfResourceCounts.EndpointCount += int64(len(upstream.Endpoints)) diff --git a/internal/mode/static/telemetry/collector_test.go b/internal/mode/static/telemetry/collector_test.go index 4620d186ce..ce75e47c02 100644 --- a/internal/mode/static/telemetry/collector_test.go +++ b/internal/mode/static/telemetry/collector_test.go @@ -183,7 +183,7 @@ var _ = Describe("Collector", Ordered, func() { fakeConfigurationGetter = &telemetryfakes.FakeConfigurationGetter{} fakeGraphGetter.GetLatestGraphReturns(&graph.Graph{}) - fakeConfigurationGetter.GetLatestConfigurationReturns(&dataplane.Configuration{}) + fakeConfigurationGetter.GetLatestConfigurationReturns(nil) dataCollector = telemetry.NewDataCollectorImpl(telemetry.DataCollectorConfig{ K8sClientReader: k8sClientReader, @@ -368,31 +368,47 @@ var _ = Describe("Collector", Ordered, func() { }, } - config := &dataplane.Configuration{ - Upstreams: []dataplane.Upstream{ - { - Name: "upstream1", - ErrorMsg: "", - Endpoints: []resolver.Endpoint{ - { - Address: "endpoint1", - Port: 80, - }, { - Address: "endpoint2", - Port: 80, - }, { - Address: "endpoint3", - Port: 80, + configs := []*dataplane.Configuration{ + { + Upstreams: []dataplane.Upstream{ + { + Name: "upstream1", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, { + Address: "endpoint2", + Port: 80, + }, { + Address: "endpoint3", + Port: 80, + }, + }, + }, + { + Name: "upstream2", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, }, }, }, - { - Name: "upstream2", - ErrorMsg: "", - Endpoints: []resolver.Endpoint{ - { - Address: "endpoint1", - Port: 80, + }, + { + Upstreams: []dataplane.Upstream{ + { + Name: "upstream3", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint4", + Port: 80, + }, }, }, }, @@ -400,7 +416,7 @@ var _ = Describe("Collector", Ordered, func() { } fakeGraphGetter.GetLatestGraphReturns(graph) - fakeConfigurationGetter.GetLatestConfigurationReturns(config) + fakeConfigurationGetter.GetLatestConfigurationReturns(configs) expData.ClusterNodeCount = 3 expData.NGFResourceCounts = telemetry.NGFResourceCounts{ @@ -410,7 +426,7 @@ var _ = Describe("Collector", Ordered, func() { TLSRouteCount: 3, SecretCount: 3, ServiceCount: 3, - EndpointCount: 4, + EndpointCount: 5, GRPCRouteCount: 2, BackendTLSPolicyCount: 3, GatewayAttachedClientSettingsPolicyCount: 1, @@ -569,7 +585,7 @@ var _ = Describe("Collector", Ordered, func() { Describe("NGF resource count collector", func() { var ( graph1 *graph.Graph - config1, invalidUpstreamsConfig *dataplane.Configuration + config1, invalidUpstreamsConfig []*dataplane.Configuration ) BeforeAll(func() { @@ -626,43 +642,47 @@ var _ = Describe("Collector", Ordered, func() { }, } - config1 = &dataplane.Configuration{ - Upstreams: []dataplane.Upstream{ - { - Name: "upstream1", - ErrorMsg: "", - Endpoints: []resolver.Endpoint{ - { - Address: "endpoint1", - Port: 80, + config1 = []*dataplane.Configuration{ + { + Upstreams: []dataplane.Upstream{ + { + Name: "upstream1", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, }, }, }, }, } - invalidUpstreamsConfig = &dataplane.Configuration{ - Upstreams: []dataplane.Upstream{ - { - Name: "invalidUpstream", - ErrorMsg: "there is an error here", - Endpoints: []resolver.Endpoint{ - { - Address: "endpoint1", - Port: 80, - }, { - Address: "endpoint2", - Port: 80, - }, { - Address: "endpoint3", - Port: 80, + invalidUpstreamsConfig = []*dataplane.Configuration{ + { + Upstreams: []dataplane.Upstream{ + { + Name: "invalidUpstream", + ErrorMsg: "there is an error here", + Endpoints: []resolver.Endpoint{ + { + Address: "endpoint1", + Port: 80, + }, { + Address: "endpoint2", + Port: 80, + }, { + Address: "endpoint3", + Port: 80, + }, }, }, - }, - { - Name: "emptyUpstream", - ErrorMsg: "", - Endpoints: []resolver.Endpoint{}, + { + Name: "emptyUpstream", + ErrorMsg: "", + Endpoints: []resolver.Endpoint{}, + }, }, }, } @@ -671,7 +691,7 @@ var _ = Describe("Collector", Ordered, func() { When("collecting NGF resource counts", func() { It("collects correct data for graph with no resources", func(ctx SpecContext) { fakeGraphGetter.GetLatestGraphReturns(&graph.Graph{}) - fakeConfigurationGetter.GetLatestConfigurationReturns(&dataplane.Configuration{}) + fakeConfigurationGetter.GetLatestConfigurationReturns(nil) expData.NGFResourceCounts = telemetry.NGFResourceCounts{} @@ -721,7 +741,7 @@ var _ = Describe("Collector", Ordered, func() { When("it encounters an error while collecting data", func() { BeforeEach(func() { fakeGraphGetter.GetLatestGraphReturns(&graph.Graph{}) - fakeConfigurationGetter.GetLatestConfigurationReturns(&dataplane.Configuration{}) + fakeConfigurationGetter.GetLatestConfigurationReturns(nil) }) It("should error on nil latest graph", func(ctx SpecContext) { expectedError := errors.New("failed to collect telemetry data: latest graph cannot be nil") diff --git a/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go b/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go index a56fce8f7b..8650078dc7 100644 --- a/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go +++ b/internal/mode/static/telemetry/telemetryfakes/fake_configuration_getter.go @@ -9,21 +9,21 @@ import ( ) type FakeConfigurationGetter struct { - GetLatestConfigurationStub func() *dataplane.Configuration + GetLatestConfigurationStub func() []*dataplane.Configuration getLatestConfigurationMutex sync.RWMutex getLatestConfigurationArgsForCall []struct { } getLatestConfigurationReturns struct { - result1 *dataplane.Configuration + result1 []*dataplane.Configuration } getLatestConfigurationReturnsOnCall map[int]struct { - result1 *dataplane.Configuration + result1 []*dataplane.Configuration } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeConfigurationGetter) GetLatestConfiguration() *dataplane.Configuration { +func (fake *FakeConfigurationGetter) GetLatestConfiguration() []*dataplane.Configuration { fake.getLatestConfigurationMutex.Lock() ret, specificReturn := fake.getLatestConfigurationReturnsOnCall[len(fake.getLatestConfigurationArgsForCall)] fake.getLatestConfigurationArgsForCall = append(fake.getLatestConfigurationArgsForCall, struct { @@ -47,32 +47,32 @@ func (fake *FakeConfigurationGetter) GetLatestConfigurationCallCount() int { return len(fake.getLatestConfigurationArgsForCall) } -func (fake *FakeConfigurationGetter) GetLatestConfigurationCalls(stub func() *dataplane.Configuration) { +func (fake *FakeConfigurationGetter) GetLatestConfigurationCalls(stub func() []*dataplane.Configuration) { fake.getLatestConfigurationMutex.Lock() defer fake.getLatestConfigurationMutex.Unlock() fake.GetLatestConfigurationStub = stub } -func (fake *FakeConfigurationGetter) GetLatestConfigurationReturns(result1 *dataplane.Configuration) { +func (fake *FakeConfigurationGetter) GetLatestConfigurationReturns(result1 []*dataplane.Configuration) { fake.getLatestConfigurationMutex.Lock() defer fake.getLatestConfigurationMutex.Unlock() fake.GetLatestConfigurationStub = nil fake.getLatestConfigurationReturns = struct { - result1 *dataplane.Configuration + result1 []*dataplane.Configuration }{result1} } -func (fake *FakeConfigurationGetter) GetLatestConfigurationReturnsOnCall(i int, result1 *dataplane.Configuration) { +func (fake *FakeConfigurationGetter) GetLatestConfigurationReturnsOnCall(i int, result1 []*dataplane.Configuration) { fake.getLatestConfigurationMutex.Lock() defer fake.getLatestConfigurationMutex.Unlock() fake.GetLatestConfigurationStub = nil if fake.getLatestConfigurationReturnsOnCall == nil { fake.getLatestConfigurationReturnsOnCall = make(map[int]struct { - result1 *dataplane.Configuration + result1 []*dataplane.Configuration }) } fake.getLatestConfigurationReturnsOnCall[i] = struct { - result1 *dataplane.Configuration + result1 []*dataplane.Configuration }{result1} } diff --git a/tests/suite/client_settings_test.go b/tests/suite/client_settings_test.go index 7a77c0dea9..1f6293e6b5 100644 --- a/tests/suite/client_settings_test.go +++ b/tests/suite/client_settings_test.go @@ -104,6 +104,29 @@ var _ = Describe("ClientSettingsPolicy", Ordered, Label("functional", "cspolicy" } }) + Context("verify working traffic", func() { + It("should return a 200 response for HTTPRoutes", func() { + baseCoffeeURL := baseURL + "/coffee" + baseTeaURL := baseURL + "/tea" + + Eventually( + func() error { + return expectRequestToSucceed(baseCoffeeURL, address, "URI: /coffee") + }). + WithTimeout(timeoutConfig.RequestTimeout). + WithPolling(500 * time.Millisecond). + Should(Succeed()) + + Eventually( + func() error { + return expectRequestToSucceed(baseTeaURL, address, "URI: /tea") + }). + WithTimeout(timeoutConfig.RequestTimeout). + WithPolling(500 * time.Millisecond). + Should(Succeed()) + }) + }) + Context("nginx config", func() { var conf *framework.Payload filePrefix := fmt.Sprintf("/etc/nginx/includes/ClientSettingsPolicy_%s", namespace)