This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Commit 05acb5e

Merge pull request #25 from wlan0/master
Fix bucket* controller framework
2 parents eb167f7 + 64fea45 commit 05acb5e

File tree: 1 file changed, +94 -84 lines


Diff for: controller/controller.go (+94 -84)
@@ -5,25 +5,21 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"reflect"
 	"regexp"
 	"strings"
 	"sync"
 	"time"
 
-	"golang.org/x/time/rate"
-
-	// objectstorage
 	v1alpha1 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
 	bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"
 
-	// k8s api
 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-
-	// k8s client
 	kubeclientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -35,10 +31,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 
-	// logging
 	"github.com/golang/glog"
-
-	// config
 	"github.com/spf13/viper"
 )
 
@@ -49,6 +42,7 @@ type deleteFunc func(ctx context.Context, obj interface{}) error
 type addOp struct {
 	Object  interface{}
 	AddFunc *addFunc
+	Indexer cache.Indexer
 
 	Key string
 }
@@ -61,6 +55,7 @@ type updateOp struct {
 	OldObject  interface{}
 	NewObject  interface{}
 	UpdateFunc *updateFunc
+	Indexer    cache.Indexer
 
 	Key string
 }
@@ -72,6 +67,7 @@ func (u updateOp) String() string {
 type deleteOp struct {
 	Object     interface{}
 	DeleteFunc *deleteFunc
+	Indexer    cache.Indexer
 
 	Key string
 }
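The Indexer field added to all three op structs pays off later in processNextItem: the local cache is mutated only after the user callback succeeds, so a failed operation leaves the cached state untouched for the retry. A minimal, self-contained sketch of that commit-on-success pattern; the ConfigMap object and the add callback are hypothetical, not code from this commit:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Same indexer construction the controller uses further down.
	indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
	obj := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "obj"}}

	// Hypothetical user add handler.
	add := func(o interface{}) error { return nil }

	// Commit to the cache only after the callback succeeds.
	if err := add(obj); err == nil {
		indexer.Add(obj)
	}
	fmt.Println(indexer.ListKeys()) // [ns/obj]
}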
@@ -105,15 +101,13 @@ type ObjectStorageController struct {
 	bucketClient bucketclientset.Interface
 	kubeClient   kubeclientset.Interface
 
-	locker     map[string]*sync.Mutex
 	lockerLock sync.Mutex
+	locker     map[types.UID]*sync.Mutex
+	opMap      map[types.UID]interface{}
 }
 
 func NewDefaultObjectStorageController(identity string, leaderLockName string, threads int) (*ObjectStorageController, error) {
-	rateLimit := workqueue.NewMaxOfRateLimiter(
-		workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 600*time.Second),
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
-	)
+	rateLimit := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 30*time.Second)
	return NewObjectStorageController(identity, leaderLockName, threads, rateLimit)
 }
 
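The default rate limiter loses the global token bucket (and with it the golang.org/x/time/rate import), keeping only per-item exponential backoff with the cap lowered from 600s to 30s. A small sketch of what that means for a repeatedly failing item; the item key is made up:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	limiter := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 30*time.Second)
	// Each call counts as another failure for the same item:
	// 100ms, 200ms, 400ms, ... capped at 30s.
	for i := 0; i < 12; i++ {
		fmt.Println(limiter.When("object-uid"))
	}
}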
@@ -162,10 +156,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
 		queue:       workqueue.NewRateLimitingQueue(limiter),
 		threadiness: threads,
 
-		ResyncPeriod:  30 * time.Second,
-		LeaseDuration: 15 * time.Second,
-		RenewDeadline: 10 * time.Second,
+		ResyncPeriod: 30 * time.Second,
+		// leader election
+		LeaseDuration: 60 * time.Second,
+		RenewDeadline: 15 * time.Second,
 		RetryPeriod:   5 * time.Second,
+
+		opMap: map[types.UID]interface{}{},
 	}, nil
 }
 
@@ -219,10 +216,11 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
 	}
 
 	leaderConfig := leaderelection.LeaderElectionConfig{
-		Lock:          l,
-		LeaseDuration: c.LeaseDuration,
-		RenewDeadline: c.RenewDeadline,
-		RetryPeriod:   c.RetryPeriod,
+		Lock:            l,
+		ReleaseOnCancel: true,
+		LeaseDuration:   c.LeaseDuration,
+		RenewDeadline:   c.RenewDeadline,
+		RetryPeriod:     c.RetryPeriod,
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: func(ctx context.Context) {
 				glog.V(2).Info("became leader, starting")
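ReleaseOnCancel is stock client-go behavior: when the context passed to the elector is cancelled, the lease is released immediately instead of being left to expire, so with the longer 60s LeaseDuration a standby replica does not have to wait out the lease on a clean shutdown. A minimal sketch of a leader-elected main loop with this option; the namespace, lock name, and identity are hypothetical:

package main

import (
	"context"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	lock, err := resourcelock.New(resourcelock.LeasesResourceLock,
		"default", "example-lock",
		client.CoreV1(), client.CoordinationV1(),
		resourcelock.ResourceLockConfig{Identity: "replica-1"})
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true, // give the lease back on shutdown
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* run the controller */ },
			OnStoppedLeading: func() { /* stop cleanly */ },
		},
	})
}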
@@ -248,75 +246,85 @@ func (c *ObjectStorageController) runWorker(ctx context.Context) {
 
 func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {
 	// Wait until there is a new item in the working queue
-	op, quit := c.queue.Get()
+	uuidInterface, quit := c.queue.Get()
 	if quit {
 		return false
 	}
 
+	uuid := uuidInterface.(types.UID)
+	var err error
 	// With the lock below in place, we can safely tell the queue that we are done
 	// processing this item. The lock will ensure that multiple items of the same
 	// name and kind do not get processed simultaneously
-	defer c.queue.Done(op)
+	defer c.queue.Done(uuid)
 
 	// Ensure that multiple operations on different versions of the same object
 	// do not happen in parallel
-	c.OpLock(op)
-	defer c.OpUnlock(op)
+	c.OpLock(uuid)
+	defer c.OpUnlock(uuid)
+
+	op := c.opMap[uuid]
 
-	var err error
 	switch o := op.(type) {
 	case addOp:
 		add := *o.AddFunc
+		objMeta := o.Object.(metav1.Object)
+		name := objMeta.GetName()
+		ns := objMeta.GetNamespace()
 		err = add(ctx, o.Object)
+		if err == nil {
+			o.Indexer.Add(o.Object)
+		} else {
+			glog.Errorf("Error adding %s %s: %v", ns, name, err)
+		}
 	case updateOp:
 		update := *o.UpdateFunc
+		objMeta := o.OldObject.(metav1.Object)
+		name := objMeta.GetName()
+		ns := objMeta.GetNamespace()
 		err = update(ctx, o.OldObject, o.NewObject)
+		if err == nil {
+			o.Indexer.Update(o.NewObject)
+		} else {
+			glog.Errorf("Error updating %s %s: %v", ns, name, err)
+		}
 	case deleteOp:
 		delete := *o.DeleteFunc
+		objMeta := o.Object.(metav1.Object)
+		name := objMeta.GetName()
+		ns := objMeta.GetNamespace()
 		err = delete(ctx, o.Object)
+		if err == nil {
+			o.Indexer.Delete(o.Object)
+		} else {
+			glog.Errorf("Error deleting %s %s: %v", ns, name, err)
+		}
 	default:
 		panic("unknown item in queue")
 	}
 
 	// Handle the error if something went wrong
-	c.handleErr(err, op)
+	c.handleErr(err, uuid)
 	return true
 }
 
-func (c *ObjectStorageController) OpLock(op interface{}) {
+func (c *ObjectStorageController) OpLock(op types.UID) {
 	c.GetOpLock(op).Lock()
 }
 
-func (c *ObjectStorageController) OpUnlock(op interface{}) {
+func (c *ObjectStorageController) OpUnlock(op types.UID) {
 	c.GetOpLock(op).Unlock()
 }
 
-func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
-	var key string
-	var ext string
-
-	switch o := op.(type) {
-	case addOp:
-		key = o.Key
-		ext = fmt.Sprintf("%v", o.AddFunc)
-	case updateOp:
-		key = o.Key
-		ext = fmt.Sprintf("%v", o.UpdateFunc)
-	case deleteOp:
-		key = o.Key
-		ext = fmt.Sprintf("%v", o.DeleteFunc)
-	default:
-		panic("unknown item in queue")
-	}
+func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
+	lockKey := op
+	c.lockerLock.Lock()
+	defer c.lockerLock.Unlock()
 
-	lockKey := fmt.Sprintf("%s/%s", key, ext)
 	if c.locker == nil {
-		c.locker = map[string]*sync.Mutex{}
+		c.locker = map[types.UID]*sync.Mutex{}
 	}
 
-	c.lockerLock.Lock()
-	defer c.lockerLock.Unlock()
-
 	if _, ok := c.locker[lockKey]; !ok {
 		c.locker[lockKey] = &sync.Mutex{}
 	}
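The central change of this hunk: the workqueue now carries types.UID values, and opMap keeps only the newest pending operation per object, so a burst of events for one object coalesces into a single queue entry processed against the latest state, while the per-UID mutex keeps enqueueing and processing from interleaving. A self-contained sketch of that pattern with hypothetical names (plain strings stand in for types.UID and the op structs):

package main

import (
	"fmt"
	"sync"
)

type controller struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
	opMap map[string]string // latest pending op per object UID
}

func (c *controller) lockFor(uid string) *sync.Mutex {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.locks[uid] == nil {
		c.locks[uid] = &sync.Mutex{}
	}
	return c.locks[uid]
}

// enqueue records the newest op; re-adding the same UID coalesces in a real
// workqueue, which deduplicates identical items.
func (c *controller) enqueue(uid, op string) {
	l := c.lockFor(uid)
	l.Lock()
	defer l.Unlock()
	c.opMap[uid] = op
}

// process always sees the most recent op recorded for the UID.
func (c *controller) process(uid string) {
	l := c.lockFor(uid)
	l.Lock()
	defer l.Unlock()
	fmt.Println("processing", uid, "op:", c.opMap[uid])
}

func main() {
	c := &controller{locks: map[string]*sync.Mutex{}, opMap: map[string]string{}}
	c.enqueue("uid-1", "add")
	c.enqueue("uid-1", "update") // replaces the pending add
	c.process("uid-1")           // processing uid-1 op: update
}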
@@ -326,36 +334,15 @@ func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
 // handleErr checks if an error happened and makes sure we will retry later.
 func (c *ObjectStorageController) handleErr(err error, op interface{}) {
 	if err == nil {
-		// Forget about the #AddRateLimited history of the op on every successful synchronization.
-		// This ensures that future processing of updates for this op is not delayed because of
-		// an outdated error history.
 		c.queue.Forget(op)
 		return
 	}
-
-	/* TODO: Determine if there is a maxium number of retries or time allowed before giving up
-	// This controller retries 5 times if something goes wrong. After that, it stops trying.
-	if c.queue.NumRequeues(op) < 5 {
-		klog.Infof("Error syncing op %v: %v", key, err)
-
-		// Re-enqueue the key rate limited. Based on the rate limiter on the
-		// queue and the re-enqueue history, the op will be processed later again.
-		c.queue.AddRateLimited(op)
-		return
-	}
-
-	c.queue.Forget(key)
-	// Report to an external entity that, even after several retries, we could not successfully process this op
-	utilruntime.HandleError(err)
-	klog.Infof("Dropping op %+v out of the queue: %v", op, err)
-	*/
-
-	glog.V(5).Infof("Error executing operation %+v: %+v", op, err)
 	c.queue.AddRateLimited(op)
 }
 
 func (c *ObjectStorageController) runController(ctx context.Context) {
 	controllerFor := func(name string, objType runtime.Object, add addFunc, update updateFunc, delete deleteFunc) {
-		indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+		indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
 		resyncPeriod := c.ResyncPeriod
 
 		lw := cache.NewListWatchFromClient(c.bucketClient.ObjectstorageV1alpha1().RESTClient(), name, "", fields.Everything())
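With the retry bookkeeping stripped out, handleErr just forgets the item on success and requeues with backoff on failure. The key-function swap matters for deletes: DeletionHandlingMetaNamespaceKeyFunc also accepts the cache.DeletedFinalStateUnknown tombstones a watch can deliver after a missed delete event, where plain MetaNamespaceKeyFunc returns an error. A small sketch; the Pod object is just an example:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "obj"}}

	key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
	fmt.Println(key) // ns/obj

	tombstone := cache.DeletedFinalStateUnknown{Key: "ns/obj", Obj: pod}
	key, _ = cache.DeletionHandlingMetaNamespaceKeyFunc(tombstone)
	fmt.Println(key) // ns/obj; MetaNamespaceKeyFunc would error on the tombstone
}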
@@ -378,38 +365,61 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
 					panic(err)
 				}
 
-				c.queue.Add(updateOp{
+				if reflect.DeepEqual(d.Object, old) {
+					return nil
+				}
+
+				uuid := d.Object.(metav1.Object).GetUID()
+				c.OpLock(uuid)
+				defer c.OpUnlock(uuid)
+				c.opMap[uuid] = updateOp{
 					OldObject:  old,
 					NewObject:  d.Object,
 					UpdateFunc: &update,
 					Key:        key,
-				})
-				return indexer.Update(d.Object)
+					Indexer:    indexer,
+				}
+				c.queue.Add(uuid)
 			} else {
 				key, err := cache.MetaNamespaceKeyFunc(d.Object)
 				if err != nil {
 					panic(err)
 				}
 
-				c.queue.Add(addOp{
+				uuid := d.Object.(metav1.Object).GetUID()
+				c.OpLock(uuid)
+				defer c.OpUnlock(uuid)
+
+				// If an update to the k8s object happens before add has succeeded
+				if op, ok := c.opMap[uuid]; ok {
+					if _, ok := op.(updateOp); ok {
+						return fmt.Errorf("cannot add already added object: %s", key)
+					}
+				}
+				c.opMap[uuid] = addOp{
 					Object:  d.Object,
 					AddFunc: &add,
 					Key:     key,
-				})
-				return indexer.Add(d.Object)
+					Indexer: indexer,
+				}
+				c.queue.Add(uuid)
 			}
 		case cache.Deleted:
 			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(d.Object)
 			if err != nil {
 				panic(err)
 			}
 
-			c.queue.Add(deleteOp{
+			uuid := d.Object.(metav1.Object).GetUID()
+			c.OpLock(uuid)
+			defer c.OpUnlock(uuid)
+			c.opMap[uuid] = deleteOp{
 				Object:     d.Object,
 				DeleteFunc: &delete,
 				Key:        key,
-			})
-			return indexer.Delete(d.Object)
+				Indexer:    indexer,
+			}
+			c.queue.Add(uuid)
 		}
 	}
 	return nil
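The reflect.DeepEqual check at the top of the update branch drops the no-op notifications that periodic resyncs produce (the informer re-delivers every object every ResyncPeriod, changed or not). A trivial illustration with a made-up spec type:

package main

import (
	"fmt"
	"reflect"
)

type bucketSpec struct{ Provisioner string }

func main() {
	old := bucketSpec{Provisioner: "s3.example.com"}
	cur := bucketSpec{Provisioner: "s3.example.com"}
	if reflect.DeepEqual(old, cur) {
		fmt.Println("unchanged object, skip enqueue") // resync path: nothing to do
	}
}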
@@ -429,7 +439,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
 	}
 
 	for i := 0; i < c.threadiness; i++ {
-		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
+		go c.runWorker(ctx)
 	}
 
 	<-ctx.Done()
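Workers are now launched once with go c.runWorker(ctx) instead of being restarted every second by wait.UntilWithContext, which presumably relies on runWorker looping until the queue itself shuts down. A minimal sketch of that worker shape against a bare workqueue; the names are illustrative:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.New()

	go func() {
		queue.Add("item-1")
		queue.Add("item-2")
		queue.ShutDown() // queued items are still delivered before Get reports shutdown
	}()

	// runWorker-style loop: exits only when Get signals shutdown, so no
	// outer restart wrapper is needed.
	for {
		item, quit := queue.Get()
		if quit {
			break
		}
		fmt.Println("processed", item)
		queue.Done(item)
	}
}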
