diff --git a/controller/controller.go b/controller/controller.go
index 67512394..daa24a33 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -5,25 +5,21 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"reflect"
 	"regexp"
 	"strings"
 	"sync"
 	"time"
 
-	"golang.org/x/time/rate"
-
-	// objectstorage
 	v1alpha1 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
 	bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"
 
-	// k8s api
 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-
-	// k8s client
 	kubeclientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -35,10 +31,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 
-	// logging
 	"github.com/golang/glog"
-
-	// config
 	"github.com/spf13/viper"
 )
@@ -49,6 +42,7 @@ type deleteFunc func(ctx context.Context, obj interface{}) error
 type addOp struct {
 	Object  interface{}
 	AddFunc *addFunc
+	Indexer cache.Indexer
 
 	Key string
 }
@@ -61,6 +55,7 @@ type updateOp struct {
 	OldObject  interface{}
 	NewObject  interface{}
 	UpdateFunc *updateFunc
+	Indexer    cache.Indexer
 
 	Key string
 }
@@ -72,6 +67,7 @@ func (u updateOp) String() string {
 type deleteOp struct {
 	Object     interface{}
 	DeleteFunc *deleteFunc
+	Indexer    cache.Indexer
 
 	Key string
 }
@@ -105,15 +101,13 @@ type ObjectStorageController struct {
 	bucketClient bucketclientset.Interface
 	kubeClient   kubeclientset.Interface
 
-	locker     map[string]*sync.Mutex
 	lockerLock sync.Mutex
+	locker     map[types.UID]*sync.Mutex
+	opMap      map[types.UID]interface{}
 }
 
 func NewDefaultObjectStorageController(identity string, leaderLockName string, threads int) (*ObjectStorageController, error) {
-	rateLimit := workqueue.NewMaxOfRateLimiter(
-		workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 600*time.Second),
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
-	)
+	rateLimit := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 30*time.Second)
 	return NewObjectStorageController(identity, leaderLockName, threads, rateLimit)
 }
@@ -162,10 +156,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
 		queue:       workqueue.NewRateLimitingQueue(limiter),
 		threadiness: threads,
 
-		ResyncPeriod:  30 * time.Second,
-		LeaseDuration: 15 * time.Second,
-		RenewDeadline: 10 * time.Second,
+		ResyncPeriod: 30 * time.Second,
+		// leader election
+		LeaseDuration: 60 * time.Second,
+		RenewDeadline: 15 * time.Second,
 		RetryPeriod:   5 * time.Second,
+
+		opMap: map[types.UID]interface{}{},
 	}, nil
 }
@@ -219,10 +216,11 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
 	}
 
 	leaderConfig := leaderelection.LeaderElectionConfig{
-		Lock:          l,
-		LeaseDuration: c.LeaseDuration,
-		RenewDeadline: c.RenewDeadline,
-		RetryPeriod:   c.RetryPeriod,
+		Lock:            l,
+		ReleaseOnCancel: true,
+		LeaseDuration:   c.LeaseDuration,
+		RenewDeadline:   c.RenewDeadline,
+		RetryPeriod:     c.RetryPeriod,
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: func(ctx context.Context) {
 				glog.V(2).Info("became leader, starting")
@@ -248,75 +246,85 @@ func (c *ObjectStorageController) runWorker(ctx context.Context) {
 
 func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {
 	// Wait until there is a new item in the working queue
-	op, quit := c.queue.Get()
+	uuidInterface, quit := c.queue.Get()
 	if quit {
 		return false
 	}
+	uuid := uuidInterface.(types.UID)
+	var err error
 
 	// With the lock below in place, we can safely tell the queue that we are done
 	// processing this item. The lock will ensure that multiple items of the same
 	// name and kind do not get processed simultaneously
-	defer c.queue.Done(op)
+	defer c.queue.Done(uuid)
 
 	// Ensure that multiple operations on different versions of the same object
 	// do not happen in parallel
-	c.OpLock(op)
-	defer c.OpUnlock(op)
+	c.OpLock(uuid)
+	defer c.OpUnlock(uuid)
+
+	op := c.opMap[uuid]
 
-	var err error
 	switch o := op.(type) {
 	case addOp:
 		add := *o.AddFunc
+		objMeta := o.Object.(metav1.Object)
+		name := objMeta.GetName()
+		ns := objMeta.GetNamespace()
 		err = add(ctx, o.Object)
+		if err == nil {
+			o.Indexer.Add(o.Object)
+		} else {
+			glog.Errorf("Error adding %s %s: %v", ns, name, err)
+		}
 	case updateOp:
 		update := *o.UpdateFunc
+		objMeta := o.OldObject.(metav1.Object)
+		name := objMeta.GetName()
+		ns := objMeta.GetNamespace()
 		err = update(ctx, o.OldObject, o.NewObject)
+		if err == nil {
+			o.Indexer.Update(o.NewObject)
+		} else {
+			glog.Errorf("Error updating %s %s: %v", ns, name, err)
+		}
 	case deleteOp:
 		delete := *o.DeleteFunc
+		objMeta := o.Object.(metav1.Object)
+		name := objMeta.GetName()
+		ns := objMeta.GetNamespace()
 		err = delete(ctx, o.Object)
+		if err == nil {
+			o.Indexer.Delete(o.Object)
+		} else {
+			glog.Errorf("Error deleting %s %s: %v", ns, name, err)
+		}
 	default:
 		panic("unknown item in queue")
 	}
 
 	// Handle the error if something went wrong
-	c.handleErr(err, op)
+	c.handleErr(err, uuid)
 	return true
 }
 
-func (c *ObjectStorageController) OpLock(op interface{}) {
+func (c *ObjectStorageController) OpLock(op types.UID) {
 	c.GetOpLock(op).Lock()
 }
 
-func (c *ObjectStorageController) OpUnlock(op interface{}) {
+func (c *ObjectStorageController) OpUnlock(op types.UID) {
 	c.GetOpLock(op).Unlock()
 }
 
-func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
-	var key string
-	var ext string
-
-	switch o := op.(type) {
-	case addOp:
-		key = o.Key
-		ext = fmt.Sprintf("%v", o.AddFunc)
-	case updateOp:
-		key = o.Key
-		ext = fmt.Sprintf("%v", o.UpdateFunc)
-	case deleteOp:
-		key = o.Key
-		ext = fmt.Sprintf("%v", o.DeleteFunc)
-	default:
-		panic("unknown item in queue")
-	}
+func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
+	lockKey := op
+	c.lockerLock.Lock()
+	defer c.lockerLock.Unlock()
 
-	lockKey := fmt.Sprintf("%s/%s", key, ext)
 	if c.locker == nil {
-		c.locker = map[string]*sync.Mutex{}
+		c.locker = map[types.UID]*sync.Mutex{}
 	}
 
-	c.lockerLock.Lock()
-	defer c.lockerLock.Unlock()
-
 	if _, ok := c.locker[lockKey]; !ok {
 		c.locker[lockKey] = &sync.Mutex{}
 	}
@@ -326,36 +334,15 @@ func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
 
 // handleErr checks if an error happened and makes sure we will retry later.
 func (c *ObjectStorageController) handleErr(err error, op interface{}) {
 	if err == nil {
-		// Forget about the #AddRateLimited history of the op on every successful synchronization.
-		// This ensures that future processing of updates for this op is not delayed because of
-		// an outdated error history.
 		c.queue.Forget(op)
 		return
 	}
-
-	/* TODO: Determine if there is a maxium number of retries or time allowed before giving up
-	// This controller retries 5 times if something goes wrong. After that, it stops trying.
-	if c.queue.NumRequeues(op) < 5 {
-		klog.Infof("Error syncing op %v: %v", key, err)
-
-		// Re-enqueue the key rate limited. Based on the rate limiter on the
-		// queue and the re-enqueue history, the op will be processed later again.
-		c.queue.AddRateLimited(op)
-		return
-	}
-
-	c.queue.Forget(key)
-	// Report to an external entity that, even after several retries, we could not successfully process this op
-	utilruntime.HandleError(err)
-	klog.Infof("Dropping op %+v out of the queue: %v", op, err)
-	*/
-
 	glog.V(5).Infof("Error executing operation %+v: %+v", op, err)
 	c.queue.AddRateLimited(op)
 }
 
 func (c *ObjectStorageController) runController(ctx context.Context) {
 	controllerFor := func(name string, objType runtime.Object, add addFunc, update updateFunc, delete deleteFunc) {
-		indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+		indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
 		resyncPeriod := c.ResyncPeriod
 
 		lw := cache.NewListWatchFromClient(c.bucketClient.ObjectstorageV1alpha1().RESTClient(), name, "", fields.Everything())
@@ -378,25 +365,44 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
 					panic(err)
 				}
 
-				c.queue.Add(updateOp{
+				if reflect.DeepEqual(d.Object, old) {
+					return nil
+				}
+
+				uuid := d.Object.(metav1.Object).GetUID()
+				c.OpLock(uuid)
+				defer c.OpUnlock(uuid)
+				c.opMap[uuid] = updateOp{
 					OldObject:  old,
 					NewObject:  d.Object,
 					UpdateFunc: &update,
 					Key:        key,
-				})
-				return indexer.Update(d.Object)
+					Indexer:    indexer,
+				}
+				c.queue.Add(uuid)
 			} else {
 				key, err := cache.MetaNamespaceKeyFunc(d.Object)
 				if err != nil {
 					panic(err)
 				}
 
-				c.queue.Add(addOp{
+				uuid := d.Object.(metav1.Object).GetUID()
+				c.OpLock(uuid)
+				defer c.OpUnlock(uuid)
+
+				// If an update to the k8s object happens before add has succeeded
+				if op, ok := c.opMap[uuid]; ok {
+					if _, ok := op.(updateOp); ok {
+						return fmt.Errorf("cannot add already added object: %s", key)
+					}
+				}
+				c.opMap[uuid] = addOp{
 					Object:  d.Object,
 					AddFunc: &add,
 					Key:     key,
-				})
-				return indexer.Add(d.Object)
+					Indexer: indexer,
+				}
+				c.queue.Add(uuid)
 			}
 		case cache.Deleted:
 			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(d.Object)
@@ -404,12 +410,16 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
 				panic(err)
 			}
 
-			c.queue.Add(deleteOp{
+			uuid := d.Object.(metav1.Object).GetUID()
+			c.OpLock(uuid)
+			defer c.OpUnlock(uuid)
+			c.opMap[uuid] = deleteOp{
 				Object:     d.Object,
 				DeleteFunc: &delete,
 				Key:        key,
-			})
-			return indexer.Delete(d.Object)
+				Indexer:    indexer,
+			}
+			c.queue.Add(uuid)
 		}
 	}
 	return nil
@@ -429,7 +439,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
 	}
 
 	for i := 0; i < c.threadiness; i++ {
-		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
+		go c.runWorker(ctx)
 	}
 
 	<-ctx.Done()
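
Note for reviewers: the patch replaces per-operation queue items with a per-object scheme. The queue now carries bare object UIDs, opMap holds only the latest pending operation for each UID, and locker serializes all work on a given object. Below is a minimal, self-contained sketch of that pattern; the names (opLocks, the plain string UID, the string ops) and the channel standing in for the rate-limited workqueue are illustrative assumptions, not this package's actual types.

package main

import (
	"fmt"
	"sync"
)

// opLocks hands out one mutex per UID so that operations on the same object
// are serialized while operations on different objects proceed in parallel.
type opLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (l *opLocks) get(uid string) *sync.Mutex {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.locks == nil {
		l.locks = map[string]*sync.Mutex{}
	}
	if _, ok := l.locks[uid]; !ok {
		l.locks[uid] = &sync.Mutex{}
	}
	return l.locks[uid]
}

func main() {
	var (
		locks opLocks
		opMap = map[string]string{}  // latest pending operation per UID
		queue = make(chan string, 8) // stand-in for the rate-limited workqueue
	)

	uid := "uid-1234" // stand-in for metav1.Object.GetUID()

	// Event-handler side: record the newest operation under the per-UID
	// lock, then enqueue only the UID.
	locks.get(uid).Lock()
	opMap[uid] = "update"
	locks.get(uid).Unlock()
	queue <- uid

	// Worker side: dequeue a UID, then process whatever operation is
	// current for it while holding the same per-UID lock.
	u := <-queue
	locks.get(u).Lock()
	fmt.Println("processing", opMap[u], "for object", u)
	locks.get(u).Unlock()
}

Because the client-go workqueue de-duplicates items that are already pending, keying the queue by UID and stashing the operation in a side map lets a burst of events for one object collapse into a single queued item that observes the most recent state, instead of several workers racing over stale versions of the same object.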