@@ -103,7 +103,7 @@ type ObjectStorageController struct {
103
103
104
104
lockerLock sync.Mutex
105
105
locker map [types.UID ]* sync.Mutex
106
- opMap map [types. UID ] interface {}
106
+ opMap * sync. Map
107
107
}
108
108
109
109
func NewDefaultObjectStorageController (identity string , leaderLockName string , threads int ) (* ObjectStorageController , error ) {
@@ -162,7 +162,7 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
162
162
RenewDeadline : 15 * time .Second ,
163
163
RetryPeriod : 5 * time .Second ,
164
164
165
- opMap : map [types. UID ] interface {} {},
165
+ opMap : & sync. Map {},
166
166
}, nil
167
167
}
168
168
@@ -253,18 +253,19 @@ func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {
253
253
254
254
uuid := uuidInterface .(types.UID )
255
255
var err error
256
- // With the lock below in place, we can safely tell the queue that we are done
257
- // processing this item. The lock will ensure that multiple items of the same
258
- // name and kind do not get processed simultaneously
256
+
259
257
defer c .queue .Done (uuid )
260
258
259
+ op , ok := c .opMap .Load (uuid )
260
+ if ! ok {
261
+ panic ("unreachable code" )
262
+ }
263
+
261
264
// Ensure that multiple operations on different versions of the same object
262
265
// do not happen in parallel
263
266
c .OpLock (uuid )
264
267
defer c .OpUnlock (uuid )
265
268
266
- op := c .opMap [uuid ]
267
-
268
269
switch o := op .(type ) {
269
270
case addOp :
270
271
add := * o .AddFunc
@@ -332,12 +333,12 @@ func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
332
333
}
333
334
334
335
// handleErr checks if an error happened and makes sure we will retry later.
335
- func (c * ObjectStorageController ) handleErr (err error , op interface {} ) {
336
+ func (c * ObjectStorageController ) handleErr (err error , uuid types. UID ) {
336
337
if err == nil {
337
- c .queue . Forget ( op )
338
+ c .opMap . Delete ( uuid )
338
339
return
339
340
}
340
- c .queue .AddRateLimited (op )
341
+ c .queue .AddRateLimited (uuid )
341
342
}
342
343
343
344
func (c * ObjectStorageController ) runController (ctx context.Context ) {
@@ -349,7 +350,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
349
350
cfg := & cache.Config {
350
351
Queue : cache .NewDeltaFIFOWithOptions (cache.DeltaFIFOOptions {
351
352
KnownObjects : indexer ,
352
- EmitDeltaTypeReplaced : true ,
353
+ EmitDeltaTypeReplaced : false ,
353
354
}),
354
355
ListerWatcher : lw ,
355
356
ObjectType : objType ,
@@ -370,15 +371,14 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
370
371
}
371
372
372
373
uuid := d .Object .(metav1.Object ).GetUID ()
373
- c .OpLock (uuid )
374
- defer c .OpUnlock (uuid )
375
- c .opMap [uuid ] = updateOp {
374
+
375
+ c .opMap .Store (uuid , updateOp {
376
376
OldObject : old ,
377
377
NewObject : d .Object ,
378
378
UpdateFunc : & update ,
379
379
Key : key ,
380
380
Indexer : indexer ,
381
- }
381
+ })
382
382
c .queue .Add (uuid )
383
383
} else {
384
384
key , err := cache .MetaNamespaceKeyFunc (d .Object )
@@ -387,20 +387,17 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
387
387
}
388
388
389
389
uuid := d .Object .(metav1.Object ).GetUID ()
390
- c .OpLock (uuid )
391
- defer c .OpUnlock (uuid )
392
-
393
- // If an update to the k8s object happens before add has succeeded
394
- if op , ok := c .opMap [uuid ]; ok {
395
- if _ , ok := op .(updateOp ); ok {
396
- return fmt .Errorf ("cannot add already added object: %s" , key )
397
- }
398
- }
399
- c .opMap [uuid ] = addOp {
390
+
391
+ if op , ok := c .opMap .LoadOrStore (uuid , addOp {
400
392
Object : d .Object ,
401
393
AddFunc : & add ,
402
394
Key : key ,
403
395
Indexer : indexer ,
396
+ }); ok { // If an update to the k8s object happens before add has succeeded
397
+ if _ , ok := op .(updateOp ); ok {
398
+ err := fmt .Errorf ("cannot add already added object: %s" , key )
399
+ return err
400
+ }
404
401
}
405
402
c .queue .Add (uuid )
406
403
}
@@ -411,14 +408,12 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
411
408
}
412
409
413
410
uuid := d .Object .(metav1.Object ).GetUID ()
414
- c .OpLock (uuid )
415
- defer c .OpUnlock (uuid )
416
- c .opMap [uuid ] = deleteOp {
411
+ c .opMap .Store (uuid , deleteOp {
417
412
Object : d .Object ,
418
413
DeleteFunc : & delete ,
419
414
Key : key ,
420
415
Indexer : indexer ,
421
- }
416
+ })
422
417
c .queue .Add (uuid )
423
418
}
424
419
}
0 commit comments