Commit a9c7a37

use flowcontrol backoff lib for publish/unpublish backoff
1 parent 11e2389 commit a9c7a37
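
This commit replaces the per-node error map and rate-limited workqueue with the per-item exponential backoff type from k8s.io/client-go/util/flowcontrol, keyed by node ID. For orientation, here is a minimal, self-contained sketch of that Backoff API as the new code uses it (IsInBackOffSinceUpdate, Next, Reset). The NewBackOff call and the literal durations mirror the constants added in this commit, but the driver's actual initialization happens outside the hunks shown below, so treat this as an illustration rather than the driver's own wiring.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/flowcontrol"
)

func main() {
    // Per-item exponential backoff, keyed by node ID. The durations mirror
    // the nodeBackoffInitialDuration/nodeBackoffMaxDuration constants added
    // in this commit (200ms initial, 2m cap).
    nodeBackoff := flowcontrol.NewBackOff(200*time.Millisecond, 2*time.Minute)

    nodeID := "us-central1-a/node-1" // hypothetical node key, for illustration only

    // Reject work while the node is still inside its backoff window.
    if nodeBackoff.IsInBackOffSinceUpdate(nodeID, nodeBackoff.Clock.Now()) {
        fmt.Println("node is backing off; retry later")
        return
    }

    attachFailed := true // pretend the attach/detach attempt hit an internal error
    if attachFailed {
        // Grow (or start) the backoff window for this node.
        nodeBackoff.Next(nodeID, nodeBackoff.Clock.Now())
    } else {
        // Success clears any backoff state recorded for the node.
        nodeBackoff.Reset(nodeID)
    }
}
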

5 files changed (+264, -309 lines)


pkg/gce-pd-csi-driver/controller.go (+72, -117)
@@ -30,14 +30,18 @@ import (
     "google.golang.org/grpc/status"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/uuid"
-    "k8s.io/apimachinery/pkg/util/wait"
-    "k8s.io/client-go/util/workqueue"
+    "k8s.io/client-go/util/flowcontrol"
     "k8s.io/klog"

     "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
     gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
 )

+const (
+    nodeBackoffInitialDuration = 200 * time.Millisecond
+    nodeBackoffMaxDuration     = 2 * time.Minute
+)
+
 type GCEControllerServer struct {
     Driver *GCEDriver
     CloudProvider gce.GCECompute
@@ -50,17 +54,9 @@ type GCEControllerServer struct {
     // Aborted error
     volumeLocks *common.VolumeLocks

-    // queue is a rate limited work queue for Controller Publish/Unpublish
-    // Volume calls
-    queue workqueue.RateLimitingInterface
-
-    // publishErrorsSeenOnNode is a list of nodes with attach/detach
-    // operation failures so those nodes shall be rate limited for all
-    // the attach/detach operations until there is an attach / detach
-    // operation succeeds
-    publishErrorsSeenOnNode map[string]bool
-
-    opsManager *OpsManager
+    // nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible.
+    nodeBackoff *flowcontrol.Backoff
+    opsManager *OpsManager
 }

 type workItem struct {
@@ -337,75 +333,31 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
 // Run starts the GCEControllerServer.
 func (gceCS *GCEControllerServer) Run() {
     go gceCS.opsManager.HydrateOpsCache()
-    go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop)
-}
-
-func (gceCS *GCEControllerServer) worker() {
-    // Runs until workqueue is shut down
-    for gceCS.processNextWorkItem() {
-    }
-}
-
-func (gceCS *GCEControllerServer) processNextWorkItem() bool {
-    item, quit := gceCS.queue.Get()
-    if quit {
-        return false
-    }
-    defer gceCS.queue.Done(item)
-
-    workItem, ok := item.(*workItem)
-    if !ok {
-        gceCS.queue.AddRateLimited(item)
-        return true
-    }
-
-    if workItem.publishReq != nil {
-        _, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq)
-
-        if err != nil {
-            klog.Errorf("ControllerPublishVolume failed with error: %v", err)
-        }
-    }
-
-    if workItem.unpublishReq != nil {
-        _, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq)
-
-        if err != nil {
-            klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
-        }
-    }
-
-    gceCS.queue.Forget(item)
-    return true
 }

 func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
     if !gceCS.opsManager.IsReady() {
         return nil, status.Errorf(codes.Aborted, "Cache not ready")
     }

-    // Only valid requests will be queued
+    // Only valid requests will be accepted
     _, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)
-
     if err != nil {
         return nil, err
     }

-    // If the node is not marked, proceed the request
-    if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
-        return gceCS.executeControllerPublishVolume(ctx, req)
+    if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+        return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff", req.NodeId)
     }

-    // Node is marked so queue up the request. Note the original gRPC context may get canceled,
-    // so a new one is created here.
-    //
-    // Note that the original context probably has a timeout (see csiAttach in external-attacher),
-    // which is ignored.
-    gceCS.queue.AddRateLimited(&workItem{
-        ctx: context.Background(),
-        publishReq: req,
-    })
-    return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
+    resp, backoff, err := gceCS.executeControllerPublishVolume(ctx, req)
+    if backoff {
+        gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+    } else if err == nil {
+        gceCS.nodeBackoff.Reset(req.NodeId)
+    }
+
+    return resp, err
 }

 func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
@@ -436,11 +388,12 @@ func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx con
     return project, volKey, nil
 }

-func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
+// executeControllerPublishVolume returns the csi.ControllerPublishVolumeResponse object. If there is an error, the function provides an additional boolean hint on whether to set a backoff condition on the node.
+func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, bool, error) {
     project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)

     if err != nil {
-        return nil, err
+        return nil, false, err
     }

     volumeID := req.GetVolumeId()
@@ -455,36 +408,36 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
     project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
     if err != nil {
         if gce.IsGCENotFoundError(err) {
-            return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err)
+            return nil, false, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err)
         }
-        return nil, status.Errorf(codes.Internal, "ControllerPublishVolume error repairing underspecified volume key: %v", err)
+        return nil, false, status.Errorf(codes.Internal, "ControllerPublishVolume error repairing underspecified volume key: %v", err)
     }

     // Acquires the lock for the volume on that node only, because we need to support the ability
     // to publish the same volume onto different nodes concurrently
     lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
     if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
+        return nil, false, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
     }
     defer gceCS.volumeLocks.Release(lockingVolumeID)

     _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
     if err != nil {
         if gce.IsGCENotFoundError(err) {
-            return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find disk %v: %v", volKey.String(), err))
+            return nil, false, status.Error(codes.NotFound, fmt.Sprintf("Could not find disk %v: %v", volKey.String(), err))
         }
-        return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get disk error: %v", err))
+        return nil, true, status.Error(codes.Internal, fmt.Sprintf("Unknown get disk error: %v", err))
     }
     instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
     if err != nil {
-        return nil, status.Error(codes.NotFound, fmt.Sprintf("could not split nodeID: %v", err))
+        return nil, false, status.Error(codes.NotFound, fmt.Sprintf("could not split nodeID: %v", err))
     }
     instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName)
     if err != nil {
         if gce.IsGCENotFoundError(err) {
-            return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find instance %v: %v", nodeID, err))
+            return nil, false, status.Error(codes.NotFound, fmt.Sprintf("Could not find instance %v: %v", nodeID, err))
         }
-        return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get instance error: %v", err))
+        return nil, true, status.Error(codes.Internal, fmt.Sprintf("Unknown get instance error: %v", err))
     }

     readWrite := "READ_WRITE"
@@ -494,23 +447,23 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con

     deviceName, err := common.GetDeviceName(volKey)
     if err != nil {
-        return nil, status.Error(codes.Internal, fmt.Sprintf("error getting device name: %v", err))
+        return nil, false, status.Error(codes.Internal, fmt.Sprintf("error getting device name: %v", err))
     }

     attached, err := diskIsAttachedAndCompatible(deviceName, instance, volumeCapability, readWrite)
     if err != nil {
-        return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Disk %v already published to node %v but incompatbile: %v", volKey.Name, nodeID, err))
+        return nil, false, status.Error(codes.AlreadyExists, fmt.Sprintf("Disk %v already published to node %v but incompatbile: %v", volKey.Name, nodeID, err))
     }
     if attached {
         // Volume is attached to node. Success!
         klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v, already attached.", volKey, nodeID)
-        return pubVolResp, nil
+        return pubVolResp, false, nil
     }

     // Check and initiate an attach disk operation.
     instanceZone, instanceName, err = common.NodeIDToZoneAndName(nodeID)
     if err != nil {
-        return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))
+        return nil, false, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))
     }

     err = gceCS.opsManager.ExecuteAttachDisk(ctx, &AttachDiskOpts{
@@ -523,48 +476,44 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
         InstanceName: instanceName,
     })
     if err != nil {
-        return nil, err
+        backoff := false
+        // Errors during API calls are reported as Internal
+        if isInternalError(err) {
+            backoff = true
+        }
+        return nil, backoff, err
     }

     err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName)
     if err != nil {
-        // Mark the node and rate limit all the following attach/detach
-        // operations for this node
-        gceCS.publishErrorsSeenOnNode[nodeID] = true
-        return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
+        return nil, true, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
     }

-    // Attach succeeds so unmark the node
-    delete(gceCS.publishErrorsSeenOnNode, nodeID)
-
     klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID)
-    return pubVolResp, nil
+    return pubVolResp, false, nil
 }

 func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
     if !gceCS.opsManager.IsReady() {
         return nil, status.Errorf(codes.Aborted, "Cache not ready")
     }

-    // Only valid requests will be queued
+    // Only valid requests will be accepted.
     _, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
-
     if err != nil {
         return nil, err
     }

-    // If the node is not marked, proceed the request
-    if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
-        return gceCS.executeControllerUnpublishVolume(ctx, req)
+    if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+        return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff", req.NodeId)
     }
-
-    // Node is marked so queue up the request
-    gceCS.queue.AddRateLimited(&workItem{
-        ctx: ctx,
-        unpublishReq: req,
-    })
-
-    return &csi.ControllerUnpublishVolumeResponse{}, nil
+    resp, backoff, err := gceCS.executeControllerUnpublishVolume(ctx, req)
+    if backoff {
+        gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+    } else if err == nil {
+        gceCS.nodeBackoff.Reset(req.NodeId)
+    }
+    return resp, err
 }

 func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
@@ -586,56 +535,57 @@ func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx c
     return project, volKey, nil
 }

-func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
+// executeControllerUnpublishVolume returns the csi.ControllerUnpublishVolumeResponse object. If there is an error, the function provides an additional boolean hint on whether to set a backoff condition on the node.
+func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, bool, error) {
     project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)

     if err != nil {
-        return nil, err
+        return nil, false, err
     }

     volumeID := req.GetVolumeId()
     nodeID := req.GetNodeId()
     project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
     if err != nil {
         if gce.IsGCENotFoundError(err) {
-            return nil, status.Errorf(codes.NotFound, "ControllerUnpublishVolume could not find volume with ID %v: %v", volumeID, err)
+            return nil, false, status.Errorf(codes.NotFound, "ControllerUnpublishVolume could not find volume with ID %v: %v", volumeID, err)
         }
-        return nil, status.Errorf(codes.Internal, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
+        return nil, true, status.Errorf(codes.Internal, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
     }

     // Acquires the lock for the volume on that node only, because we need to support the ability
     // to unpublish the same volume from different nodes concurrently
     lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
     if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
+        return nil, false, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
     }
     defer gceCS.volumeLocks.Release(lockingVolumeID)

     instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
     if err != nil {
-        return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))
+        return nil, false, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))
     }
     instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName)
     if err != nil {
         if gce.IsGCENotFoundError(err) {
             // Node not existing on GCE means that disk has been detached
             klog.Warningf("Treating volume %v as unpublished because node %v could not be found", volKey.String(), instanceName)
-            return &csi.ControllerUnpublishVolumeResponse{}, nil
+            return &csi.ControllerUnpublishVolumeResponse{}, false, nil
         }
-        return nil, status.Error(codes.Internal, fmt.Sprintf("error getting instance: %v", err))
+        return nil, true, status.Error(codes.Internal, fmt.Sprintf("error getting instance: %v", err))
     }

     deviceName, err := common.GetDeviceName(volKey)
     if err != nil {
-        return nil, status.Error(codes.Internal, fmt.Sprintf("error getting device name: %v", err))
+        return nil, false, status.Error(codes.Internal, fmt.Sprintf("error getting device name: %v", err))
     }

     attached := diskIsAttached(deviceName, instance)

     if !attached {
         // Volume is not attached to node. Success!
         klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v. Already not attached.", volKey, nodeID)
-        return &csi.ControllerUnpublishVolumeResponse{}, nil
+        return &csi.ControllerUnpublishVolumeResponse{}, false, nil
     }

     err = gceCS.opsManager.ExecuteDetachDisk(ctx, &DetachDiskOpts{
@@ -645,12 +595,17 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
         InstanceName: instanceName,
     })
     if err != nil {
-        return nil, err
+        backoff := false
+        // Errors during API calls are reported as Internal
+        if isInternalError(err) {
+            backoff = true
+        }
+        return nil, backoff, err
     }

-    delete(gceCS.publishErrorsSeenOnNode, nodeID)
+    // delete(gceCS.publishErrorsSeenOnNode, nodeID)
     klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
-    return &csi.ControllerUnpublishVolumeResponse{}, nil
+    return &csi.ControllerUnpublishVolumeResponse{}, false, nil
 }

 func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
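
The new failure paths above call an isInternalError helper that does not appear in these hunks; it is presumably defined in one of the other changed files in this commit. A plausible sketch, assuming the helper simply checks whether the error carries a gRPC Internal status code (the package name here is hypothetical):

package gceGCEDriver // hypothetical package name, for illustration only

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// isInternalError reports whether err carries a gRPC codes.Internal status.
// Sketch only: the actual helper added in this commit may differ.
func isInternalError(err error) bool {
    st, ok := status.FromError(err)
    if !ok {
        // Not a gRPC status error, so it cannot be codes.Internal.
        return false
    }
    return st.Code() == codes.Internal
}
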
