diff --git a/pkg/config/controller.go b/pkg/config/controller.go index a5655593ef..8920480319 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -60,6 +60,19 @@ type Controller struct { // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool + // EnableWarmup specifies whether the controller should start its sources when the manager is not + // the leader. This is useful for cases where sources take a long time to start, as it allows + // for the controller to warm up its caches even before it is elected as the leader. This + // improves leadership failover time, as the caches will be prepopulated before the controller + // transitions to be leader. + // + // Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its + // sources without waiting to become leader. + // Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without + // leader election do not wait on leader election to start their sources. + // Defaults to false. + EnableWarmup *bool + // UsePriorityQueue configures the controllers queue to use the controller-runtime provided // priority queue. // diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9de959b48f..03e51abcc5 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -93,6 +93,19 @@ type TypedOptions[request comparable] struct { // // Note: This flag is disabled by default until a future version. It's currently in beta. UsePriorityQueue *bool + + // EnableWarmup specifies whether the controller should start its sources when the manager is not + // the leader. This is useful for cases where sources take a long time to start, as it allows + // for the controller to warm up its caches even before it is elected as the leader. This + // improves leadership failover time, as the caches will be prepopulated before the controller + // transitions to be leader. + // + // Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its + // sources without waiting to become leader. + // Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without + // leader election do not wait on leader election to start their sources. + // Defaults to false. + EnableWarmup *bool } // DefaultFromConfig defaults the config from a config.Controller @@ -124,6 +137,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller if options.NeedLeaderElection == nil { options.NeedLeaderElection = config.NeedLeaderElection } + + if options.EnableWarmup == nil { + options.EnableWarmup = config.EnableWarmup + } } // Controller implements an API. A Controller manages a work queue fed reconcile.Requests @@ -243,7 +260,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req } // Create controller with dependencies set - return &controller.Controller[request]{ + return controller.New[request](controller.Options[request]{ Do: options.Reconciler, RateLimiter: options.RateLimiter, NewQueue: options.NewQueue, @@ -253,7 +270,8 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req LogConstructor: options.LogConstructor, RecoverPanic: options.RecoverPanic, LeaderElected: options.NeedLeaderElection, - }, nil + EnableWarmup: options.EnableWarmup, + }), nil } // ReconcileIDFromContext gets the reconcileID from the current context. 
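Reviewer note: a minimal sketch of how a consumer would opt in to warmup once this lands, either manager-wide via `config.Controller` or per controller via `controller.Options.EnableWarmup`. The leader-election ID and the no-op reconciler below are illustrative placeholders, not part of this PR:

```go
package main

import (
	"context"

	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/config"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func main() {
	// Warmup only matters when leader election is enabled; without it,
	// sources are started immediately anyway.
	m, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
		LeaderElection:          true,
		LeaderElectionID:        "example-leader-election-id", // placeholder
		LeaderElectionNamespace: "default",
		// Manager-wide default, inherited by controllers that don't set EnableWarmup.
		Controller: config.Controller{EnableWarmup: ptr.To(true)},
	})
	if err != nil {
		panic(err)
	}

	// A per-controller setting takes precedence over the manager-wide default.
	_, err = controller.New("example-controller", m, controller.Options{
		Reconciler: reconcile.Func(func(_ context.Context, _ reconcile.Request) (reconcile.Result, error) {
			return reconcile.Result{}, nil // placeholder reconciler
		}),
		EnableWarmup: ptr.To(true),
	})
	if err != nil {
		panic(err)
	}

	if err := m.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}
```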
diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index c8e8a790fc..0eb869f40f 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -18,16 +18,21 @@ package controller_test import ( "context" + "fmt" + "strconv" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -48,30 +53,46 @@ var _ = Describe("controller", func() { Describe("controller", func() { // TODO(directxman12): write a whole suite of controller-client interaction tests - It("should reconcile", func() { + // Since all tests are run in parallel and share the same testenv, we namespace the objects + // created by using a namespace per entry, and adding a watch predicate that filters by + // namespace. + DescribeTable("should reconcile", func(enableWarmup bool) { By("Creating the Manager") cm, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") - instance, err := controller.New("foo-controller", cm, controller.Options{ - Reconciler: reconcile.Func( - func(_ context.Context, request reconcile.Request) (reconcile.Result, error) { - reconciled <- request - return reconcile.Result{}, nil - }), - }) + instance, err := controller.New( + fmt.Sprintf("foo-controller-%t", enableWarmup), + cm, + controller.Options{ + Reconciler: reconcile.Func( + func(_ context.Context, request reconcile.Request) (reconcile.Result, error) { + reconciled <- request + return reconcile.Result{}, nil + }), + EnableWarmup: ptr.To(enableWarmup), + }, + ) Expect(err).NotTo(HaveOccurred()) + testNamespace := strconv.FormatBool(enableWarmup) + By("Watching Resources") err = instance.Watch( source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}, handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}), + makeNamespacePredicate[*appsv1.ReplicaSet](testNamespace), ), ) Expect(err).NotTo(HaveOccurred()) - err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{})) + err = instance.Watch( + source.Kind(cm.GetCache(), &appsv1.Deployment{}, + &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}, + makeNamespacePredicate[*appsv1.Deployment](testNamespace), + ), + ) Expect(err).NotTo(HaveOccurred()) err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) @@ -110,19 +131,25 @@ var _ = Describe("controller", func() { }, } expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{ - Namespace: "default", + Namespace: testNamespace, Name: "deployment-name", }} + By("Creating the test namespace") + _, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: testNamespace}, + }, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + By("Invoking Reconciling for Create") - deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, 
metav1.CreateOptions{}) + deployment, err = clientset.AppsV1().Deployments(testNamespace).Create(ctx, deployment, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Invoking Reconciling for Update") newDeployment := deployment.DeepCopy() newDeployment.Labels = map[string]string{"foo": "bar"} - _, err = clientset.AppsV1().Deployments("default").Update(ctx, newDeployment, metav1.UpdateOptions{}) + _, err = clientset.AppsV1().Deployments(testNamespace).Update(ctx, newDeployment, metav1.UpdateOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) @@ -145,24 +172,24 @@ var _ = Describe("controller", func() { Template: deployment.Spec.Template, }, } - replicaset, err = clientset.AppsV1().ReplicaSets("default").Create(ctx, replicaset, metav1.CreateOptions{}) + replicaset, err = clientset.AppsV1().ReplicaSets(testNamespace).Create(ctx, replicaset, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Invoking Reconciling for an OwnedObject when it is updated") newReplicaset := replicaset.DeepCopy() newReplicaset.Labels = map[string]string{"foo": "bar"} - _, err = clientset.AppsV1().ReplicaSets("default").Update(ctx, newReplicaset, metav1.UpdateOptions{}) + _, err = clientset.AppsV1().ReplicaSets(testNamespace).Update(ctx, newReplicaset, metav1.UpdateOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Invoking Reconciling for an OwnedObject when it is deleted") - err = clientset.AppsV1().ReplicaSets("default").Delete(ctx, replicaset.Name, metav1.DeleteOptions{}) + err = clientset.AppsV1().ReplicaSets(testNamespace).Delete(ctx, replicaset.Name, metav1.DeleteOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Invoking Reconciling for Delete") - err = clientset.AppsV1().Deployments("default"). + err = clientset.AppsV1().Deployments(testNamespace). Delete(ctx, "deployment-name", metav1.DeleteOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) @@ -174,7 +201,12 @@ var _ = Describe("controller", func() { By("Invoking Reconciling for a pod when it is created when adding watcher dynamically") // Add new watcher dynamically - err = instance.Watch(source.Kind(cm.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})) + err = instance.Watch( + source.Kind(cm.GetCache(), &corev1.Pod{}, + &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}, + makeNamespacePredicate[*corev1.Pod](testNamespace), + ), + ) Expect(err).NotTo(HaveOccurred()) pod := &corev1.Pod{ @@ -194,16 +226,27 @@ var _ = Describe("controller", func() { }, } expectedReconcileRequest = reconcile.Request{NamespacedName: types.NamespacedName{ - Namespace: "default", + Namespace: testNamespace, Name: "pod-name", }} - _, err = clientset.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}) + _, err = clientset.CoreV1().Pods(testNamespace).Create(ctx, pod, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) - }) + }, + Entry("with controller warmup enabled", true), + Entry("with controller warmup not enabled", false), + ) }) }) +// makeNamespacePredicate returns a predicate that filters out all objects not in the passed in +// namespace. 
+func makeNamespacePredicate[object client.Object](namespace string) predicate.TypedPredicate[object] { + return predicate.NewTypedPredicateFuncs[object](func(obj object) bool { + return obj.GetNamespace() == namespace + }) +} + func truePtr() *bool { t := true return &t diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 1c5b11d709..244a9f17a4 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -474,5 +474,71 @@ var _ = Describe("controller.Controller", func() { _, ok = q.(priorityqueue.PriorityQueue[reconcile.Request]) Expect(ok).To(BeFalse()) }) + + It("should set EnableWarmup correctly", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + // Test with EnableWarmup set to true + ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + EnableWarmup: ptr.To(true), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithWarmup.EnableWarmup).To(HaveValue(BeTrue())) + + // Test with EnableWarmup set to false + ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + EnableWarmup: ptr.To(false), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithoutWarmup.EnableWarmup).To(HaveValue(BeFalse())) + + // Test with EnableWarmup not set (should default to nil) + ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithDefaultWarmup.EnableWarmup).To(BeNil()) + }) + + It("should inherit EnableWarmup from manager config", func() { + // Test with manager default setting EnableWarmup to true + managerWithWarmup, err := manager.New(cfg, manager.Options{ + Controller: config.Controller{ + EnableWarmup: ptr.To(true), + }, + }) + Expect(err).NotTo(HaveOccurred()) + ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlInheritingWarmup.EnableWarmup).To(HaveValue(BeTrue())) + + // Test that explicit controller setting overrides manager setting + ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{ + Reconciler: reconcile.Func(nil), + EnableWarmup: ptr.To(false), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlOverridingWarmup.EnableWarmup).To(HaveValue(BeFalse())) + }) }) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 9fa7ec71e1..6475d60134 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -38,6 +38,51 @@ import ( 
"sigs.k8s.io/controller-runtime/pkg/source" ) +// Options are the arguments for creating a new Controller. +type Options[request comparable] struct { + // Reconciler is a function that can be called at any time with the Name / Namespace of an object and + // ensures that the state of the system matches the state specified in the object. + // Defaults to the DefaultReconcileFunc. + Do reconcile.TypedReconciler[request] + + // RateLimiter is used to limit how frequently requests may be queued into the work queue. + RateLimiter workqueue.TypedRateLimiter[request] + + // NewQueue constructs the queue for this controller once the controller is ready to start. + // This is a func because the standard Kubernetes work queues start themselves immediately, which + // leads to goroutine leaks if something calls controller.New repeatedly. + NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] + + // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. + MaxConcurrentReconciles int + + // CacheSyncTimeout refers to the time limit set on waiting for cache to sync + // Defaults to 2 minutes if not set. + CacheSyncTimeout time.Duration + + // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. + Name string + + // LogConstructor is used to construct a logger to then log messages to users during reconciliation, + // or for example when a watch is started. + // Note: LogConstructor has to be able to handle nil requests as we are also using it + // outside the context of a reconciliation. + LogConstructor func(request *request) logr.Logger + + // RecoverPanic indicates whether the panic caused by reconcile should be recovered. + // Defaults to true. + RecoverPanic *bool + + // LeaderElected indicates whether the controller is leader elected or always running. + LeaderElected *bool + + // EnableWarmup specifies whether the controller should start its sources + // when the manager is not the leader. + // Defaults to false, which means that the controller will wait for leader election to start + // before starting sources. + EnableWarmup *bool +} + // Controller implements controller.Controller. type Controller[request comparable] struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. @@ -83,6 +128,14 @@ type Controller[request comparable] struct { // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []source.TypedSource[request] + // startedEventSourcesAndQueue is used to track if the event sources have been started. + // It ensures that we append sources to c.startWatches only until we call Start() / Warmup() + // It is true if startEventSourcesAndQueueLocked has been called at least once. + startedEventSourcesAndQueue bool + + // didStartEventSourcesOnce is used to ensure that the event sources are only started once. + didStartEventSourcesOnce sync.Once + // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it @@ -95,6 +148,35 @@ type Controller[request comparable] struct { // LeaderElected indicates whether the controller is leader elected or always running. 
LeaderElected *bool + + // EnableWarmup specifies whether the controller should start its sources when the manager is not + // the leader. This is useful for cases where sources take a long time to start, as it allows + // for the controller to warm up its caches even before it is elected as the leader. This + // improves leadership failover time, as the caches will be prepopulated before the controller + // transitions to be leader. + // + // Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its + // sources without waiting to become leader. + // Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without + // leader election do not wait on leader election to start their sources. + // Defaults to false. + EnableWarmup *bool +} + +// New returns a new Controller configured with the given options. +func New[request comparable](options Options[request]) *Controller[request] { + return &Controller[request]{ + Do: options.Do, + RateLimiter: options.RateLimiter, + NewQueue: options.NewQueue, + MaxConcurrentReconciles: options.MaxConcurrentReconciles, + CacheSyncTimeout: options.CacheSyncTimeout, + Name: options.Name, + LogConstructor: options.LogConstructor, + RecoverPanic: options.RecoverPanic, + LeaderElected: options.LeaderElected, + EnableWarmup: options.EnableWarmup, + } } // Reconcile implements reconcile.Reconciler. @@ -124,10 +206,9 @@ func (c *Controller[request]) Watch(src source.TypedSource[request]) error { c.mu.Lock() defer c.mu.Unlock() - // Controller hasn't started yet, store the watches locally and return. - // - // These watches are going to be held on the controller struct until the manager or user calls Start(...). - if !c.Started { + // Sources weren't started yet, store the watches locally and return. + // These sources are going to be held until either Warmup() or Start(...) is called. + if !c.startedEventSourcesAndQueue { c.startWatches = append(c.startWatches, src) return nil } @@ -144,6 +225,21 @@ func (c *Controller[request]) NeedLeaderElection() bool { return *c.LeaderElected } +// Warmup implements the manager.WarmupRunnable interface. +func (c *Controller[request]) Warmup(ctx context.Context) error { + if c.EnableWarmup == nil || !*c.EnableWarmup { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + // Set the ctx so later calls to watch use this internal context + c.ctx = ctx + + return c.startEventSourcesAndQueueLocked(ctx) +} + // Start implements controller.Controller. func (c *Controller[request]) Start(ctx context.Context) error { // use an IIFE to get proper lock handling @@ -158,17 +254,6 @@ func (c *Controller[request]) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - queue := c.NewQueue(c.Name, c.RateLimiter) - if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue { - c.Queue = priorityQueue - } else { - c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue} - } - go func() { - <-ctx.Done() - c.Queue.ShutDown() - }() - wg := &sync.WaitGroup{} err := func() error { defer c.mu.Unlock() @@ -179,18 +264,12 @@ func (c *Controller[request]) Start(ctx context.Context) error { // NB(directxman12): launch the sources *before* trying to wait for the // caches to sync so that they have a chance to register their intended // caches. 
- if err := c.startEventSources(ctx); err != nil { + if err := c.startEventSourcesAndQueueLocked(ctx); err != nil { return err } c.LogConstructor(nil).Info("Starting Controller") - // All the watches have been started, we can reset the local slice. - // - // We should never hold watches more than necessary, each watch source can hold a backing cache, - // which won't be garbage collected if we hold a reference to it. - c.startWatches = nil - // Launch workers to process resources c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles) wg.Add(c.MaxConcurrentReconciles) @@ -218,63 +297,94 @@ func (c *Controller[request]) Start(ctx context.Context) error { return nil } -// startEventSources launches all the sources registered with this controller and waits +// startEventSourcesAndQueueLocked launches all the sources registered with this controller and waits // for them to sync. It returns an error if any of the sources fail to start or sync. -func (c *Controller[request]) startEventSources(ctx context.Context) error { - errGroup := &errgroup.Group{} - for _, watch := range c.startWatches { - log := c.LogConstructor(nil) - _, ok := watch.(interface { - String() string - }) - - if !ok { - log = log.WithValues("source", fmt.Sprintf("%T", watch)) +func (c *Controller[request]) startEventSourcesAndQueueLocked(ctx context.Context) error { + var retErr error + + c.didStartEventSourcesOnce.Do(func() { + queue := c.NewQueue(c.Name, c.RateLimiter) + if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue { + c.Queue = priorityQueue } else { - log = log.WithValues("source", fmt.Sprintf("%s", watch)) + c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue} } - didStartSyncingSource := &atomic.Bool{} - errGroup.Go(func() error { - // Use a timeout for starting and syncing the source to avoid silently - // blocking startup indefinitely if it doesn't come up. - sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) - defer cancel() - - sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out - go func() { - defer close(sourceStartErrChan) - log.Info("Starting EventSource") - if err := watch.Start(ctx, c.Queue); err != nil { - sourceStartErrChan <- err - return - } - syncingSource, ok := watch.(source.TypedSyncingSource[request]) - if !ok { - return - } - didStartSyncingSource.Store(true) - if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { - err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err) - log.Error(err, "Could not wait for Cache to sync") - sourceStartErrChan <- err + go func() { + <-ctx.Done() + c.Queue.ShutDown() + }() + + errGroup := &errgroup.Group{} + for _, watch := range c.startWatches { + log := c.LogConstructor(nil) + _, ok := watch.(interface { + String() string + }) + if !ok { + log = log.WithValues("source", fmt.Sprintf("%T", watch)) + } else { + log = log.WithValues("source", fmt.Sprintf("%s", watch)) + } + didStartSyncingSource := &atomic.Bool{} + errGroup.Go(func() error { + // Use a timeout for starting and syncing the source to avoid silently + // blocking startup indefinitely if it doesn't come up. 
+ sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + + sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out + hasAccessedQueueChan := make(chan struct{}) // Closed once the goroutine below has read c.Queue + go func() { + defer close(sourceStartErrChan) + log.Info("Starting EventSource") + + q := c.Queue + close(hasAccessedQueueChan) + if err := watch.Start(ctx, q); err != nil { + sourceStartErrChan <- err + return + } + syncingSource, ok := watch.(source.TypedSyncingSource[request]) + if !ok { + return + } + didStartSyncingSource.Store(true) + if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { + err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err) + log.Error(err, "Could not wait for Cache to sync") + sourceStartErrChan <- err + } + }() + + select { + case err := <-sourceStartErrChan: + return err + case <-sourceStartCtx.Done(): + defer func() { <-hasAccessedQueueChan }() // Ensure that watch.Start has been called to avoid prematurely releasing lock before accessing c.Queue + if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened + return <-sourceStartErrChan + } + if ctx.Err() != nil { // Don't return an error if the root context got cancelled + return nil + } + return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch) } - }() + }) + } + retErr = errGroup.Wait() - select { - case err := <-sourceStartErrChan: - return err - case <-sourceStartCtx.Done(): - if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened - return <-sourceStartErrChan - } - if ctx.Err() != nil { // Don't return an error if the root context got cancelled - return nil - } - return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch) - } - }) - } - return errGroup.Wait() + // All the watches have been started, we can reset the local slice. + // + // We should never hold watches more than necessary, each watch source can hold a backing cache, + // which won't be garbage collected if we hold a reference to it. + c.startWatches = nil + + // Mark event sources as started after resetting the startWatches slice so that watches from + // a new Watch() call are immediately started. + c.startedEventSourcesAndQueue = true + }) + + return retErr } // processNextWorkItem will read a single work item off the workqueue and diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 3fde5da9c8..2c3a399d7d 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -28,6 +29,7 @@ import ( .
"github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "go.uber.org/goleak" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,10 +41,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" "sigs.k8s.io/controller-runtime/pkg/internal/log" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -69,7 +73,7 @@ var _ = Describe("controller", func() { queue = &controllertest.Queue{ TypedInterface: workqueue.NewTyped[reconcile.Request](), } - ctrl = &Controller[reconcile.Request]{ + ctrl = New[reconcile.Request](Options[reconcile.Request]{ MaxConcurrentReconciles: 1, Do: fakeReconcile, NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { @@ -78,7 +82,7 @@ var _ = Describe("controller", func() { LogConstructor: func(_ *reconcile.Request) logr.Logger { return log.RuntimeLog.WithName("controller").WithName("test") }, - } + }) }) Describe("Reconciler", func() { @@ -350,14 +354,14 @@ var _ = Describe("controller", func() { TypedRateLimitingInterface: &controllertest.TypedQueue[TestRequest]{ TypedInterface: workqueue.NewTyped[TestRequest](), }} - ctrl := &Controller[TestRequest]{ + ctrl := New[TestRequest](Options[TestRequest]{ NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] { return queue }, LogConstructor: func(*TestRequest) logr.Logger { return log.RuntimeLog.WithName("controller").WithName("test") }, - } + }) ctrl.CacheSyncTimeout = time.Second src := &bisignallingSource[TestRequest]{ startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]), @@ -383,14 +387,24 @@ var _ = Describe("controller", func() { }) }) - Describe("startEventSources", func() { + Describe("startEventSourcesAndQueueLocked", func() { It("should return nil when no sources are provided", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ctrl.startWatches = []source.TypedSource[reconcile.Request]{} - err := ctrl.startEventSources(ctx) + err := ctrl.startEventSourcesAndQueueLocked(ctx) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should initialize controller queue when called", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{} + err := ctrl.startEventSourcesAndQueueLocked(ctx) Expect(err).NotTo(HaveOccurred()) + Expect(ctrl.Queue).NotTo(BeNil()) }) It("should return an error if a source fails to start", func() { @@ -403,10 +417,10 @@ var _ = Describe("controller", func() { return expectedErr }) - // // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned + // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned ctrl.CacheSyncTimeout = 5 * time.Second ctrl.startWatches = []source.TypedSource[reconcile.Request]{src} - err := ctrl.startEventSources(ctx) + err := ctrl.startEventSourcesAndQueueLocked(ctx) 
Expect(err).To(Equal(expectedErr)) }) @@ -420,7 +434,7 @@ var _ = Describe("controller", func() { ctrl.Name = "test-controller" ctrl.CacheSyncTimeout = 5 * time.Second - err := ctrl.startEventSources(ctx) + err := ctrl.startEventSourcesAndQueueLocked(ctx) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to wait for test-controller caches to sync")) }) @@ -436,7 +450,7 @@ var _ = Describe("controller", func() { ctrl.Name = "test-controller" ctrl.CacheSyncTimeout = 5 * time.Second - err := ctrl.startEventSources(ctx) + err := ctrl.startEventSourcesAndQueueLocked(ctx) Expect(err).NotTo(HaveOccurred()) }) @@ -459,7 +473,8 @@ var _ = Describe("controller", func() { // Start the sources in a goroutine startErrCh := make(chan error) go func() { - startErrCh <- ctrl.startEventSources(sourceCtx) + defer GinkgoRecover() + startErrCh <- ctrl.startEventSourcesAndQueueLocked(sourceCtx) }() // Allow source to start successfully @@ -494,10 +509,103 @@ var _ = Describe("controller", func() { ctrl.startWatches = []source.TypedSource[reconcile.Request]{blockingSrc} - err := ctrl.startEventSources(ctx) + err := ctrl.startEventSourcesAndQueueLocked(ctx) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("timed out waiting for source")) }) + + It("should only start sources once when called multiple times concurrently", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = 1 * time.Millisecond + + var startCount atomic.Int32 + src := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + startCount.Add(1) + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{src} + + By("Calling startEventSourcesAndQueueLocked multiple times in parallel") + var wg sync.WaitGroup + for i := 1; i <= 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := ctrl.startEventSourcesAndQueueLocked(ctx) + // All calls should return the same nil error + Expect(err).NotTo(HaveOccurred()) + }() + } + + wg.Wait() + Expect(startCount.Load()).To(Equal(int32(1)), "Source should only be started once even when called multiple times") + }) + + It("should block subsequent calls from returning until the first call to startEventSourcesAndQueueLocked has returned", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl.CacheSyncTimeout = 5 * time.Second + + // finishSourceChan is closed to unblock startEventSourcesAndQueueLocked from returning + finishSourceChan := make(chan struct{}) + + src := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + <-finishSourceChan + return nil + }) + ctrl.startWatches = []source.TypedSource[reconcile.Request]{src} + + By("Calling startEventSourcesAndQueueLocked asynchronously") + wg := sync.WaitGroup{} + // Add to the WaitGroup before launching the goroutine so wg.Wait() cannot race with wg.Add(). + wg.Add(1) + go func() { + defer GinkgoRecover() + defer wg.Done() + + Expect(ctrl.startEventSourcesAndQueueLocked(ctx)).To(Succeed()) + }() + + By("Calling startEventSourcesAndQueueLocked again") + var didSubsequentCallComplete atomic.Bool + wg.Add(1) + go func() { + defer GinkgoRecover() + defer wg.Done() + + Expect(ctrl.startEventSourcesAndQueueLocked(ctx)).To(Succeed()) + didSubsequentCallComplete.Store(true) + }() + + // Assert that second call to startEventSourcesAndQueueLocked is blocked while source has not finished + Consistently(didSubsequentCallComplete.Load).Should(BeFalse()) + + By("Finishing source start + sync") +
finishSourceChan <- struct{}{} + + // Assert that second call to startEventSourcesAndQueueLocked is now complete + Eventually(didSubsequentCallComplete.Load).Should(BeTrue(), "startEventSourcesAndQueueLocked should complete after source is started and synced") + wg.Wait() + }) + + It("should reset c.startWatches to nil after returning", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = 1 * time.Millisecond + + src := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{src} + + err := ctrl.startEventSourcesAndQueueLocked(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(ctrl.startWatches).To(BeNil(), "startWatches should be reset to nil after returning") + }) }) Describe("Processing queue items from a Controller", func() { @@ -1014,6 +1122,331 @@ var _ = Describe("controller", func() { }) }) }) + + Describe("Warmup", func() { + JustBeforeEach(func() { + ctrl.EnableWarmup = ptr.To(true) + }) + + It("should track warmup status correctly with successful sync", func() { + // Setup controller with sources that complete successfully + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = time.Second + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return nil + }), + } + + Expect(ctrl.Warmup(ctx)).To(Succeed()) + }) + + It("should track warmup status correctly with unsuccessful sync", func() { + // Setup controller with sources that complete with error + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = time.Second + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return errors.New("sync error") + }), + } + + err := ctrl.Warmup(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sync error")) + }) + + It("should call Start on sources with the appropriate non-nil queue", func() { + ctrl.CacheSyncTimeout = 10 * time.Second + started := false + ctx, cancel := context.WithCancel(context.Background()) + src := source.Func(func(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + defer GinkgoRecover() + Expect(q).ToNot(BeNil()) + Expect(q).To(Equal(ctrl.Queue)) + + started = true + cancel() // Cancel the context so ctrl.Start() doesn't block forever + return nil + }) + Expect(ctrl.Watch(src)).To(Succeed()) + Expect(ctrl.Warmup(ctx)).To(Succeed()) + Expect(ctrl.Queue).ToNot(BeNil()) + Expect(started).To(BeTrue()) + }) + + It("should return nil if context is cancelled while waiting for source to start", func() { + // Setup controller with a source that blocks until the context is cancelled + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = time.Second + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + <-ctx.Done() + return nil + }), + } + + // channel to prevent the goroutine from outliving the It test + waitChan := make(chan struct{}) + + // Invoked in a goroutine because Warmup will block + go func() { + defer GinkgoRecover() + defer close(waitChan) +
Expect(ctrl.Warmup(ctx)).To(Succeed()) + }() + + cancel() + <-waitChan + }) + + It("should be called before leader election runnables if warmup is enabled", func() { + // This unit test exists to ensure that a warmup enabled controller will actually be + // called in the warmup phase before the leader election runnables are started. It + // catches regressions in the controller that would not implement warmupRunnable from + // pkg/manager. + ctx, cancel := context.WithCancel(context.Background()) + + By("Creating a channel to track execution order") + runnableExecutionOrderChan := make(chan string, 2) + const nonWarmupRunnableName = "nonWarmupRunnable" + const warmupRunnableName = "warmupRunnable" + + ctrl.CacheSyncTimeout = time.Second + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + runnableExecutionOrderChan <- warmupRunnableName + return nil + }), + } + + nonWarmupCtrl := New[reconcile.Request](Options[reconcile.Request]{ + MaxConcurrentReconciles: 1, + Do: fakeReconcile, + NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return queue + }, + LogConstructor: func(_ *reconcile.Request) logr.Logger { + return log.RuntimeLog.WithName("controller").WithName("test") + }, + CacheSyncTimeout: time.Second, + EnableWarmup: ptr.To(false), + LeaderElected: ptr.To(true), + }) + nonWarmupCtrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + runnableExecutionOrderChan <- nonWarmupRunnableName + return nil + }), + } + + By("Creating a manager") + testenv = &envtest.Environment{} + cfg, err := testenv.Start() + Expect(err).NotTo(HaveOccurred()) + m, err := manager.New(cfg, manager.Options{ + LeaderElection: true, + LeaderElectionID: "some-leader-election-id", + LeaderElectionNamespace: "default", + }) + Expect(err).NotTo(HaveOccurred()) + + By("Adding warmup and non-warmup controllers to the manager") + Expect(m.Add(ctrl)).To(Succeed()) + Expect(m.Add(nonWarmupCtrl)).To(Succeed()) + + By("Starting the manager") + waitChan := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(waitChan) + Expect(m.Start(ctx)).To(Succeed()) + }() + + <-m.Elected() + Expect(<-runnableExecutionOrderChan).To(Equal(warmupRunnableName)) + Expect(<-runnableExecutionOrderChan).To(Equal(nonWarmupRunnableName)) + + cancel() + <-waitChan + }) + + It("should not cause a data race when called concurrently", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = time.Second + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return nil + }), + } + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer GinkgoRecover() + defer wg.Done() + Expect(ctrl.Warmup(ctx)).To(Succeed()) + }() + } + + wg.Wait() + }) + + It("should not cause a data race when called concurrently with Start and only start sources once", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = time.Second + + var watchStartedCount atomic.Int32 + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ 
workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + watchStartedCount.Add(1) + return nil + }), + } + + By("calling Warmup and Start concurrently") + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).To(Succeed()) + }() + + blockOnWarmupChan := make(chan struct{}) + go func() { + defer GinkgoRecover() + Expect(ctrl.Warmup(ctx)).To(Succeed()) + close(blockOnWarmupChan) + }() + + <-blockOnWarmupChan + + Expect(watchStartedCount.Load()).To(Equal(int32(1)), "source should only be started once") + Expect(ctrl.startWatches).To(BeNil(), "startWatches should be reset to nil after they are started") + }) + + It("should start sources added after Warmup is called", func() { + ctx, cancel := context.WithCancel(context.Background()) + + ctrl.CacheSyncTimeout = time.Second + + Expect(ctrl.Warmup(ctx)).To(Succeed()) + + By("adding a watch after Warmup has been called") + var didWatchStart atomic.Bool + Expect(ctrl.Watch(source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + didWatchStart.Store(true) + return nil + }))).To(Succeed()) + + waitChan := make(chan struct{}) + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).To(Succeed()) + close(waitChan) + }() + + Eventually(didWatchStart.Load).Should(BeTrue(), "watch should be started if it is added after Warmup") + + cancel() + <-waitChan + }) + + DescribeTable("should not leak goroutines when manager is stopped with warmup runnable", + func(leaderElection bool) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl.CacheSyncTimeout = time.Second + + By("Creating a manager") + testenv = &envtest.Environment{} + cfg, err := testenv.Start() + Expect(err).NotTo(HaveOccurred()) + m, err := manager.New(cfg, manager.Options{ + LeaderElection: leaderElection, + LeaderElectionID: "some-leader-election-id", + LeaderElectionNamespace: "default", + }) + Expect(err).NotTo(HaveOccurred()) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + <-ctx.Done() + return nil + }), + } + Expect(m.Add(ctrl)).To(Succeed()) + + // goleak.IgnoreCurrent() needs to run after the testenv.Start() call so that the + // apiserver process is ignored + currentGRs := goleak.IgnoreCurrent() + waitChan := make(chan struct{}) + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).To(Succeed()) + close(waitChan) + }() + + <-m.Elected() + By("stopping the manager via context") + cancel() + + Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) + <-waitChan + }, + Entry("and with leader election enabled", true), + Entry("and without leader election enabled", false), + ) + }) + + Describe("Warmup with warmup disabled", func() { + JustBeforeEach(func() { + ctrl.EnableWarmup = ptr.To(false) + }) + + It("should not start sources when Warmup is called if warmup is disabled, but start them when Start is called", func() { + // Setup controller with sources that complete successfully + ctx, cancel := context.WithCancel(context.Background()) + + ctrl.CacheSyncTimeout = time.Second + var isSourceStarted atomic.Bool + isSourceStarted.Store(false) + ctrl.startWatches = []source.TypedSource[reconcile.Request]{ + source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + isSourceStarted.Store(true) + return nil + }), + } + + By("Calling Warmup when EnableWarmup is false") + err := ctrl.Warmup(ctx) +
Expect(err).NotTo(HaveOccurred()) + Expect(isSourceStarted.Load()).To(BeFalse()) + + By("Calling Start when EnableWarmup is false") + waitChan := make(chan struct{}) + + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).To(Succeed()) + close(waitChan) + }() + Eventually(isSourceStarted.Load).Should(BeTrue()) + cancel() + <-waitChan + }) + }) }) var _ = Describe("ReconcileIDFromContext function", func() { diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index e5204a7506..a9f91cbdd5 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { return fmt.Errorf("failed to start other runnables: %w", err) } + // Start WarmupRunnables and wait for warmup to complete. + if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil { + return fmt.Errorf("failed to start warmup runnables: %w", err) + } + // Start the leader election and all required runnables. { ctx, cancel := context.WithCancel(context.Background()) @@ -534,6 +539,18 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e }() go func() { + go func() { + // Stop the warmup runnables in a separate goroutine to avoid blocking. + // It is important to stop the warmup runnables in parallel with the other runnables + // since we cannot assume ordering of whether or not one of the warmup runnables or one + // of the other runnables is holding a lock. + // Cancelling the wrong runnable (one that is not holding the lock) will cause the + // shutdown sequence to block indefinitely as it will wait for the runnable that is + // holding the lock to finish. + cm.logger.Info("Stopping and waiting for warmup runnables") + cm.runnables.Warmup.StopAndWait(cm.shutdownCtx) + }() + // First stop the non-leader election runnables. cm.logger.Info("Stopping and waiting for non leader election runnables") cm.runnables.Others.StopAndWait(cm.shutdownCtx) diff --git a/pkg/manager/internal/integration/manager_test.go b/pkg/manager/internal/integration/manager_test.go index 346daa1e68..c83eead3c1 100644 --- a/pkg/manager/internal/integration/manager_test.go +++ b/pkg/manager/internal/integration/manager_test.go @@ -34,10 +34,12 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -90,6 +92,8 @@ var ( }, }, } + + ctx = ctrl.SetupSignalHandler() ) var _ = Describe("manger.Manager Start", func() { @@ -107,9 +111,7 @@ var _ = Describe("manger.Manager Start", func() { // * Add an index on v2 Driver to ensure we start and wait for an informer during cache.Start (as part of manager.Start) // * Note: cache.Start would fail if the conversion webhook doesn't work (which in turn depends on the readiness probe) // * Note: Adding the index for v2 ensures the Driver list call during Informer sync goes through conversion. - It("should start all components without deadlock", func() { - ctx := ctrl.SetupSignalHandler() - + DescribeTable("should start all components without deadlock", func(warmupEnabled bool) { // Set up schema. 
Expect(clientgoscheme.AddToScheme(scheme)).To(Succeed()) Expect(apiextensionsv1.AddToScheme(scheme)).To(Succeed()) @@ -163,7 +165,13 @@ var _ = Describe("manger.Manager Start", func() { driverReconciler := &DriverReconciler{ Client: mgr.GetClient(), } - Expect(ctrl.NewControllerManagedBy(mgr).For(&crewv2.Driver{}).Complete(driverReconciler)).To(Succeed()) + Expect( + ctrl.NewControllerManagedBy(mgr). + For(&crewv2.Driver{}). + Named(fmt.Sprintf("driver_warmup_%t", warmupEnabled)). + WithOptions(controller.Options{EnableWarmup: ptr.To(warmupEnabled)}). + Complete(driverReconciler), + ).To(Succeed()) // Set up a conversion webhook. conversionWebhook := createConversionWebhook(mgr) @@ -211,7 +219,10 @@ var _ = Describe("manger.Manager Start", func() { // Shutdown the server cancel() - }) + }, + Entry("controller warmup enabled", true), + Entry("controller warmup not enabled", false), + ) }) type DriverReconciler struct { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c3ae317b04..a11ee07367 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -314,6 +314,15 @@ type LeaderElectionRunnable interface { NeedLeaderElection() bool } +// warmupRunnable knows if a Runnable requires warmup. A warmup runnable is a runnable +// that should be run when the manager is started but before it becomes leader. +// Note: Implementing this interface is only useful when LeaderElection can be enabled, as the +// behavior when leaderelection is not enabled is to run LeaderElectionRunnables immediately. +type warmupRunnable interface { + // Warmup will be called when the manager is started but before it becomes leader. + Warmup(context.Context) error +} + // New returns a new Manager for creating Controllers. // Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf" // will be used for all built-in resources of Kubernetes, and "application/json" is for other types diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 247a33f9dc..2af8ce93e4 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1928,6 +1928,64 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) Expect(m.GetAPIReader()).NotTo(BeNil()) }) + + It("should run warmup runnables before leader election is won", func() { + By("Creating a channel to track execution order") + runnableExecutionOrderChan := make(chan string, 2) + const leaderElectionRunnableName = "leaderElectionRunnable" + const warmupRunnableName = "warmupRunnable" + + By("Creating a manager with leader election enabled") + m, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionNamespace: "default", + LeaderElectionID: "test-leader-election-warmup", + newResourceLock: fakeleaderelection.NewResourceLock, + HealthProbeBindAddress: "0", + Metrics: metricsserver.Options{BindAddress: "0"}, + PprofBindAddress: "0", + }) + Expect(err).NotTo(HaveOccurred()) + + By("Creating a runnable that implements WarmupRunnable interface") + // Create a warmup runnable + warmupRunnable := newWarmupRunnableFunc( + func(ctx context.Context) error { + // This is the leader election runnable that will be executed after leader election + // It will block until context is done/cancelled + <-ctx.Done() + return nil + }, + func(ctx context.Context) error { + // This should be called during startup before leader election + runnableExecutionOrderChan <- warmupRunnableName + return nil + }, + ) + Expect(m.Add(warmupRunnable)).To(Succeed()) + + By("Creating a 
runnable that requires leader election") + leaderElectionRunnable := RunnableFunc( + func(ctx context.Context) error { + runnableExecutionOrderChan <- leaderElectionRunnableName + <-ctx.Done() + return nil + }, + ) + Expect(m.Add(leaderElectionRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).To(Succeed()) + }() + + <-m.Elected() + Expect(<-runnableExecutionOrderChan).To(Equal(warmupRunnableName)) + Expect(<-runnableExecutionOrderChan).To(Equal(leaderElectionRunnableName)) + }) }) type runnableError struct { diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index db5cda7c88..082b502f5d 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -32,6 +32,7 @@ type runnables struct { Webhooks *runnableGroup Caches *runnableGroup LeaderElection *runnableGroup + Warmup *runnableGroup Others *runnableGroup } @@ -42,6 +43,7 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { Webhooks: newRunnableGroup(baseContext, errChan), Caches: newRunnableGroup(baseContext, errChan), LeaderElection: newRunnableGroup(baseContext, errChan), + Warmup: newRunnableGroup(baseContext, errChan), Others: newRunnableGroup(baseContext, errChan), } } @@ -65,8 +67,20 @@ func (r *runnables) Add(fn Runnable) error { }) case webhook.Server: return r.Webhooks.Add(fn, nil) - case LeaderElectionRunnable: - if !runnable.NeedLeaderElection() { + case warmupRunnable, LeaderElectionRunnable: + if warmupRunnable, ok := fn.(warmupRunnable); ok { + if err := r.Warmup.Add(RunnableFunc(warmupRunnable.Warmup), nil); err != nil { + return err + } + } + + leaderElectionRunnable, ok := fn.(LeaderElectionRunnable) + if !ok { + // If the runnable is not a LeaderElectionRunnable, add it to the leader election group for backwards compatibility + return r.LeaderElection.Add(fn, nil) + } + + if !leaderElectionRunnable.NeedLeaderElection() { return r.Others.Add(fn, nil) } return r.LeaderElection.Add(fn, nil) diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index f2f4119ba6..52086047af 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -27,6 +27,7 @@ var _ = Describe("runnables", func() { r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(server)).To(Succeed()) Expect(r.HTTPServers.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) }) It("should add caches to the appropriate group", func() { @@ -34,6 +35,7 @@ var _ = Describe("runnables", func() { r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(cache)).To(Succeed()) Expect(r.Caches.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) }) It("should add webhooks to the appropriate group", func() { @@ -41,6 +43,7 @@ var _ = Describe("runnables", func() { r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(webhook)).To(Succeed()) Expect(r.Webhooks.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) }) It("should add any runnable to the leader election group", func() { @@ -52,6 +55,121 @@ var _ = Describe("runnables", func() { r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(runnable)).To(Succeed()) Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) + }) + + It("should add WarmupRunnable to the Warmup and LeaderElection group", func() { + warmupRunnable := newWarmupRunnableFunc( + func(c context.Context) 
error { + <-c.Done() + return nil + }, + func(c context.Context) error { return nil }, + ) + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) + }) + + It("should add WarmupRunnable that doesn't need leader election to the Warmup and Others groups", func() { + warmupRunnable := newLeaderElectionAndWarmupRunnable( + func(c context.Context) error { + <-c.Done() + return nil + }, + func(c context.Context) error { return nil }, + false, + ) + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(BeEmpty()) + Expect(r.Others.startQueue).To(HaveLen(1)) + }) + + It("should add WarmupRunnable that needs leader election to Warmup and LeaderElection group, not Others", func() { + warmupRunnable := newLeaderElectionAndWarmupRunnable( + func(c context.Context) error { + <-c.Done() + return nil + }, + func(c context.Context) error { return nil }, + true, + ) + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) + }) + + It("should execute the Warmup function when Warmup group is started", func() { + var warmupExecuted atomic.Bool + + warmupRunnable := newWarmupRunnableFunc( + func(c context.Context) error { + <-c.Done() + return nil + }, + func(c context.Context) error { + warmupExecuted.Store(true) + return nil + }, + ) + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the Warmup group + Expect(r.Warmup.Start(ctx)).To(Succeed()) + + // Verify warmup function was called + Expect(warmupExecuted.Load()).To(BeTrue()) + }) + + It("should propagate errors from Warmup function to error channel", func() { + expectedErr := fmt.Errorf("expected warmup error") + + warmupRunnable := newWarmupRunnableFunc( + func(c context.Context) error { + <-c.Done() + return nil + }, + func(c context.Context) error { return expectedErr }, + ) + + testErrChan := make(chan error, 1) + r := newRunnables(defaultBaseContext, testErrChan) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the Warmup group in a goroutine + go func() { + defer GinkgoRecover() + Expect(r.Warmup.Start(ctx)).To(Succeed()) + }() + + // Error from Warmup should be sent to error channel + var receivedErr error + Eventually(func() error { + select { + case receivedErr = <-testErrChan: + return receivedErr + default: + return nil + } + }).Should(Equal(expectedErr)) }) }) @@ -224,3 +342,57 @@ var _ = Describe("runnableGroup", func() { } }) }) + +var _ warmupRunnable = &warmupRunnableFunc{} + +func newWarmupRunnableFunc( + startFunc func(context.Context) error, + warmupFunc func(context.Context) error, +) *warmupRunnableFunc { + return &warmupRunnableFunc{ + startFunc: startFunc, + warmupFunc: warmupFunc, + } +} + +// warmupRunnableFunc is a helper struct that implements the warmupRunnable +// interface for testing purposes.
+type warmupRunnableFunc struct { + startFunc func(context.Context) error + warmupFunc func(context.Context) error +} + +func (r *warmupRunnableFunc) Start(ctx context.Context) error { + return r.startFunc(ctx) +} + +func (r *warmupRunnableFunc) Warmup(ctx context.Context) error { + return r.warmupFunc(ctx) +} + +var _ LeaderElectionRunnable = &leaderElectionAndWarmupRunnable{} +var _ warmupRunnable = &leaderElectionAndWarmupRunnable{} + +// leaderElectionAndWarmupRunnable implements both the warmupRunnable and +// LeaderElectionRunnable interfaces +type leaderElectionAndWarmupRunnable struct { + *warmupRunnableFunc + needLeaderElection bool +} + +func newLeaderElectionAndWarmupRunnable( + startFunc func(context.Context) error, + warmupFunc func(context.Context) error, + needLeaderElection bool, +) *leaderElectionAndWarmupRunnable { + return &leaderElectionAndWarmupRunnable{ + warmupRunnableFunc: &warmupRunnableFunc{ + startFunc: startFunc, + warmupFunc: warmupFunc, + }, + needLeaderElection: needLeaderElection, + } +} + +func (r leaderElectionAndWarmupRunnable) NeedLeaderElection() bool { + return r.needLeaderElection +}
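Reviewer note: because the manager's `warmupRunnable` interface is unexported but its `Warmup` method name is exported, any user-supplied Runnable participates in the warmup phase purely by structural satisfaction, exactly as the test helpers above do. A hedged sketch of what that looks like for an external runnable; the `cachePrimer` type and its priming callback are hypothetical, not part of this PR:

```go
package main

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// cachePrimer is a hypothetical runnable whose expensive setup should run
// before its manager wins leader election.
type cachePrimer struct {
	prime func(context.Context) error // hypothetical warm-up work
}

// Warmup is invoked by the manager during startup, before leader election is
// won, because the method set matches the manager's warmupRunnable interface.
func (c *cachePrimer) Warmup(ctx context.Context) error {
	return c.prime(ctx)
}

// Start runs after leader election is won, like any leader-election runnable.
func (c *cachePrimer) Start(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// NeedLeaderElection keeps Start in the leader-election group; returning false
// would route Start to the Others group instead (see runnables.Add).
func (c *cachePrimer) NeedLeaderElection() bool { return true }

var _ manager.Runnable = &cachePrimer{}
var _ manager.LeaderElectionRunnable = &cachePrimer{}
```

Adding such a runnable with `m.Add(&cachePrimer{...})` places it in both the Warmup and LeaderElection runnable groups, mirroring the dispatch tested in runnable_group_test.go.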