@@ -5,25 +5,21 @@ import (
5
5
"fmt"
6
6
"io/ioutil"
7
7
"os"
8
+ "reflect"
8
9
"regexp"
9
10
"strings"
10
11
"sync"
11
12
"time"
12
13
13
- "golang.org/x/time/rate"
14
-
15
- // objectstorage
16
14
v1alpha1 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
17
15
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"
18
16
19
- // k8s api
20
17
v1 "k8s.io/api/core/v1"
18
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
19
"k8s.io/apimachinery/pkg/fields"
22
20
"k8s.io/apimachinery/pkg/runtime"
21
+ "k8s.io/apimachinery/pkg/types"
23
22
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
24
- "k8s.io/apimachinery/pkg/util/wait"
25
-
26
- // k8s client
27
23
kubeclientset "k8s.io/client-go/kubernetes"
28
24
"k8s.io/client-go/kubernetes/scheme"
29
25
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -35,10 +31,7 @@ import (
35
31
"k8s.io/client-go/tools/record"
36
32
"k8s.io/client-go/util/workqueue"
37
33
38
- // logging
39
34
"github.com/golang/glog"
40
-
41
- // config
42
35
"github.com/spf13/viper"
43
36
)
44
37
@@ -49,6 +42,7 @@ type deleteFunc func(ctx context.Context, obj interface{}) error
49
42
type addOp struct {
50
43
Object interface {}
51
44
AddFunc * addFunc
45
+ Indexer cache.Indexer
52
46
53
47
Key string
54
48
}
@@ -61,6 +55,7 @@ type updateOp struct {
61
55
OldObject interface {}
62
56
NewObject interface {}
63
57
UpdateFunc * updateFunc
58
+ Indexer cache.Indexer
64
59
65
60
Key string
66
61
}
@@ -72,6 +67,7 @@ func (u updateOp) String() string {
72
67
type deleteOp struct {
73
68
Object interface {}
74
69
DeleteFunc * deleteFunc
70
+ Indexer cache.Indexer
75
71
76
72
Key string
77
73
}
@@ -105,15 +101,13 @@ type ObjectStorageController struct {
105
101
bucketClient bucketclientset.Interface
106
102
kubeClient kubeclientset.Interface
107
103
108
- locker map [string ]* sync.Mutex
109
104
lockerLock sync.Mutex
105
+ locker map [types.UID ]* sync.Mutex
106
+ opMap map [types.UID ]interface {}
110
107
}
111
108
112
109
func NewDefaultObjectStorageController (identity string , leaderLockName string , threads int ) (* ObjectStorageController , error ) {
113
- rateLimit := workqueue .NewMaxOfRateLimiter (
114
- workqueue .NewItemExponentialFailureRateLimiter (100 * time .Millisecond , 600 * time .Second ),
115
- & workqueue.BucketRateLimiter {Limiter : rate .NewLimiter (rate .Limit (10 ), 100 )},
116
- )
110
+ rateLimit := workqueue .NewItemExponentialFailureRateLimiter (100 * time .Millisecond , 30 * time .Second )
117
111
return NewObjectStorageController (identity , leaderLockName , threads , rateLimit )
118
112
}
119
113
@@ -162,10 +156,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
162
156
queue : workqueue .NewRateLimitingQueue (limiter ),
163
157
threadiness : threads ,
164
158
165
- ResyncPeriod : 30 * time .Second ,
166
- LeaseDuration : 15 * time .Second ,
167
- RenewDeadline : 10 * time .Second ,
159
+ ResyncPeriod : 30 * time .Second ,
160
+ // leader election
161
+ LeaseDuration : 60 * time .Second ,
162
+ RenewDeadline : 15 * time .Second ,
168
163
RetryPeriod : 5 * time .Second ,
164
+
165
+ opMap : map [types.UID ]interface {}{},
169
166
}, nil
170
167
}
171
168
@@ -219,10 +216,11 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
219
216
}
220
217
221
218
leaderConfig := leaderelection.LeaderElectionConfig {
222
- Lock : l ,
223
- LeaseDuration : c .LeaseDuration ,
224
- RenewDeadline : c .RenewDeadline ,
225
- RetryPeriod : c .RetryPeriod ,
219
+ Lock : l ,
220
+ ReleaseOnCancel : true ,
221
+ LeaseDuration : c .LeaseDuration ,
222
+ RenewDeadline : c .RenewDeadline ,
223
+ RetryPeriod : c .RetryPeriod ,
226
224
Callbacks : leaderelection.LeaderCallbacks {
227
225
OnStartedLeading : func (ctx context.Context ) {
228
226
glog .V (2 ).Info ("became leader, starting" )
@@ -248,75 +246,85 @@ func (c *ObjectStorageController) runWorker(ctx context.Context) {
248
246
249
247
func (c * ObjectStorageController ) processNextItem (ctx context.Context ) bool {
250
248
// Wait until there is a new item in the working queue
251
- op , quit := c .queue .Get ()
249
+ uuidInterface , quit := c .queue .Get ()
252
250
if quit {
253
251
return false
254
252
}
255
253
254
+ uuid := uuidInterface .(types.UID )
255
+ var err error
256
256
// With the lock below in place, we can safely tell the queue that we are done
257
257
// processing this item. The lock will ensure that multiple items of the same
258
258
// name and kind do not get processed simultaneously
259
- defer c .queue .Done (op )
259
+ defer c .queue .Done (uuid )
260
260
261
261
// Ensure that multiple operations on different versions of the same object
262
262
// do not happen in parallel
263
- c .OpLock (op )
264
- defer c .OpUnlock (op )
263
+ c .OpLock (uuid )
264
+ defer c .OpUnlock (uuid )
265
+
266
+ op := c .opMap [uuid ]
265
267
266
- var err error
267
268
switch o := op .(type ) {
268
269
case addOp :
269
270
add := * o .AddFunc
271
+ objMeta := o .Object .(metav1.Object )
272
+ name := objMeta .GetName ()
273
+ ns := objMeta .GetNamespace ()
270
274
err = add (ctx , o .Object )
275
+ if err == nil {
276
+ o .Indexer .Add (o .Object )
277
+ } else {
278
+ glog .Errorf ("Error adding %s %s: %v" , ns , name , err )
279
+ }
271
280
case updateOp :
272
281
update := * o .UpdateFunc
282
+ objMeta := o .OldObject .(metav1.Object )
283
+ name := objMeta .GetName ()
284
+ ns := objMeta .GetNamespace ()
273
285
err = update (ctx , o .OldObject , o .NewObject )
286
+ if err == nil {
287
+ o .Indexer .Update (o .NewObject )
288
+ } else {
289
+ glog .Errorf ("Error updating %s %s: %v" , ns , name , err )
290
+ }
274
291
case deleteOp :
275
292
delete := * o .DeleteFunc
293
+ objMeta := o .Object .(metav1.Object )
294
+ name := objMeta .GetName ()
295
+ ns := objMeta .GetNamespace ()
276
296
err = delete (ctx , o .Object )
297
+ if err == nil {
298
+ o .Indexer .Delete (o .Object )
299
+ } else {
300
+ glog .Errorf ("Error deleting %s %s: %v" , ns , name , err )
301
+ }
277
302
default :
278
303
panic ("unknown item in queue" )
279
304
}
280
305
281
306
// Handle the error if something went wrong
282
- c .handleErr (err , op )
307
+ c .handleErr (err , uuid )
283
308
return true
284
309
}
285
310
286
- func (c * ObjectStorageController ) OpLock (op interface {} ) {
311
+ func (c * ObjectStorageController ) OpLock (op types. UID ) {
287
312
c .GetOpLock (op ).Lock ()
288
313
}
289
314
290
- func (c * ObjectStorageController ) OpUnlock (op interface {} ) {
315
+ func (c * ObjectStorageController ) OpUnlock (op types. UID ) {
291
316
c .GetOpLock (op ).Unlock ()
292
317
}
293
318
294
- func (c * ObjectStorageController ) GetOpLock (op interface {}) * sync.Mutex {
295
- var key string
296
- var ext string
297
-
298
- switch o := op .(type ) {
299
- case addOp :
300
- key = o .Key
301
- ext = fmt .Sprintf ("%v" , o .AddFunc )
302
- case updateOp :
303
- key = o .Key
304
- ext = fmt .Sprintf ("%v" , o .UpdateFunc )
305
- case deleteOp :
306
- key = o .Key
307
- ext = fmt .Sprintf ("%v" , o .DeleteFunc )
308
- default :
309
- panic ("unknown item in queue" )
310
- }
319
+ func (c * ObjectStorageController ) GetOpLock (op types.UID ) * sync.Mutex {
320
+ lockKey := op
321
+ c .lockerLock .Lock ()
322
+ defer c .lockerLock .Unlock ()
311
323
312
- lockKey := fmt .Sprintf ("%s/%s" , key , ext )
313
324
if c .locker == nil {
314
- c .locker = map [string ]* sync.Mutex {}
325
+ c .locker = map [types. UID ]* sync.Mutex {}
315
326
}
316
327
317
- c .lockerLock .Lock ()
318
- defer c .lockerLock .Unlock ()
319
-
320
328
if _ , ok := c .locker [lockKey ]; ! ok {
321
329
c .locker [lockKey ] = & sync.Mutex {}
322
330
}
@@ -326,36 +334,15 @@ func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
326
334
// handleErr checks if an error happened and makes sure we will retry later.
327
335
func (c * ObjectStorageController ) handleErr (err error , op interface {}) {
328
336
if err == nil {
329
- // Forget about the #AddRateLimited history of the op on every successful synchronization.
330
- // This ensures that future processing of updates for this op is not delayed because of
331
- // an outdated error history.
332
337
c .queue .Forget (op )
333
338
return
334
339
}
335
-
336
- /* TODO: Determine if there is a maxium number of retries or time allowed before giving up
337
- // This controller retries 5 times if something goes wrong. After that, it stops trying.
338
- if c.queue.NumRequeues(op) < 5 {
339
- klog.Infof("Error syncing op %v: %v", key, err)
340
-
341
- // Re-enqueue the key rate limited. Based on the rate limiter on the
342
- // queue and the re-enqueue history, the op will be processed later again.
343
- c.queue.AddRateLimited(op)
344
- return
345
- }
346
-
347
- c.queue.Forget(key)
348
- // Report to an external entity that, even after several retries, we could not successfully process this op
349
- utilruntime.HandleError(err)
350
- klog.Infof("Dropping op %+v out of the queue: %v", op, err)
351
- */
352
- glog .V (5 ).Infof ("Error executing operation %+v: %+v" , op , err )
353
340
c .queue .AddRateLimited (op )
354
341
}
355
342
356
343
func (c * ObjectStorageController ) runController (ctx context.Context ) {
357
344
controllerFor := func (name string , objType runtime.Object , add addFunc , update updateFunc , delete deleteFunc ) {
358
- indexer := cache .NewIndexer (cache .MetaNamespaceKeyFunc , cache.Indexers {})
345
+ indexer := cache .NewIndexer (cache .DeletionHandlingMetaNamespaceKeyFunc , cache.Indexers {})
359
346
resyncPeriod := c .ResyncPeriod
360
347
361
348
lw := cache .NewListWatchFromClient (c .bucketClient .ObjectstorageV1alpha1 ().RESTClient (), name , "" , fields .Everything ())
@@ -378,38 +365,61 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
378
365
panic (err )
379
366
}
380
367
381
- c .queue .Add (updateOp {
368
+ if reflect .DeepEqual (d .Object , old ) {
369
+ return nil
370
+ }
371
+
372
+ uuid := d .Object .(metav1.Object ).GetUID ()
373
+ c .OpLock (uuid )
374
+ defer c .OpUnlock (uuid )
375
+ c .opMap [uuid ] = updateOp {
382
376
OldObject : old ,
383
377
NewObject : d .Object ,
384
378
UpdateFunc : & update ,
385
379
Key : key ,
386
- })
387
- return indexer .Update (d .Object )
380
+ Indexer : indexer ,
381
+ }
382
+ c .queue .Add (uuid )
388
383
} else {
389
384
key , err := cache .MetaNamespaceKeyFunc (d .Object )
390
385
if err != nil {
391
386
panic (err )
392
387
}
393
388
394
- c .queue .Add (addOp {
389
+ uuid := d .Object .(metav1.Object ).GetUID ()
390
+ c .OpLock (uuid )
391
+ defer c .OpUnlock (uuid )
392
+
393
+ // If an update to the k8s object happens before add has succeeded
394
+ if op , ok := c .opMap [uuid ]; ok {
395
+ if _ , ok := op .(updateOp ); ok {
396
+ return fmt .Errorf ("cannot add already added object: %s" , key )
397
+ }
398
+ }
399
+ c .opMap [uuid ] = addOp {
395
400
Object : d .Object ,
396
401
AddFunc : & add ,
397
402
Key : key ,
398
- })
399
- return indexer .Add (d .Object )
403
+ Indexer : indexer ,
404
+ }
405
+ c .queue .Add (uuid )
400
406
}
401
407
case cache .Deleted :
402
408
key , err := cache .DeletionHandlingMetaNamespaceKeyFunc (d .Object )
403
409
if err != nil {
404
410
panic (err )
405
411
}
406
412
407
- c .queue .Add (deleteOp {
413
+ uuid := d .Object .(metav1.Object ).GetUID ()
414
+ c .OpLock (uuid )
415
+ defer c .OpUnlock (uuid )
416
+ c .opMap [uuid ] = deleteOp {
408
417
Object : d .Object ,
409
418
DeleteFunc : & delete ,
410
419
Key : key ,
411
- })
412
- return indexer .Delete (d .Object )
420
+ Indexer : indexer ,
421
+ }
422
+ c .queue .Add (uuid )
413
423
}
414
424
}
415
425
return nil
@@ -429,7 +439,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
429
439
}
430
440
431
441
for i := 0 ; i < c .threadiness ; i ++ {
432
- go wait . UntilWithContext ( ctx , c .runWorker , time . Second )
442
+ go c .runWorker ( ctx )
433
443
}
434
444
435
445
<- ctx .Done ()
0 commit comments