diff --git a/controller/controller.go b/controller/controller.go index daa24a33..19b86e3f 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -103,7 +103,7 @@ type ObjectStorageController struct { lockerLock sync.Mutex locker map[types.UID]*sync.Mutex - opMap map[types.UID]interface{} + opMap *sync.Map } func NewDefaultObjectStorageController(identity string, leaderLockName string, threads int) (*ObjectStorageController, error) { @@ -162,7 +162,7 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str RenewDeadline: 15 * time.Second, RetryPeriod: 5 * time.Second, - opMap: map[types.UID]interface{}{}, + opMap: &sync.Map{}, }, nil } @@ -253,18 +253,19 @@ func (c *ObjectStorageController) processNextItem(ctx context.Context) bool { uuid := uuidInterface.(types.UID) var err error - // With the lock below in place, we can safely tell the queue that we are done - // processing this item. The lock will ensure that multiple items of the same - // name and kind do not get processed simultaneously + defer c.queue.Done(uuid) + op, ok := c.opMap.Load(uuid) + if !ok { + panic("unreachable code") + } + // Ensure that multiple operations on different versions of the same object // do not happen in parallel c.OpLock(uuid) defer c.OpUnlock(uuid) - op := c.opMap[uuid] - switch o := op.(type) { case addOp: add := *o.AddFunc @@ -332,12 +333,12 @@ func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex { } // handleErr checks if an error happened and makes sure we will retry later. -func (c *ObjectStorageController) handleErr(err error, op interface{}) { +func (c *ObjectStorageController) handleErr(err error, uuid types.UID) { if err == nil { - c.queue.Forget(op) + c.opMap.Delete(uuid) return } - c.queue.AddRateLimited(op) + c.queue.AddRateLimited(uuid) } func (c *ObjectStorageController) runController(ctx context.Context) { @@ -349,7 +350,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) { cfg := &cache.Config{ Queue: cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{ KnownObjects: indexer, - EmitDeltaTypeReplaced: true, + EmitDeltaTypeReplaced: false, }), ListerWatcher: lw, ObjectType: objType, @@ -370,15 +371,14 @@ func (c *ObjectStorageController) runController(ctx context.Context) { } uuid := d.Object.(metav1.Object).GetUID() - c.OpLock(uuid) - defer c.OpUnlock(uuid) - c.opMap[uuid] = updateOp{ + + c.opMap.Store(uuid, updateOp{ OldObject: old, NewObject: d.Object, UpdateFunc: &update, Key: key, Indexer: indexer, - } + }) c.queue.Add(uuid) } else { key, err := cache.MetaNamespaceKeyFunc(d.Object) @@ -387,20 +387,17 @@ func (c *ObjectStorageController) runController(ctx context.Context) { } uuid := d.Object.(metav1.Object).GetUID() - c.OpLock(uuid) - defer c.OpUnlock(uuid) - - // If an update to the k8s object happens before add has succeeded - if op, ok := c.opMap[uuid]; ok { - if _, ok := op.(updateOp); ok { - return fmt.Errorf("cannot add already added object: %s", key) - } - } - c.opMap[uuid] = addOp{ + + if op, ok := c.opMap.LoadOrStore(uuid, addOp{ Object: d.Object, AddFunc: &add, Key: key, Indexer: indexer, + }); ok { // If an update to the k8s object happens before add has succeeded + if _, ok := op.(updateOp); ok { + err := fmt.Errorf("cannot add already added object: %s", key) + return err + } } c.queue.Add(uuid) } @@ -411,14 +408,12 @@ func (c *ObjectStorageController) runController(ctx context.Context) { } uuid := d.Object.(metav1.Object).GetUID() - c.OpLock(uuid) - defer c.OpUnlock(uuid) - c.opMap[uuid] = deleteOp{ + c.opMap.Store(uuid, deleteOp{ Object: d.Object, DeleteFunc: &delete, Key: key, Indexer: indexer, - } + }) c.queue.Add(uuid) } } diff --git a/go.mod b/go.mod index f165a7e5..2ce6babd 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/go-openapi/spec v0.19.12 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/onsi/ginkgo v1.14.2 // indirect + github.com/spf13/cobra v0.0.5 github.com/spf13/viper v1.3.2 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 k8s.io/api v0.19.4 diff --git a/go.sum b/go.sum index 71429d22..b0449b6a 100644 --- a/go.sum +++ b/go.sum @@ -228,6 +228,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg= github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -325,6 +326,7 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=