
Commit 2130cd4

Merge pull request #988 from saikat-royc/fix-node-backoff
Simplify node backoff logic for controller publish/unpublish op
2 parents 5c5a4c4 + 34b6c1b

6 files changed: +320 −274 lines


pkg/gce-cloud-provider/compute/fake-gce.go (+28 −3)
@@ -504,27 +504,47 @@ func (cloud *FakeCloudProvider) UpdateDiskStatus(s string) {
 
 type FakeBlockingCloudProvider struct {
 	*FakeCloudProvider
-	ReadyToExecute chan chan struct{}
+	ReadyToExecute chan chan Signal
 }
 
 // FakeBlockingCloudProvider's method adds functionality to finely control the order of execution of CreateSnapshot calls.
 // Upon starting a CreateSnapshot, it passes a chan 'executeCreateSnapshot' into readyToExecute, then blocks on executeCreateSnapshot.
 // The test calling this function can block on readyToExecute to ensure that the operation has started and
 // allowed the CreateSnapshot to continue by passing a struct into executeCreateSnapshot.
 func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, project string, volKey *meta.Key, snapshotName string, snapshotParams common.SnapshotParameters) (*computev1.Snapshot, error) {
-	executeCreateSnapshot := make(chan struct{})
+	executeCreateSnapshot := make(chan Signal)
 	cloud.ReadyToExecute <- executeCreateSnapshot
 	<-executeCreateSnapshot
 	return cloud.FakeCloudProvider.CreateSnapshot(ctx, project, volKey, snapshotName, snapshotParams)
 }
 
 func (cloud *FakeBlockingCloudProvider) CreateImage(ctx context.Context, project string, volKey *meta.Key, imageName string, snapshotParams common.SnapshotParameters) (*computev1.Image, error) {
-	executeCreateSnapshot := make(chan struct{})
+	executeCreateSnapshot := make(chan Signal)
 	cloud.ReadyToExecute <- executeCreateSnapshot
 	<-executeCreateSnapshot
 	return cloud.FakeCloudProvider.CreateImage(ctx, project, volKey, imageName, snapshotParams)
 }
 
+func (cloud *FakeBlockingCloudProvider) DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error {
+	execute := make(chan Signal)
+	cloud.ReadyToExecute <- execute
+	val := <-execute
+	if val.ReportError {
+		return fmt.Errorf("force mock error for DetachDisk device %s", deviceName)
+	}
+	return cloud.FakeCloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
+}
+
+func (cloud *FakeBlockingCloudProvider) AttachDisk(ctx context.Context, project string, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string) error {
+	execute := make(chan Signal)
+	cloud.ReadyToExecute <- execute
+	val := <-execute
+	if val.ReportError {
+		return fmt.Errorf("force mock error for AttachDisk: volkey %s", volKey)
+	}
+	return cloud.FakeCloudProvider.AttachDisk(ctx, project, volKey, readWrite, diskType, instanceZone, instanceName)
+}
+
 func notFoundError() *googleapi.Error {
 	return &googleapi.Error{
 		Errors: []googleapi.ErrorItem{
@@ -544,3 +564,8 @@ func invalidError() *googleapi.Error {
 		},
 	}
 }
+
+type Signal struct {
+	ReportError   bool
+	ReportRunning bool
+}
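
The comment above describes the test handshake only for CreateSnapshot and CreateImage; the new DetachDisk and AttachDisk fakes follow the same pattern and additionally let a test force a mock failure through Signal.ReportError. A minimal sketch, not taken from this commit, of how a test might drive that path; the package placement, the fakeCloud setup, and the project/disk/zone/node values are illustrative assumptions:

package gcecloudprovider_test // illustrative placement only

import (
	"context"
	"testing"

	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
)

func TestForcedDetachError(t *testing.T) {
	// fakeCloud is assumed to come from an existing *gce.FakeCloudProvider test
	// helper; its construction is elided here. In the forced-error path below the
	// embedded provider is never reached, so it stays unused in this sketch.
	var fakeCloud *gce.FakeCloudProvider
	fcp := &gce.FakeBlockingCloudProvider{
		FakeCloudProvider: fakeCloud,
		ReadyToExecute:    make(chan chan gce.Signal),
	}

	errCh := make(chan error, 1)
	go func() {
		// DetachDisk signals readiness on ReadyToExecute, then blocks until the
		// test replies with a Signal.
		errCh <- fcp.DetachDisk(context.Background(), "test-project", "test-disk", "us-central1-a", "test-node")
	}()

	execute := <-fcp.ReadyToExecute          // detach has started and is now blocked
	execute <- gce.Signal{ReportError: true} // unblock it and force the mock error path
	if err := <-errCh; err == nil {
		t.Fatal("expected DetachDisk to return the forced mock error")
	}
}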

pkg/gce-pd-csi-driver/controller.go (+32 −94)
@@ -31,14 +31,18 @@ import (
 	"google.golang.org/grpc/status"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/util/workqueue"
+	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/klog"
 
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
 )
 
+const (
+	nodeBackoffInitialDuration = 200 * time.Millisecond
+	nodeBackoffMaxDuration     = 5 * time.Minute
+)
+
 type GCEControllerServer struct {
 	Driver        *GCEDriver
 	CloudProvider gce.GCECompute
@@ -54,15 +58,9 @@ type GCEControllerServer struct {
 	// Aborted error
 	volumeLocks *common.VolumeLocks
 
-	// queue is a rate limited work queue for Controller Publish/Unpublish
-	// Volume calls
-	queue workqueue.RateLimitingInterface
-
-	// publishErrorsSeenOnNode is a list of nodes with attach/detach
-	// operation failures so those nodes shall be rate limited for all
-	// the attach/detach operations until there is an attach / detach
-	// operation succeeds
-	publishErrorsSeenOnNode map[string]bool
+	// When the attacher sidecar issues controller publish/unpublish for multiple disks on a given node, the per-instance operation queue in GCE fills up, causing attach/detach disk requests to return immediately with an error until the queue drains. nodeBackoff keeps track of any active backoff condition on a given node and the time at which a retry of controller publish/unpublish is permissible. A node is marked with backoff whenever the driver encounters an error during a controller publish/unpublish call.
+	// If the controller eventually allows controller publish/unpublish requests for volumes (because the backoff time has expired) and those requests fail, the next backoff retry time is updated on every failure, capped at 'nodeBackoffMaxDuration'. Any successful controller publish/unpublish call clears the backoff condition for the node.
+	nodeBackoff *flowcontrol.Backoff
 }
 
 type workItem struct {
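
The nodeBackoff comment above relies on the flowcontrol.Backoff type from k8s.io/client-go, keyed by node ID and driven with the IsInBackOffSinceUpdate/Next/Reset calls visible in the hunks below. A self-contained sketch of that per-key backoff pattern using the same durations as the new constants; the attempt helper and the node key are illustrative, not driver code:

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

// attempt stands in for executeControllerPublishVolume / executeControllerUnpublishVolume.
func attempt(i int) error {
	if i == 0 {
		return errors.New("simulated attach failure")
	}
	return nil
}

func main() {
	// Same shape as nodeBackoffInitialDuration / nodeBackoffMaxDuration above.
	backoff := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)
	nodeID := "node-1" // the driver keys entries by req.NodeId

	for i := 0; i < 3; i++ {
		// Gate: while the node is inside its backoff window, reject immediately.
		if backoff.IsInBackOffSinceUpdate(nodeID, backoff.Clock.Now()) {
			fmt.Println("still backing off; the caller must retry later")
			continue
		}
		if err := attempt(i); err != nil {
			// Failure: extend the backoff window (roughly doubles, capped at the max).
			backoff.Next(nodeID, backoff.Clock.Now())
		} else {
			// Success: clear any backoff state for this node.
			backoff.Reset(nodeID)
		}
	}
}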
@@ -364,73 +362,26 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
-// Run starts the GCEControllerServer.
-func (gceCS *GCEControllerServer) Run() {
-	go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop)
-}
-
-func (gceCS *GCEControllerServer) worker() {
-	// Runs until workqueue is shut down
-	for gceCS.processNextWorkItem() {
-	}
-}
-
-func (gceCS *GCEControllerServer) processNextWorkItem() bool {
-	item, quit := gceCS.queue.Get()
-	if quit {
-		return false
-	}
-	defer gceCS.queue.Done(item)
-
-	workItem, ok := item.(*workItem)
-	if !ok {
-		gceCS.queue.AddRateLimited(item)
-		return true
-	}
-
-	if workItem.publishReq != nil {
-		_, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq)
-
-		if err != nil {
-			klog.Errorf("ControllerPublishVolume failed with error: %v", err)
-		}
-	}
-
-	if workItem.unpublishReq != nil {
-		_, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq)
-
-		if err != nil {
-			klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
-		}
-	}
-
-	gceCS.queue.Forget(item)
-	return true
-}
-
 func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
-	// Only valid requests will be queued
+	// Only valid requests will be accepted
 	_, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)
-
 	if err != nil {
 		return nil, err
 	}
 
-	// If the node is not marked, proceed the request
-	if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
-		return gceCS.executeControllerPublishVolume(ctx, req)
+	if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+		return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId)
 	}
 
-	// Node is marked so queue up the request. Note the original gRPC context may get canceled,
-	// so a new one is created here.
-	//
-	// Note that the original context probably has a timeout (see csiAttach in external-attacher),
-	// which is ignored.
-	gceCS.queue.AddRateLimited(&workItem{
-		ctx:        context.Background(),
-		publishReq: req,
-	})
-	return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
+	resp, err := gceCS.executeControllerPublishVolume(ctx, req)
+	if err != nil {
+		klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
+		gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+	} else {
+		klog.Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId)
+		gceCS.nodeBackoff.Reset(req.NodeId)
+	}
+	return resp, err
 }
 
 func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
@@ -542,39 +493,32 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
 
 	err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName)
 	if err != nil {
-		// Mark the node and rate limit all the following attach/detach
-		// operations for this node
-		gceCS.publishErrorsSeenOnNode[nodeID] = true
 		return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
 	}
-
-	// Attach succeeds so unmark the node
-	delete(gceCS.publishErrorsSeenOnNode, nodeID)
-
 	klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID)
 	return pubVolResp, nil
 }
 
 func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
 	// Only valid requests will be queued
 	_, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
-
 	if err != nil {
 		return nil, err
 	}
 
-	// If the node is not marked, proceed the request
-	if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
-		return gceCS.executeControllerUnpublishVolume(ctx, req)
+	if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
+		return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
 	}
 
-	// Node is marked so queue up the request
-	gceCS.queue.AddRateLimited(&workItem{
-		ctx:          context.Background(),
-		unpublishReq: req,
-	})
-
-	return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
+	resp, err := gceCS.executeControllerUnpublishVolume(ctx, req)
+	if err != nil {
+		klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
+		gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
+	} else {
+		klog.Infof("For node %s clear backoff due to successful unpublish of volume %v", req.NodeId, req.VolumeId)
+		gceCS.nodeBackoff.Reset(req.NodeId)
+	}
+	return resp, err
 }
 
 func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
@@ -650,15 +594,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
 
 	err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
 	if err != nil {
-		// Mark the node and rate limit all the following attach/detach
-		// operations for this node
-		gceCS.publishErrorsSeenOnNode[nodeID] = true
 		return nil, status.Error(codes.Internal, fmt.Sprintf("unknown detach error: %v", err))
 	}
 
-	// Detach succeeds so unmark the node
-	delete(gceCS.publishErrorsSeenOnNode, nodeID)
-
 	klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
 	return &csi.ControllerUnpublishVolumeResponse{}, nil
 }
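
This excerpt does not include the file where nodeBackoff is initialized (it is in one of the other four changed files). A hypothetical wiring sketch of what that construction plausibly looks like inside the driver package, where flowcontrol is already imported; apart from the nodeBackoff line, the field values are assumptions about the surrounding setup, not code from this commit:

gceCS := &GCEControllerServer{
	Driver:        gceDriver,     // assumed to be built elsewhere in the setup
	CloudProvider: cloudProvider, // assumed gce.GCECompute implementation
	volumeLocks:   volumeLocks,   // assumed *common.VolumeLocks from existing setup
	nodeBackoff:   flowcontrol.NewBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration),
}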
