@@ -103,7 +103,7 @@ type ObjectStorageController struct {
103
103
104
104
lockerLock sync.Mutex
105
105
locker map [types.UID ]* sync.Mutex
106
- opMap map [types. UID ] interface {}
106
+ opMap * sync. Map
107
107
}
108
108
109
109
func NewDefaultObjectStorageController (identity string , leaderLockName string , threads int ) (* ObjectStorageController , error ) {
@@ -162,7 +162,7 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
162
162
RenewDeadline : 15 * time .Second ,
163
163
RetryPeriod : 5 * time .Second ,
164
164
165
- opMap : map [types. UID ] interface {} {},
165
+ opMap : & sync. Map {},
166
166
}, nil
167
167
}
168
168
@@ -250,21 +250,22 @@ func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {
250
250
if quit {
251
251
return false
252
252
}
253
-
253
+
254
254
uuid := uuidInterface .(types.UID )
255
255
var err error
256
- // With the lock below in place, we can safely tell the queue that we are done
257
- // processing this item. The lock will ensure that multiple items of the same
258
- // name and kind do not get processed simultaneously
256
+
259
257
defer c .queue .Done (uuid )
260
258
259
+ op , ok := c .opMap .Load (uuid )
260
+ if ! ok {
261
+ panic ("unreachable code" )
262
+ }
263
+
261
264
// Ensure that multiple operations on different versions of the same object
262
265
// do not happen in parallel
263
266
c .OpLock (uuid )
264
267
defer c .OpUnlock (uuid )
265
268
266
- op := c .opMap [uuid ]
267
-
268
269
switch o := op .(type ) {
269
270
case addOp :
270
271
add := * o .AddFunc
@@ -332,12 +333,12 @@ func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
332
333
}
333
334
334
335
// handleErr checks if an error happened and makes sure we will retry later.
335
- func (c * ObjectStorageController ) handleErr (err error , op interface {} ) {
336
+ func (c * ObjectStorageController ) handleErr (err error , uuid types. UID ) {
336
337
if err == nil {
337
- c .queue . Forget ( op )
338
+ c .opMap . Delete ( uuid )
338
339
return
339
340
}
340
- c .queue .AddRateLimited (op )
341
+ c .queue .AddRateLimited (uuid )
341
342
}
342
343
343
344
func (c * ObjectStorageController ) runController (ctx context.Context ) {
@@ -348,8 +349,8 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
348
349
lw := cache .NewListWatchFromClient (c .bucketClient .ObjectstorageV1alpha1 ().RESTClient (), name , "" , fields .Everything ())
349
350
cfg := & cache.Config {
350
351
Queue : cache .NewDeltaFIFOWithOptions (cache.DeltaFIFOOptions {
351
- KnownObjects : indexer ,
352
- EmitDeltaTypeReplaced : true ,
352
+ KnownObjects : indexer ,
353
+ EmitDeltaTypeReplaced : false ,
353
354
}),
354
355
ListerWatcher : lw ,
355
356
ObjectType : objType ,
@@ -370,15 +371,14 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
370
371
}
371
372
372
373
uuid := d .Object .(metav1.Object ).GetUID ()
373
- c .OpLock (uuid )
374
- defer c .OpUnlock (uuid )
375
- c .opMap [uuid ] = updateOp {
374
+
375
+ c .opMap .Store (uuid , updateOp {
376
376
OldObject : old ,
377
377
NewObject : d .Object ,
378
378
UpdateFunc : & update ,
379
379
Key : key ,
380
380
Indexer : indexer ,
381
- }
381
+ })
382
382
c .queue .Add (uuid )
383
383
} else {
384
384
key , err := cache .MetaNamespaceKeyFunc (d .Object )
@@ -387,21 +387,20 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
387
387
}
388
388
389
389
uuid := d .Object .(metav1.Object ).GetUID ()
390
- c .OpLock (uuid )
391
- defer c .OpUnlock (uuid )
392
-
390
+
393
391
// If an update to the k8s object happens before add has succeeded
394
- if op , ok := c .opMap [ uuid ] ; ok {
392
+ if op , ok := c .opMap . Load ( uuid ) ; ok {
395
393
if _ , ok := op .(updateOp ); ok {
396
- return fmt .Errorf ("cannot add already added object: %s" , key )
394
+ err := fmt .Errorf ("cannot add already added object: %s" , key )
395
+ return err
397
396
}
398
397
}
399
- c .opMap [ uuid ] = addOp {
398
+ c .opMap . Store ( uuid , addOp {
400
399
Object : d .Object ,
401
400
AddFunc : & add ,
402
401
Key : key ,
403
402
Indexer : indexer ,
404
- }
403
+ })
405
404
c .queue .Add (uuid )
406
405
}
407
406
case cache .Deleted :
@@ -411,14 +410,12 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
411
410
}
412
411
413
412
uuid := d .Object .(metav1.Object ).GetUID ()
414
- c .OpLock (uuid )
415
- defer c .OpUnlock (uuid )
416
- c .opMap [uuid ] = deleteOp {
413
+ c .opMap .Store (uuid , deleteOp {
417
414
Object : d .Object ,
418
415
DeleteFunc : & delete ,
419
416
Key : key ,
420
417
Indexer : indexer ,
421
- }
418
+ })
422
419
c .queue .Add (uuid )
423
420
}
424
421
}
0 commit comments