Fix bucket* controller framework #25
@@ -5,25 +5,21 @@ import (
	"fmt"
	"io/ioutil"
	"os"
	"reflect"
	"regexp"
	"strings"
	"sync"
	"time"

	"golang.org/x/time/rate"

	// objectstorage
	v1alpha1 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
	bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"

	// k8s api
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"

	// k8s client
	kubeclientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

@@ -35,10 +31,7 @@ import (
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"

	// logging
	"github.com/golang/glog"

	// config
	"github.com/spf13/viper"
)

@@ -49,6 +42,7 @@ type deleteFunc func(ctx context.Context, obj interface{}) error
type addOp struct {
	Object  interface{}
	AddFunc *addFunc
	Indexer cache.Indexer

	Key string
}

@@ -61,6 +55,7 @@ type updateOp struct {
	OldObject  interface{}
	NewObject  interface{}
	UpdateFunc *updateFunc
	Indexer    cache.Indexer

	Key string
}

@@ -72,6 +67,7 @@ func (u updateOp) String() string {
type deleteOp struct {
	Object     interface{}
	DeleteFunc *deleteFunc
	Indexer    cache.Indexer

	Key string
}

@@ -105,15 +101,13 @@ type ObjectStorageController struct { | |
bucketClient bucketclientset.Interface | ||
kubeClient kubeclientset.Interface | ||
|
||
locker map[string]*sync.Mutex | ||
lockerLock sync.Mutex | ||
locker map[types.UID]*sync.Mutex | ||
opMap map[types.UID]interface{} | ||
} | ||
|
||
func NewDefaultObjectStorageController(identity string, leaderLockName string, threads int) (*ObjectStorageController, error) { | ||
rateLimit := workqueue.NewMaxOfRateLimiter( | ||
workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 600*time.Second), | ||
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, | ||
) | ||
rateLimit := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 30*time.Second) | ||
return NewObjectStorageController(identity, leaderLockName, threads, rateLimit) | ||
} | ||
|
||
|
@@ -162,10 +156,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
		queue:       workqueue.NewRateLimitingQueue(limiter),
		threadiness: threads,

		ResyncPeriod:  30 * time.Second,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		ResyncPeriod: 30 * time.Second,
		// leader election
		LeaseDuration: 60 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   5 * time.Second,

		opMap: map[types.UID]interface{}{},
	}, nil
}

@@ -219,10 +216,11 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
	}

	leaderConfig := leaderelection.LeaderElectionConfig{
		Lock:          l,
		LeaseDuration: c.LeaseDuration,
		RenewDeadline: c.RenewDeadline,
		RetryPeriod:   c.RetryPeriod,
		Lock:            l,
		ReleaseOnCancel: true,
		LeaseDuration:   c.LeaseDuration,
		RenewDeadline:   c.RenewDeadline,
		RetryPeriod:     c.RetryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				glog.V(2).Info("became leader, starting")

@@ -248,75 +246,85 @@ func (c *ObjectStorageController) runWorker(ctx context.Context) {

func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {
	// Wait until there is a new item in the working queue
	op, quit := c.queue.Get()
	uuidInterface, quit := c.queue.Get()
	if quit {
		return false
	}

	uuid := uuidInterface.(types.UID)
	var err error
	// With the lock below in place, we can safely tell the queue that we are done
	// processing this item. The lock will ensure that multiple items of the same
	// name and kind do not get processed simultaneously
	defer c.queue.Done(op)
	defer c.queue.Done(uuid)

	// Ensure that multiple operations on different versions of the same object
	// do not happen in parallel
	c.OpLock(op)
	defer c.OpUnlock(op)
	c.OpLock(uuid)
	defer c.OpUnlock(uuid)

	op := c.opMap[uuid]

	var err error
	switch o := op.(type) {
	case addOp:
		add := *o.AddFunc
		objMeta := o.Object.(metav1.Object)
		name := objMeta.GetName()
		ns := objMeta.GetNamespace()
		err = add(ctx, o.Object)
		if err == nil {
			o.Indexer.Add(o.Object)
		} else {
			glog.Errorf("Error adding %s %s: %v", ns, name, err)
		}
	case updateOp:
		update := *o.UpdateFunc
		objMeta := o.OldObject.(metav1.Object)
		name := objMeta.GetName()
		ns := objMeta.GetNamespace()
		err = update(ctx, o.OldObject, o.NewObject)
		if err == nil {
			o.Indexer.Update(o.NewObject)
		} else {
			glog.Errorf("Error updating %s %s: %v", ns, name, err)
		}
	case deleteOp:
		delete := *o.DeleteFunc
		objMeta := o.Object.(metav1.Object)
		name := objMeta.GetName()
		ns := objMeta.GetNamespace()
		err = delete(ctx, o.Object)
		if err == nil {
			o.Indexer.Delete(o.Object)
		} else {
			glog.Errorf("Error deleting %s %s: %v", ns, name, err)
		}
	default:
		panic("unknown item in queue")
	}

	// Handle the error if something went wrong
	c.handleErr(err, op)
	c.handleErr(err, uuid)
	return true
}

func (c *ObjectStorageController) OpLock(op interface{}) {
func (c *ObjectStorageController) OpLock(op types.UID) {
	c.GetOpLock(op).Lock()
}

func (c *ObjectStorageController) OpUnlock(op interface{}) {
func (c *ObjectStorageController) OpUnlock(op types.UID) {
	c.GetOpLock(op).Unlock()
}

func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
	var key string
	var ext string

	switch o := op.(type) {
	case addOp:
		key = o.Key
		ext = fmt.Sprintf("%v", o.AddFunc)
	case updateOp:
		key = o.Key
		ext = fmt.Sprintf("%v", o.UpdateFunc)
	case deleteOp:
		key = o.Key
		ext = fmt.Sprintf("%v", o.DeleteFunc)
	default:
		panic("unknown item in queue")
	}
func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
	lockKey := op
	c.lockerLock.Lock()
	defer c.lockerLock.Unlock()
Even though the docs say that deferred funcs can be called any time after the defer statement itself, my understanding is that a deferred call always runs only after returning from the function it is deferred in, i.e. only once the defer goes out of scope.
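A minimal standalone sketch of that behavior (not the PR code; the getLock/store names are illustrative): the deferred unlock runs only when the function returns, so taking the lock and deferring the unlock before lazily initializing the map keeps every map access protected.

package main

import (
	"fmt"
	"sync"
)

var (
	mu    sync.Mutex
	store map[string]*sync.Mutex // lazily initialized while mu is held
)

func getLock(key string) *sync.Mutex {
	mu.Lock()
	defer mu.Unlock() // runs only when getLock returns, not earlier

	if store == nil {
		store = map[string]*sync.Mutex{} // mu is still held here
	}
	if _, ok := store[key]; !ok {
		store[key] = &sync.Mutex{}
	}
	return store[key]
}

func main() {
	l := getLock("bucket-a")
	l.Lock()
	fmt.Println("acquired per-key lock")
	l.Unlock()
}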
	lockKey := fmt.Sprintf("%s/%s", key, ext)
	if c.locker == nil {
		c.locker = map[string]*sync.Mutex{}
		c.locker = map[types.UID]*sync.Mutex{}
	}

	c.lockerLock.Lock()
	defer c.lockerLock.Unlock()

	if _, ok := c.locker[lockKey]; !ok {
		c.locker[lockKey] = &sync.Mutex{}
	}

@@ -326,36 +334,15 @@ func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
// handleErr checks if an error happened and makes sure we will retry later.
func (c *ObjectStorageController) handleErr(err error, op interface{}) {
	if err == nil {
		// Forget about the #AddRateLimited history of the op on every successful synchronization.
		// This ensures that future processing of updates for this op is not delayed because of
		// an outdated error history.
		c.queue.Forget(op)
		return
	}

	/* TODO: Determine if there is a maxium number of retries or time allowed before giving up
	// This controller retries 5 times if something goes wrong. After that, it stops trying.
	if c.queue.NumRequeues(op) < 5 {
		klog.Infof("Error syncing op %v: %v", key, err)

		// Re-enqueue the key rate limited. Based on the rate limiter on the
		// queue and the re-enqueue history, the op will be processed later again.
		c.queue.AddRateLimited(op)
		return
	}

	c.queue.Forget(key)
	// Report to an external entity that, even after several retries, we could not successfully process this op
	utilruntime.HandleError(err)
	klog.Infof("Dropping op %+v out of the queue: %v", op, err)
	*/
	glog.V(5).Infof("Error executing operation %+v: %+v", op, err)
	c.queue.AddRateLimited(op)
}

func (c *ObjectStorageController) runController(ctx context.Context) {
	controllerFor := func(name string, objType runtime.Object, add addFunc, update updateFunc, delete deleteFunc) {
		indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
		indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
		resyncPeriod := c.ResyncPeriod

		lw := cache.NewListWatchFromClient(c.bucketClient.ObjectstorageV1alpha1().RESTClient(), name, "", fields.Everything())

@@ -378,38 +365,61 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
					panic(err)
				}

				c.queue.Add(updateOp{
				if reflect.DeepEqual(d.Object, old) {
					return nil
				}

				uuid := d.Object.(metav1.Object).GetUID()
				c.OpLock(uuid)
				defer c.OpUnlock(uuid)
				c.opMap[uuid] = updateOp{
					OldObject:  old,
					NewObject:  d.Object,
					UpdateFunc: &update,
					Key:        key,
				})
				return indexer.Update(d.Object)
					Indexer: indexer,
				}
				c.queue.Add(uuid)
			} else {
				key, err := cache.MetaNamespaceKeyFunc(d.Object)
				if err != nil {
					panic(err)
				}

				c.queue.Add(addOp{
				uuid := d.Object.(metav1.Object).GetUID()
				c.OpLock(uuid)
				defer c.OpUnlock(uuid)

				// If an update to the k8s object happens before add has succeeded
				if op, ok := c.opMap[uuid]; ok {
					if _, ok := op.(updateOp); ok {
						return fmt.Errorf("cannot add already added object: %s", key)
					}
				}
				c.opMap[uuid] = addOp{
					Object:  d.Object,
					AddFunc: &add,
					Key:     key,
				})
				return indexer.Add(d.Object)
					Indexer: indexer,
				}
				c.queue.Add(uuid)
			}
		case cache.Deleted:
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(d.Object)
			if err != nil {
				panic(err)
			}

			c.queue.Add(deleteOp{
			uuid := d.Object.(metav1.Object).GetUID()
			c.OpLock(uuid)
			defer c.OpUnlock(uuid)
			c.opMap[uuid] = deleteOp{
				Object:     d.Object,
				DeleteFunc: &delete,
				Key:        key,
			})
			return indexer.Delete(d.Object)
				Indexer: indexer,
			}
			c.queue.Add(uuid)
		}
	}
	return nil

@@ -429,7 +439,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
	}

	for i := 0; i < c.threadiness; i++ {
		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
		go c.runWorker(ctx)
	}

	<-ctx.Done()

Sorry, I am missing something here. How does tracking uuid prevent operating in parallel on different versions of the same object? Wouldn't this serialize on all versions of the object?
In our controller, we run a single shared informer across multiple CRD resource event handlers. The shared informer reports changes to bucket* objects as they occur. Events are processed by first encoding them as operation structs and appending them to the tail of a workqueue. The workqueue implementation is what this PR mostly touches.
The workqueue previously held operation objects (addOp, updateOp, deleteOp). On any event, one of these operation objects was put into the queue and waited to be processed by the next available thread.
These objects were not comparable with one another, and that led to a few problems.
The previous implementation, albeit technically eventually consistent, could not ensure the dynamics described below. This was simply because it was not possible to recognize a new event as belonging to an object already present in the queue.
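A small sketch of the workqueue property this relies on (using the client-go workqueue package already imported in the diff; the UID value is made up): items that compare equal are deduplicated while pending, so enqueueing the object's UID collapses repeated events for the same object, whereas two distinct operation structs never would.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()

	uid := types.UID("c7c254a2-example-uid")
	q.Add(uid)
	q.Add(uid) // second Add of an equal item is a no-op while the first is still pending

	fmt.Println(q.Len()) // 1: both events collapse into a single queue entry
}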
Now, for the uuid: it is a string field that is easily comparable (==), and by using the Kubernetes UID as the unique value in this field, it is possible to compare two events occurring on the same object.
This change also allowed me to improve the exponential back-off characteristics. If new events occur while an operation on the same object is in back-off, the new event will not be processed before the back-off period is complete, i.e. the uuid's position in the queue is not changed. Instead, the value of opMap[uuid] is set to the new operation struct denoting the event. By decoupling the operation itself from the queuing order, we gain a lot of flexibility.
Now to answer your direct question: by using the uuid, we do not serialize the steps. We either replace old events with new ones if both are updates, or handle them differently based on the type of the events.
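A condensed sketch of the pattern described above. The field names follow the diff, but the controller type, the enqueue/processNext helpers, and the handle callback are simplified placeholders rather than the actual controller code.

package controller

import (
	"context"
	"sync"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
)

// controller is a trimmed-down stand-in for ObjectStorageController.
type controller struct {
	queue  workqueue.RateLimitingInterface
	lock   sync.Mutex
	opMap  map[types.UID]interface{}                       // latest pending operation per object UID
	handle func(ctx context.Context, op interface{}) error // stand-in for the add/update/delete funcs
}

// enqueue records an event: the newest operation replaces any pending one for
// the same object, while only the UID itself is placed in the queue.
func (c *controller) enqueue(uid types.UID, op interface{}) {
	c.lock.Lock()
	c.opMap[uid] = op
	c.lock.Unlock()
	c.queue.Add(uid) // deduplicated if this UID is already waiting in the queue
}

// processNext is the worker side: it dequeues a UID and looks up whatever
// operation is current for that object at processing time.
func (c *controller) processNext(ctx context.Context) bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	uid := item.(types.UID)
	defer c.queue.Done(uid)

	c.lock.Lock()
	op := c.opMap[uid]
	c.lock.Unlock()

	if err := c.handle(ctx, op); err != nil {
		c.queue.AddRateLimited(uid) // exponential back-off is keyed by the UID
		return true
	}
	c.queue.Forget(uid)
	return true
}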