Skip to content

Commit 63587ff

Browse files
committed
UPSTREAM: 988: Simplify node backoff logic
1 parent f9d7fdc commit 63587ff

File tree

6 files changed

+320
-274
lines changed

6 files changed

+320
-274
lines changed

pkg/gce-cloud-provider/compute/fake-gce.go

+28-3
Original file line numberDiff line numberDiff line change
@@ -504,27 +504,47 @@ func (cloud *FakeCloudProvider) UpdateDiskStatus(s string) {
504504

505505
type FakeBlockingCloudProvider struct {
506506
*FakeCloudProvider
507-
ReadyToExecute chan chan struct{}
507+
ReadyToExecute chan chan Signal
508508
}
509509

510510
// FakeBlockingCloudProvider's method adds functionality to finely control the order of execution of CreateSnapshot calls.
511511
// Upon starting a CreateSnapshot, it passes a chan 'executeCreateSnapshot' into readyToExecute, then blocks on executeCreateSnapshot.
512512
// The test calling this function can block on readyToExecute to ensure that the operation has started and
513513
// allowed the CreateSnapshot to continue by passing a struct into executeCreateSnapshot.
514514
func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, project string, volKey *meta.Key, snapshotName string, snapshotParams common.SnapshotParameters) (*computev1.Snapshot, error) {
515-
executeCreateSnapshot := make(chan struct{})
515+
executeCreateSnapshot := make(chan Signal)
516516
cloud.ReadyToExecute <- executeCreateSnapshot
517517
<-executeCreateSnapshot
518518
return cloud.FakeCloudProvider.CreateSnapshot(ctx, project, volKey, snapshotName, snapshotParams)
519519
}
520520

521521
func (cloud *FakeBlockingCloudProvider) CreateImage(ctx context.Context, project string, volKey *meta.Key, imageName string, snapshotParams common.SnapshotParameters) (*computev1.Image, error) {
522-
executeCreateSnapshot := make(chan struct{})
522+
executeCreateSnapshot := make(chan Signal)
523523
cloud.ReadyToExecute <- executeCreateSnapshot
524524
<-executeCreateSnapshot
525525
return cloud.FakeCloudProvider.CreateImage(ctx, project, volKey, imageName, snapshotParams)
526526
}
527527

528+
func (cloud *FakeBlockingCloudProvider) DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error {
529+
execute := make(chan Signal)
530+
cloud.ReadyToExecute <- execute
531+
val := <-execute
532+
if val.ReportError {
533+
return fmt.Errorf("force mock error for DetachDisk device %s", deviceName)
534+
}
535+
return cloud.FakeCloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
536+
}
537+
538+
func (cloud *FakeBlockingCloudProvider) AttachDisk(ctx context.Context, project string, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string) error {
539+
execute := make(chan Signal)
540+
cloud.ReadyToExecute <- execute
541+
val := <-execute
542+
if val.ReportError {
543+
return fmt.Errorf("force mock error for AttachDisk: volkey %s", volKey)
544+
}
545+
return cloud.FakeCloudProvider.AttachDisk(ctx, project, volKey, readWrite, diskType, instanceZone, instanceName)
546+
}
547+
528548
func notFoundError() *googleapi.Error {
529549
return &googleapi.Error{
530550
Errors: []googleapi.ErrorItem{
@@ -544,3 +564,8 @@ func invalidError() *googleapi.Error {
544564
},
545565
}
546566
}
567+
568+
type Signal struct {
569+
ReportError bool
570+
ReportRunning bool
571+
}

pkg/gce-pd-csi-driver/controller.go

+32-94
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ import (
3131
"google.golang.org/grpc/status"
3232
"k8s.io/apimachinery/pkg/util/sets"
3333
"k8s.io/apimachinery/pkg/util/uuid"
34-
"k8s.io/apimachinery/pkg/util/wait"
35-
"k8s.io/client-go/util/workqueue"
34+
"k8s.io/client-go/util/flowcontrol"
3635
"k8s.io/klog"
3736

3837
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
3938
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
4039
)
4140

41+
const (
42+
nodeBackoffInitialDuration = 200 * time.Millisecond
43+
nodeBackoffMaxDuration = 5 * time.Minute
44+
)
45+
4246
type GCEControllerServer struct {
4347
Driver *GCEDriver
4448
CloudProvider gce.GCECompute
@@ -54,15 +58,9 @@ type GCEControllerServer struct {
5458
// Aborted error
5559
volumeLocks *common.VolumeLocks
5660

57-
// queue is a rate limited work queue for Controller Publish/Unpublish
58-
// Volume calls
59-
queue workqueue.RateLimitingInterface
60-
61-
// publishErrorsSeenOnNode is a list of nodes with attach/detach
62-
// operation failures so those nodes shall be rate limited for all
63-
// the attach/detach operations until there is an attach / detach
64-
// operation succeeds
65-
publishErrorsSeenOnNode map[string]bool
61+
// When the attacher sidecar issues controller publish/unpublish for multiple disks for a given node, the per-instance operation queue in GCE fills up causing attach/detach disk requests to immediately return with an error until the queue drains. nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible. A node is marked with backoff when any error is encountered by the driver during controller publish/unpublish calls.
62+
// If the controller eventually allows controller publish/unpublish requests for volumes (because the backoff time expired), and those requests fail, the next backoff retry time will be updated on every failure and capped at 'nodeBackoffMaxDuration'. Also, any successful controller publish/unpublish call will clear the backoff condition for the node.
63+
nodeBackoff *flowcontrol.Backoff
6664
}
6765

6866
type workItem struct {
@@ -336,73 +334,26 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
336334
return &csi.DeleteVolumeResponse{}, nil
337335
}
338336

339-
// Run starts the GCEControllerServer.
340-
func (gceCS *GCEControllerServer) Run() {
341-
go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop)
342-
}
343-
344-
func (gceCS *GCEControllerServer) worker() {
345-
// Runs until workqueue is shut down
346-
for gceCS.processNextWorkItem() {
347-
}
348-
}
349-
350-
func (gceCS *GCEControllerServer) processNextWorkItem() bool {
351-
item, quit := gceCS.queue.Get()
352-
if quit {
353-
return false
354-
}
355-
defer gceCS.queue.Done(item)
356-
357-
workItem, ok := item.(*workItem)
358-
if !ok {
359-
gceCS.queue.AddRateLimited(item)
360-
return true
361-
}
362-
363-
if workItem.publishReq != nil {
364-
_, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq)
365-
366-
if err != nil {
367-
klog.Errorf("ControllerPublishVolume failed with error: %v", err)
368-
}
369-
}
370-
371-
if workItem.unpublishReq != nil {
372-
_, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq)
373-
374-
if err != nil {
375-
klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
376-
}
377-
}
378-
379-
gceCS.queue.Forget(item)
380-
return true
381-
}
382-
383337
func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
384-
// Only valid requests will be queued
338+
// Only valid requests will be accepted
385339
_, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)
386-
387340
if err != nil {
388341
return nil, err
389342
}
390343

391-
// If the node is not marked, proceed the request
392-
if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
393-
return gceCS.executeControllerPublishVolume(ctx, req)
344+
if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
345+
return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId)
394346
}
395347

396-
// Node is marked so queue up the request. Note the original gRPC context may get canceled,
397-
// so a new one is created here.
398-
//
399-
// Note that the original context probably has a timeout (see csiAttach in external-attacher),
400-
// which is ignored.
401-
gceCS.queue.AddRateLimited(&workItem{
402-
ctx: context.Background(),
403-
publishReq: req,
404-
})
405-
return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
348+
resp, err := gceCS.executeControllerPublishVolume(ctx, req)
349+
if err != nil {
350+
klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
351+
gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
352+
} else {
353+
klog.Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId)
354+
gceCS.nodeBackoff.Reset(req.NodeId)
355+
}
356+
return resp, err
406357
}
407358

408359
func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
@@ -514,39 +465,32 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
514465

515466
err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName)
516467
if err != nil {
517-
// Mark the node and rate limit all the following attach/detach
518-
// operations for this node
519-
gceCS.publishErrorsSeenOnNode[nodeID] = true
520468
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
521469
}
522-
523-
// Attach succeeds so unmark the node
524-
delete(gceCS.publishErrorsSeenOnNode, nodeID)
525-
526470
klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID)
527471
return pubVolResp, nil
528472
}
529473

530474
func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
531475
// Only valid requests will be queued
532476
_, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
533-
534477
if err != nil {
535478
return nil, err
536479
}
537480

538-
// If the node is not marked, proceed the request
539-
if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
540-
return gceCS.executeControllerUnpublishVolume(ctx, req)
481+
if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
482+
return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
541483
}
542484

543-
// Node is marked so queue up the request
544-
gceCS.queue.AddRateLimited(&workItem{
545-
ctx: context.Background(),
546-
unpublishReq: req,
547-
})
548-
549-
return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
485+
resp, err := gceCS.executeControllerUnpublishVolume(ctx, req)
486+
if err != nil {
487+
klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
488+
gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
489+
} else {
490+
klog.Infof("For node %s clear backoff due to successful unpublish of volume %v", req.NodeId, req.VolumeId)
491+
gceCS.nodeBackoff.Reset(req.NodeId)
492+
}
493+
return resp, err
550494
}
551495

552496
func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
@@ -622,15 +566,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
622566

623567
err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
624568
if err != nil {
625-
// Mark the node and rate limit all the following attach/detach
626-
// operations for this node
627-
gceCS.publishErrorsSeenOnNode[nodeID] = true
628569
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown detach error: %v", err))
629570
}
630571

631-
// Detach succeeds so unmark the node
632-
delete(gceCS.publishErrorsSeenOnNode, nodeID)
633-
634572
klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
635573
return &csi.ControllerUnpublishVolumeResponse{}, nil
636574
}

0 commit comments

Comments
 (0)