@@ -30,14 +30,18 @@ import (
30
30
"google.golang.org/grpc/status"
31
31
"k8s.io/apimachinery/pkg/util/sets"
32
32
"k8s.io/apimachinery/pkg/util/uuid"
33
- "k8s.io/apimachinery/pkg/util/wait"
34
- "k8s.io/client-go/util/workqueue"
33
+ "k8s.io/client-go/util/flowcontrol"
35
34
"k8s.io/klog"
36
35
37
36
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
38
37
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
39
38
)
40
39
40
// Duration bounds for the per-node backoff tracked by
// GCEControllerServer.nodeBackoff.
// NOTE(review): presumably these are the initial and maximum arguments given
// to the flowcontrol.Backoff constructor — confirm at the construction site.
const (
	// nodeBackoffInitialDuration is the starting backoff delay for a node.
	nodeBackoffInitialDuration = 200 * time.Millisecond
	// nodeBackoffMaxDuration is the upper bound on the backoff delay for a node.
	nodeBackoffMaxDuration = 5 * time.Minute
)
44
+
41
45
type GCEControllerServer struct {
42
46
Driver * GCEDriver
43
47
CloudProvider gce.GCECompute
@@ -50,17 +54,9 @@ type GCEControllerServer struct {
50
54
// Aborted error
51
55
volumeLocks * common.VolumeLocks
52
56
53
- // queue is a rate limited work queue for Controller Publish/Unpublish
54
- // Volume calls
55
- queue workqueue.RateLimitingInterface
56
-
57
- // publishErrorsSeenOnNode is a list of nodes with attach/detach
58
- // operation failures so those nodes shall be rate limited for all
59
- // the attach/detach operations until there is an attach / detach
60
- // operation succeeds
61
- publishErrorsSeenOnNode map [string ]bool
62
-
63
- opsManager * OpsManager
57
+ // nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible.
58
+ nodeBackoff * flowcontrol.Backoff
59
+ opsManager * OpsManager
64
60
}
65
61
66
62
type workItem struct {
@@ -337,75 +333,34 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
337
333
// Run starts the GCEControllerServer.
338
334
func (gceCS * GCEControllerServer ) Run () {
339
335
go gceCS .opsManager .HydrateOpsCache ()
340
- go wait .Until (gceCS .worker , 1 * time .Second , wait .NeverStop )
341
- }
342
-
343
- func (gceCS * GCEControllerServer ) worker () {
344
- // Runs until workqueue is shut down
345
- for gceCS .processNextWorkItem () {
346
- }
347
- }
348
-
349
- func (gceCS * GCEControllerServer ) processNextWorkItem () bool {
350
- item , quit := gceCS .queue .Get ()
351
- if quit {
352
- return false
353
- }
354
- defer gceCS .queue .Done (item )
355
-
356
- workItem , ok := item .(* workItem )
357
- if ! ok {
358
- gceCS .queue .AddRateLimited (item )
359
- return true
360
- }
361
-
362
- if workItem .publishReq != nil {
363
- _ , err := gceCS .executeControllerPublishVolume (workItem .ctx , workItem .publishReq )
364
-
365
- if err != nil {
366
- klog .Errorf ("ControllerPublishVolume failed with error: %v" , err )
367
- }
368
- }
369
-
370
- if workItem .unpublishReq != nil {
371
- _ , err := gceCS .executeControllerUnpublishVolume (workItem .ctx , workItem .unpublishReq )
372
-
373
- if err != nil {
374
- klog .Errorf ("ControllerUnpublishVolume failed with error: %v" , err )
375
- }
376
- }
377
-
378
- gceCS .queue .Forget (item )
379
- return true
380
336
}
381
337
382
338
func (gceCS * GCEControllerServer ) ControllerPublishVolume (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (* csi.ControllerPublishVolumeResponse , error ) {
383
339
if ! gceCS .opsManager .IsReady () {
384
340
return nil , status .Errorf (codes .Aborted , "Cache not ready" )
385
341
}
386
342
387
- // Only valid requests will be queued
343
+ // Only valid requests will be accepted
388
344
_ , _ , err := gceCS .validateControllerPublishVolumeRequest (ctx , req )
389
-
390
345
if err != nil {
391
346
return nil , err
392
347
}
393
348
394
- // If the node is not marked, proceed the request
395
- if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
396
- return gceCS .executeControllerPublishVolume (ctx , req )
349
+ if gceCS .nodeBackoff .IsInBackOffSinceUpdate (req .NodeId , gceCS .nodeBackoff .Clock .Now ()) {
350
+ return nil , status .Errorf (codes .Unavailable , "ControllerPublish not permitted on node %q due to backoff" , req .NodeId )
397
351
}
398
352
399
- // Node is marked so queue up the request. Note the original gRPC context may get canceled,
400
- // so a new one is created here.
401
- //
402
- // Note that the original context probably has a timeout (see csiAttach in external-attacher),
403
- // which is ignored.
404
- gceCS .queue .AddRateLimited (& workItem {
405
- ctx : context .Background (),
406
- publishReq : req ,
407
- })
408
- return nil , status .Error (codes .Unavailable , "Request queued due to error condition on node" )
353
+ resp , err := gceCS .executeControllerPublishVolume (ctx , req )
354
+ backoff := isResourceExhaustedError (err )
355
+ if backoff && ! gceCS .nodeBackoff .IsInBackOffSinceUpdate (req .NodeId , gceCS .nodeBackoff .Clock .Now ()) {
356
+ klog .V (5 ).Infof ("For node %s adding backoff due to error for volume %s" , req .NodeId , req .VolumeId )
357
+ gceCS .nodeBackoff .Next (req .NodeId , gceCS .nodeBackoff .Clock .Now ())
358
+ } else if err == nil {
359
+ klog .V (5 ).Infof ("For node %s clear backoff due to successful publish of volume %v" , req .NodeId , req .VolumeId )
360
+ gceCS .nodeBackoff .Reset (req .NodeId )
361
+ }
362
+
363
+ return resp , err
409
364
}
410
365
411
366
func (gceCS * GCEControllerServer ) validateControllerPublishVolumeRequest (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (string , * meta.Key , error ) {
@@ -438,7 +393,6 @@ func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx con
438
393
439
394
func (gceCS * GCEControllerServer ) executeControllerPublishVolume (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (* csi.ControllerPublishVolumeResponse , error ) {
440
395
project , volKey , err := gceCS .validateControllerPublishVolumeRequest (ctx , req )
441
-
442
396
if err != nil {
443
397
return nil , err
444
398
}
@@ -457,6 +411,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
457
411
if gce .IsGCENotFoundError (err ) {
458
412
return nil , status .Errorf (codes .NotFound , "ControllerPublishVolume could not find volume with ID %v: %v" , volumeID , err )
459
413
}
414
+ if gce .IsTooManyRequestError (err ) {
415
+ return nil , status .Errorf (codes .ResourceExhausted , "ControllerPublishVolume error repairing underspecified volume key: %v" , err )
416
+ }
460
417
return nil , status .Errorf (codes .Internal , "ControllerPublishVolume error repairing underspecified volume key: %v" , err )
461
418
}
462
419
@@ -473,6 +430,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
473
430
if gce .IsGCENotFoundError (err ) {
474
431
return nil , status .Error (codes .NotFound , fmt .Sprintf ("Could not find disk %v: %v" , volKey .String (), err ))
475
432
}
433
+ if gce .IsTooManyRequestError (err ) {
434
+ return nil , status .Errorf (codes .ResourceExhausted , "get disk error: %v" , err )
435
+ }
476
436
return nil , status .Error (codes .Internal , fmt .Sprintf ("Unknown get disk error: %v" , err ))
477
437
}
478
438
instanceZone , instanceName , err := common .NodeIDToZoneAndName (nodeID )
@@ -484,6 +444,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
484
444
if gce .IsGCENotFoundError (err ) {
485
445
return nil , status .Error (codes .NotFound , fmt .Sprintf ("Could not find instance %v: %v" , nodeID , err ))
486
446
}
447
+ if gce .IsTooManyRequestError (err ) {
448
+ return nil , status .Errorf (codes .ResourceExhausted , "get instance error: %v" , err )
449
+ }
487
450
return nil , status .Error (codes .Internal , fmt .Sprintf ("Unknown get instance error: %v" , err ))
488
451
}
489
452
@@ -523,20 +486,20 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
523
486
InstanceName : instanceName ,
524
487
})
525
488
if err != nil {
489
+ if gce .IsTooManyRequestError (err ) {
490
+ return nil , status .Errorf (codes .ResourceExhausted , "Failed to execute attach operation, error: %v" , err )
491
+ }
526
492
return nil , err
527
493
}
528
494
529
495
err = gceCS .CloudProvider .WaitForAttach (ctx , project , volKey , instanceZone , instanceName )
530
496
if err != nil {
531
- // Mark the node and rate limit all the following attach/detach
532
- // operations for this node
533
- gceCS . publishErrorsSeenOnNode [ nodeID ] = true
497
+ if gce . IsTooManyRequestError ( err ) {
498
+ return nil , status . Errorf ( codes . ResourceExhausted , "Failed to execute wait for attach operation, error: %v" , err )
499
+ }
534
500
return nil , status .Error (codes .Internal , fmt .Sprintf ("unknown WaitForAttach error: %v" , err ))
535
501
}
536
502
537
- // Attach succeeds so unmark the node
538
- delete (gceCS .publishErrorsSeenOnNode , nodeID )
539
-
540
503
klog .V (4 ).Infof ("ControllerPublishVolume succeeded for disk %v to instance %v" , volKey , nodeID )
541
504
return pubVolResp , nil
542
505
}
@@ -546,25 +509,25 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
546
509
return nil , status .Errorf (codes .Aborted , "Cache not ready" )
547
510
}
548
511
549
- // Only valid requests will be queued
512
+ // Only valid requests will be accepted.
550
513
_ , _ , err := gceCS .validateControllerUnpublishVolumeRequest (ctx , req )
551
-
552
514
if err != nil {
553
515
return nil , err
554
516
}
555
517
556
- // If the node is not marked, proceed the request
557
- if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
558
- return gceCS .executeControllerUnpublishVolume (ctx , req )
518
+ if gceCS .nodeBackoff .IsInBackOffSinceUpdate (req .NodeId , gceCS .nodeBackoff .Clock .Now ()) {
519
+ return nil , status .Errorf (codes .Unavailable , "ControllerUnpublish not permitted on node %q due to backoff" , req .NodeId )
559
520
}
560
-
561
- // Node is marked so queue up the request
562
- gceCS .queue .AddRateLimited (& workItem {
563
- ctx : ctx ,
564
- unpublishReq : req ,
565
- })
566
-
567
- return & csi.ControllerUnpublishVolumeResponse {}, nil
521
+ resp , err := gceCS .executeControllerUnpublishVolume (ctx , req )
522
+ backoff := isResourceExhaustedError (err )
523
+ if backoff && ! gceCS .nodeBackoff .IsInBackOffSinceUpdate (req .NodeId , gceCS .nodeBackoff .Clock .Now ()) {
524
+ klog .V (5 ).Infof ("For node %s adding backoff due to unpublish error for volume %s" , req .NodeId , req .VolumeId )
525
+ gceCS .nodeBackoff .Next (req .NodeId , gceCS .nodeBackoff .Clock .Now ())
526
+ } else if err == nil {
527
+ klog .V (5 ).Infof ("For node %s clear backoff due to succesful unpublish of volume %s" , req .NodeId , req .VolumeId )
528
+ gceCS .nodeBackoff .Reset (req .NodeId )
529
+ }
530
+ return resp , err
568
531
}
569
532
570
533
func (gceCS * GCEControllerServer ) validateControllerUnpublishVolumeRequest (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (string , * meta.Key , error ) {
@@ -588,7 +551,6 @@ func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx c
588
551
589
552
func (gceCS * GCEControllerServer ) executeControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
590
553
project , volKey , err := gceCS .validateControllerUnpublishVolumeRequest (ctx , req )
591
-
592
554
if err != nil {
593
555
return nil , err
594
556
}
@@ -600,6 +562,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
600
562
if gce .IsGCENotFoundError (err ) {
601
563
return nil , status .Errorf (codes .NotFound , "ControllerUnpublishVolume could not find volume with ID %v: %v" , volumeID , err )
602
564
}
565
+ if gce .IsTooManyRequestError (err ) {
566
+ return nil , status .Errorf (codes .ResourceExhausted , "ControllerUnpublishVolume error repairing underspecified volume key: %v" , err )
567
+ }
603
568
return nil , status .Errorf (codes .Internal , "ControllerUnpublishVolume error repairing underspecified volume key: %v" , err )
604
569
}
605
570
@@ -622,6 +587,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
622
587
klog .Warningf ("Treating volume %v as unpublished because node %v could not be found" , volKey .String (), instanceName )
623
588
return & csi.ControllerUnpublishVolumeResponse {}, nil
624
589
}
590
+ if gce .IsTooManyRequestError (err ) {
591
+ return nil , status .Errorf (codes .ResourceExhausted , "ControllerUnpublishVolume error repairing underspecified volume key: %v" , err )
592
+ }
625
593
return nil , status .Error (codes .Internal , fmt .Sprintf ("error getting instance: %v" , err ))
626
594
}
627
595
@@ -645,10 +613,12 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
645
613
InstanceName : instanceName ,
646
614
})
647
615
if err != nil {
616
+ if gce .IsTooManyRequestError (err ) {
617
+ return nil , status .Errorf (codes .ResourceExhausted , "Failed to execute detach operation, error: %v" , err )
618
+ }
648
619
return nil , err
649
620
}
650
621
651
- delete (gceCS .publishErrorsSeenOnNode , nodeID )
652
622
klog .V (4 ).Infof ("ControllerUnpublishVolume succeeded for disk %v from node %v" , volKey , nodeID )
653
623
return & csi.ControllerUnpublishVolumeResponse {}, nil
654
624
}
0 commit comments