Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.

fix controller logic to prevent concurrent map reads and writes #28

Merged
merged 1 commit into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 24 additions & 29 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type ObjectStorageController struct {

lockerLock sync.Mutex
locker map[types.UID]*sync.Mutex
opMap map[types.UID]interface{}
opMap *sync.Map
}

func NewDefaultObjectStorageController(identity string, leaderLockName string, threads int) (*ObjectStorageController, error) {
Expand Down Expand Up @@ -162,7 +162,7 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,

opMap: map[types.UID]interface{}{},
opMap: &sync.Map{},
}, nil
}

Expand Down Expand Up @@ -253,18 +253,19 @@ func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {

uuid := uuidInterface.(types.UID)
var err error
// With the lock below in place, we can safely tell the queue that we are done
// processing this item. The lock will ensure that multiple items of the same
// name and kind do not get processed simultaneously

defer c.queue.Done(uuid)

op, ok := c.opMap.Load(uuid)
if !ok {
panic("unreachable code")
}

// Ensure that multiple operations on different versions of the same object
// do not happen in parallel
c.OpLock(uuid)
defer c.OpUnlock(uuid)

op := c.opMap[uuid]

switch o := op.(type) {
case addOp:
add := *o.AddFunc
Expand Down Expand Up @@ -332,12 +333,12 @@ func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *ObjectStorageController) handleErr(err error, op interface{}) {
func (c *ObjectStorageController) handleErr(err error, uuid types.UID) {
if err == nil {
c.queue.Forget(op)
c.opMap.Delete(uuid)
return
}
c.queue.AddRateLimited(op)
c.queue.AddRateLimited(uuid)
}

func (c *ObjectStorageController) runController(ctx context.Context) {
Expand All @@ -349,7 +350,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
cfg := &cache.Config{
Queue: cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
KnownObjects: indexer,
EmitDeltaTypeReplaced: true,
EmitDeltaTypeReplaced: false,
}),
ListerWatcher: lw,
ObjectType: objType,
Expand All @@ -370,15 +371,14 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
}

uuid := d.Object.(metav1.Object).GetUID()
c.OpLock(uuid)
defer c.OpUnlock(uuid)
c.opMap[uuid] = updateOp{

c.opMap.Store(uuid, updateOp{
OldObject: old,
NewObject: d.Object,
UpdateFunc: &update,
Key: key,
Indexer: indexer,
}
})
c.queue.Add(uuid)
} else {
key, err := cache.MetaNamespaceKeyFunc(d.Object)
Expand All @@ -387,20 +387,17 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
}

uuid := d.Object.(metav1.Object).GetUID()
c.OpLock(uuid)
defer c.OpUnlock(uuid)

// If an update to the k8s object happens before add has succeeded
if op, ok := c.opMap[uuid]; ok {
if _, ok := op.(updateOp); ok {
return fmt.Errorf("cannot add already added object: %s", key)
}
}
c.opMap[uuid] = addOp{

if op, ok := c.opMap.LoadOrStore(uuid, addOp{
Object: d.Object,
AddFunc: &add,
Key: key,
Indexer: indexer,
}); ok { // If an update to the k8s object happens before add has succeeded
if _, ok := op.(updateOp); ok {
err := fmt.Errorf("cannot add already added object: %s", key)
return err
}
}
c.queue.Add(uuid)
}
Expand All @@ -411,14 +408,12 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
}

uuid := d.Object.(metav1.Object).GetUID()
c.OpLock(uuid)
defer c.OpUnlock(uuid)
c.opMap[uuid] = deleteOp{
c.opMap.Store(uuid, deleteOp{
Object: d.Object,
DeleteFunc: &delete,
Key: key,
Indexer: indexer,
}
})
c.queue.Add(uuid)
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/go-openapi/spec v0.19.12
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/onsi/ginkgo v1.14.2 // indirect
github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
k8s.io/api v0.19.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg=
github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down Expand Up @@ -325,6 +326,7 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
Expand Down