diff --git a/controller/controller.go b/controller/controller.go index 0a1e34e2..85220e66 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -158,9 +158,9 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str ResyncPeriod: 30 * time.Second, // leader election - LeaseDuration: 60 * time.Second, - RenewDeadline: 30 * time.Second, - RetryPeriod: 15 * time.Second, + LeaseDuration: 150 * time.Second, + RenewDeadline: 120 * time.Second, + RetryPeriod: 60 * time.Second, opMap: &sync.Map{}, }, nil @@ -258,7 +258,7 @@ func (c *ObjectStorageController) processNextItem(ctx context.Context) bool { op, ok := c.opMap.Load(uuid) if !ok { - panic("unreachable code") + return true } // Ensure that multiple operations on different versions of the same object @@ -270,22 +270,16 @@ func (c *ObjectStorageController) processNextItem(ctx context.Context) bool { case addOp: add := *o.AddFunc err = add(ctx, o.Object) - if err == nil { - o.Indexer.Add(o.Object) - } + o.Indexer.Add(o.Object) case updateOp: update := *o.UpdateFunc err = update(ctx, o.OldObject, o.NewObject) - if err == nil { - o.Indexer.Update(o.NewObject) - } + o.Indexer.Update(o.NewObject) case deleteOp: delete := *o.DeleteFunc err = delete(ctx, o.Object) - if err == nil { - o.Indexer.Delete(o.Object) - c.opMap.Delete(uuid) - } + o.Indexer.Delete(o.Object) + c.opMap.Delete(uuid) default: panic("unknown item in queue") } @@ -321,6 +315,7 @@ func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex { // handleErr checks if an error happened and makes sure we will retry later. func (c *ObjectStorageController) handleErr(err error, uuid types.UID) { if err == nil { + c.opMap.Delete(uuid) return } c.queue.AddRateLimited(uuid)