This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Fix bucket* controller framework #25

Merged · 1 commit · Mar 10, 2021
178 changes: 94 additions & 84 deletions controller/controller.go
@@ -5,25 +5,21 @@ import (
"fmt"
"io/ioutil"
"os"
"reflect"
"regexp"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

// objectstorage
v1alpha1 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"

// k8s api
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"

// k8s client
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -35,10 +31,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

// logging
"github.com/golang/glog"

// config
"github.com/spf13/viper"
)

@@ -49,6 +42,7 @@ type deleteFunc func(ctx context.Context, obj interface{}) error
type addOp struct {
Object interface{}
AddFunc *addFunc
Indexer cache.Indexer

Key string
}
@@ -61,6 +55,7 @@ type updateOp struct {
OldObject interface{}
NewObject interface{}
UpdateFunc *updateFunc
Indexer cache.Indexer

Key string
}
@@ -72,6 +67,7 @@ func (u updateOp) String() string {
type deleteOp struct {
Object interface{}
DeleteFunc *deleteFunc
Indexer cache.Indexer

Key string
}
@@ -105,15 +101,13 @@ type ObjectStorageController struct {
bucketClient bucketclientset.Interface
kubeClient kubeclientset.Interface

locker map[string]*sync.Mutex
lockerLock sync.Mutex
locker map[types.UID]*sync.Mutex
opMap map[types.UID]interface{}
}

func NewDefaultObjectStorageController(identity string, leaderLockName string, threads int) (*ObjectStorageController, error) {
rateLimit := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 600*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
rateLimit := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 30*time.Second)
return NewObjectStorageController(identity, leaderLockName, threads, rateLimit)
}

@@ -162,10 +156,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
queue: workqueue.NewRateLimitingQueue(limiter),
threadiness: threads,

ResyncPeriod: 30 * time.Second,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
ResyncPeriod: 30 * time.Second,
// leader election
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,

opMap: map[types.UID]interface{}{},
}, nil
}

@@ -219,10 +216,11 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
}

leaderConfig := leaderelection.LeaderElectionConfig{
Lock: l,
LeaseDuration: c.LeaseDuration,
RenewDeadline: c.RenewDeadline,
RetryPeriod: c.RetryPeriod,
Lock: l,
ReleaseOnCancel: true,
LeaseDuration: c.LeaseDuration,
RenewDeadline: c.RenewDeadline,
RetryPeriod: c.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
glog.V(2).Info("became leader, starting")
@@ -248,75 +246,85 @@ func (c *ObjectStorageController) runWorker(ctx context.Context) {

func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {
// Wait until there is a new item in the working queue
op, quit := c.queue.Get()
uuidInterface, quit := c.queue.Get()
if quit {
return false
}

uuid := uuidInterface.(types.UID)
var err error
// With the lock below in place, we can safely tell the queue that we are done
// processing this item. The lock will ensure that multiple items of the same
// name and kind do not get processed simultaneously
defer c.queue.Done(op)
defer c.queue.Done(uuid)

// Ensure that multiple operations on different versions of the same object
// do not happen in parallel
c.OpLock(op)
defer c.OpUnlock(op)
c.OpLock(uuid)

@jeffvance jeffvance Mar 9, 2021


Sorry, I am missing something here. How does tracking uuid prevent operating in parallel on different versions of the same object? Wouldn't this serialize on all versions of the object?

Contributor Author


In our controller, we run a single shared informer across multiple CRD resource event handlers. The shared informer reports changes to bucket* objects as they occur. Events are processed by first encoding them in operation structs and appending them to the tail of a workqueue. The workqueue implementation is what this PR mostly touches.

The workqueue was previously holding operation objects (addOp, updateOp, deleteOp). On any event, these operation objects were put into the queue, and waited to be processed by the next available thread.

These objects were not comparable with one another, and that led to a few problems:

  • Given an object, if an update occurs while an add is waiting in the queue, it should result in only one event: an add with the updated fields
  • Given an object, if an update occurs while another update is waiting in the queue, only the second update should be processed

The previous implementation, albeit technically eventually consistent, could not ensure the above dynamics, simply because it was not possible to detect new events for an object already present in the queue.

Now, for the uuid. This is a string field that is easily comparable (==), and by using the kubernetes UID as the unique value in this field, it is possible to compare two events occurring on the same object.

This change also allowed me to improve the exponential back-off characteristics. If new events occur while an operation on the same object is in back-off, the new event will not be processed before the back-off period is complete, i.e. the uuid's position in the queue is not changed. Instead, the value of opMap[uuid] is set to the new operation structure denoting the event. By decoupling the operation itself from the queuing order, we gain a lot of flexibility.

Now to answer your direct question,

Sorry, I am missing something here. How does tracking uuid prevent operating in parallel on different versions of the same object? Wouldn't this serialize on all versions of the object?

By using the uuid, we do not serialize the steps. We either replace old events with new ones if both are updates, or handle them differently based on the type of the events.
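
To make the queuing pattern described above concrete, here is a minimal, hypothetical Go sketch (names such as coalescingQueue and enqueue are illustrative and not taken from this PR): the workqueue holds only object UIDs, while a side map always carries the most recent operation for each UID, so a newer event overwrites a pending one without resetting its queue position or back-off.

package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
)

// coalescingQueue illustrates the pattern: the workqueue stores only object
// UIDs, while opMap holds the latest pending operation for each UID.
// Re-adding a UID that is already queued does not change its position, so a
// newer event simply overwrites the payload.
type coalescingQueue struct {
	mu    sync.Mutex
	opMap map[types.UID]interface{}
	queue workqueue.RateLimitingInterface
}

func newCoalescingQueue() *coalescingQueue {
	return &coalescingQueue{
		opMap: map[types.UID]interface{}{},
		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}
}

// enqueue records op as the current pending operation for uid and ensures
// uid is present in the queue.
func (q *coalescingQueue) enqueue(uid types.UID, op interface{}) {
	q.mu.Lock()
	q.opMap[uid] = op
	q.mu.Unlock()
	q.queue.Add(uid)
}

// processOne pops the next UID and handles whatever operation is current for
// it at that moment.
func (q *coalescingQueue) processOne() bool {
	item, quit := q.queue.Get()
	if quit {
		return false
	}
	uid := item.(types.UID)
	defer q.queue.Done(uid)

	q.mu.Lock()
	op := q.opMap[uid]
	q.mu.Unlock()

	fmt.Printf("processing %s: %v\n", uid, op)
	q.queue.Forget(uid)
	return true
}

func main() {
	q := newCoalescingQueue()
	q.enqueue(types.UID("uid-1"), "add")
	q.enqueue(types.UID("uid-1"), "update") // overwrites the pending payload, keeps queue position
	q.processOne()                          // handles only the latest ("update") event
	q.queue.ShutDown()
}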

defer c.OpUnlock(uuid)

op := c.opMap[uuid]

var err error
switch o := op.(type) {
case addOp:
add := *o.AddFunc
objMeta := o.Object.(metav1.Object)
name := objMeta.GetName()
ns := objMeta.GetNamespace()
err = add(ctx, o.Object)
if err == nil {
o.Indexer.Add(o.Object)
} else {
glog.Errorf("Error adding %s %s: %v", ns, name, err)
}
case updateOp:
update := *o.UpdateFunc
objMeta := o.OldObject.(metav1.Object)
name := objMeta.GetName()
ns := objMeta.GetNamespace()
err = update(ctx, o.OldObject, o.NewObject)
if err == nil {
o.Indexer.Update(o.NewObject)
} else {
glog.Errorf("Error updating %s %s: %v", ns, name, err)
}
case deleteOp:
delete := *o.DeleteFunc
objMeta := o.Object.(metav1.Object)
name := objMeta.GetName()
ns := objMeta.GetNamespace()
err = delete(ctx, o.Object)
if err == nil {
o.Indexer.Delete(o.Object)
} else {
glog.Errorf("Error deleting %s %s: %v", ns, name, err)
}

@jeffvance jeffvance Mar 9, 2021


in the default block, can we include the operation that was not handled, or is that logged earlier?

Contributor Author


sure
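
For illustration only, a minimal sketch of what that follow-up could look like (hypothetical handle function; the actual change is not part of this diff): the default branch reports the value it failed to handle instead of panicking with a generic message.

package main

import "fmt"

// handle stands in for the controller's switch over queued operations.
func handle(op interface{}) {
	switch o := op.(type) {
	case string:
		fmt.Println("handled:", o)
	default:
		// Include the unhandled operation in the panic so it is not lost.
		panic(fmt.Sprintf("unknown item in queue: %+v", o))
	}
}

func main() {
	handle("addOp") // prints "handled: addOp"
}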

default:
panic("unknown item in queue")
}

// Handle the error if something went wrong
c.handleErr(err, op)
c.handleErr(err, uuid)
return true
}

func (c *ObjectStorageController) OpLock(op interface{}) {
func (c *ObjectStorageController) OpLock(op types.UID) {
c.GetOpLock(op).Lock()
}

func (c *ObjectStorageController) OpUnlock(op interface{}) {
func (c *ObjectStorageController) OpUnlock(op types.UID) {
c.GetOpLock(op).Unlock()
}

func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
var key string
var ext string

switch o := op.(type) {
case addOp:
key = o.Key
ext = fmt.Sprintf("%v", o.AddFunc)
case updateOp:
key = o.Key
ext = fmt.Sprintf("%v", o.UpdateFunc)
case deleteOp:
key = o.Key
ext = fmt.Sprintf("%v", o.DeleteFunc)
default:
panic("unknown item in queue")
}
func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
lockKey := op
c.lockerLock.Lock()
defer c.lockerLock.Unlock()

@jeffvance jeffvance Mar 9, 2021


should the defer be after L326 to handle the case of c.locker == nil?

Contributor Author


Even though the docs say that a deferred func can run any time after the defer statement itself, my understanding is that it only runs once the function it was deferred in returns, i.e. only when the defer goes out of scope.
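
A tiny runnable example of the point being made (editorial illustration, not part of the diff): the deferred call is scheduled where the defer statement appears, but it only runs when the surrounding function returns, so statements placed after it, such as a nil check and map initialization, still execute first.

package main

import "fmt"

func demo() {
	defer fmt.Println("3: deferred call runs last, when demo returns")
	fmt.Println("1: defer is already scheduled here")
	fmt.Println("2: later statements still run before the deferred call")
}

func main() {
	demo()
}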


lockKey := fmt.Sprintf("%s/%s", key, ext)
if c.locker == nil {
c.locker = map[string]*sync.Mutex{}
c.locker = map[types.UID]*sync.Mutex{}
}

c.lockerLock.Lock()
defer c.lockerLock.Unlock()

if _, ok := c.locker[lockKey]; !ok {
c.locker[lockKey] = &sync.Mutex{}
}
@@ -326,36 +334,15 @@ func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
// handleErr checks if an error happened and makes sure we will retry later.
func (c *ObjectStorageController) handleErr(err error, op interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the op on every successful synchronization.
// This ensures that future processing of updates for this op is not delayed because of
// an outdated error history.
c.queue.Forget(op)
return
}

/* TODO: Determine if there is a maximum number of retries or time allowed before giving up
// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(op) < 5 {
klog.Infof("Error syncing op %v: %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the op will be processed later again.
c.queue.AddRateLimited(op)
return
}

c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this op
utilruntime.HandleError(err)
klog.Infof("Dropping op %+v out of the queue: %v", op, err)
*/
glog.V(5).Infof("Error executing operation %+v: %+v", op, err)
c.queue.AddRateLimited(op)
}

func (c *ObjectStorageController) runController(ctx context.Context) {
controllerFor := func(name string, objType runtime.Object, add addFunc, update updateFunc, delete deleteFunc) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
resyncPeriod := c.ResyncPeriod

lw := cache.NewListWatchFromClient(c.bucketClient.ObjectstorageV1alpha1().RESTClient(), name, "", fields.Everything())
@@ -378,38 +365,61 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
panic(err)
}

c.queue.Add(updateOp{
if reflect.DeepEqual(d.Object, old) {
return nil
}

uuid := d.Object.(metav1.Object).GetUID()
c.OpLock(uuid)
defer c.OpUnlock(uuid)
c.opMap[uuid] = updateOp{
OldObject: old,
NewObject: d.Object,
UpdateFunc: &update,
Key: key,
})
return indexer.Update(d.Object)
Indexer: indexer,
}
c.queue.Add(uuid)
} else {
key, err := cache.MetaNamespaceKeyFunc(d.Object)
if err != nil {
panic(err)
}

c.queue.Add(addOp{
uuid := d.Object.(metav1.Object).GetUID()
c.OpLock(uuid)
defer c.OpUnlock(uuid)

// If an update to the k8s object happens before add has succeeded
if op, ok := c.opMap[uuid]; ok {
if _, ok := op.(updateOp); ok {
return fmt.Errorf("cannot add already added object: %s", key)
}
}
c.opMap[uuid] = addOp{
Object: d.Object,
AddFunc: &add,
Key: key,
})
return indexer.Add(d.Object)
Indexer: indexer,
}
c.queue.Add(uuid)
}
case cache.Deleted:
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(d.Object)
if err != nil {
panic(err)
}

c.queue.Add(deleteOp{
uuid := d.Object.(metav1.Object).GetUID()
c.OpLock(uuid)
defer c.OpUnlock(uuid)
c.opMap[uuid] = deleteOp{
Object: d.Object,
DeleteFunc: &delete,
Key: key,
})
return indexer.Delete(d.Object)
Indexer: indexer,
}
c.queue.Add(uuid)
}
}
return nil
@@ -429,7 +439,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
}

for i := 0; i < c.threadiness; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
go c.runWorker(ctx)
}

<-ctx.Done()