
Commit 662d6d5

Merge pull request kubernetes-sigs#34 from jsafrane/4.11-throttling
OCPBUGS-11000: UPSTREAM: 988: Simplify node backoff logic
2 parents f9d7fdc + e04a8e7 commit 662d6d5

File tree: 6 files changed (+396, −272 lines)


pkg/gce-cloud-provider/compute/fake-gce.go

+28 −3
@@ -504,27 +504,47 @@ func (cloud *FakeCloudProvider) UpdateDiskStatus(s string) {
 
 type FakeBlockingCloudProvider struct {
 	*FakeCloudProvider
-	ReadyToExecute chan chan struct{}
+	ReadyToExecute chan chan Signal
 }
 
 // FakeBlockingCloudProvider's method adds functionality to finely control the order of execution of CreateSnapshot calls.
 // Upon starting a CreateSnapshot, it passes a chan 'executeCreateSnapshot' into readyToExecute, then blocks on executeCreateSnapshot.
 // The test calling this function can block on readyToExecute to ensure that the operation has started and
 // allowed the CreateSnapshot to continue by passing a struct into executeCreateSnapshot.
 func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, project string, volKey *meta.Key, snapshotName string, snapshotParams common.SnapshotParameters) (*computev1.Snapshot, error) {
-	executeCreateSnapshot := make(chan struct{})
+	executeCreateSnapshot := make(chan Signal)
 	cloud.ReadyToExecute <- executeCreateSnapshot
 	<-executeCreateSnapshot
 	return cloud.FakeCloudProvider.CreateSnapshot(ctx, project, volKey, snapshotName, snapshotParams)
 }
 
 func (cloud *FakeBlockingCloudProvider) CreateImage(ctx context.Context, project string, volKey *meta.Key, imageName string, snapshotParams common.SnapshotParameters) (*computev1.Image, error) {
-	executeCreateSnapshot := make(chan struct{})
+	executeCreateSnapshot := make(chan Signal)
 	cloud.ReadyToExecute <- executeCreateSnapshot
 	<-executeCreateSnapshot
 	return cloud.FakeCloudProvider.CreateImage(ctx, project, volKey, imageName, snapshotParams)
 }
 
+func (cloud *FakeBlockingCloudProvider) DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error {
+	execute := make(chan Signal)
+	cloud.ReadyToExecute <- execute
+	val := <-execute
+	if val.ReportError {
+		return fmt.Errorf("force mock error for DetachDisk device %s", deviceName)
+	}
+	return cloud.FakeCloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
+}
+
+func (cloud *FakeBlockingCloudProvider) AttachDisk(ctx context.Context, project string, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string) error {
+	execute := make(chan Signal)
+	cloud.ReadyToExecute <- execute
+	val := <-execute
+	if val.ReportError {
+		return fmt.Errorf("force mock error for AttachDisk: volkey %s", volKey)
+	}
+	return cloud.FakeCloudProvider.AttachDisk(ctx, project, volKey, readWrite, diskType, instanceZone, instanceName)
+}
+
 func notFoundError() *googleapi.Error {
 	return &googleapi.Error{
 		Errors: []googleapi.ErrorItem{
@@ -544,3 +564,8 @@ func invalidError() *googleapi.Error {
 		},
 	}
 }
+
+type Signal struct {
+	ReportError   bool
+	ReportRunning bool
+}
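
Note on the hunk above: switching ReadyToExecute from chan chan struct{} to chan chan Signal lets a test not only sequence the fake's Attach/Detach calls but also script their outcome. Below is a self-contained sketch of that handshake; it assumes nothing from this repository beyond the Signal shape shown in the diff, and blockingDetach is a stand-in name for the fake's DetachDisk, not an identifier from the code.

// Self-contained sketch of the ReadyToExecute / Signal handshake used by
// FakeBlockingCloudProvider above. blockingDetach is a stand-in for the
// fake's DetachDisk; only the channel protocol is reproduced here.
package main

import "fmt"

type Signal struct {
	ReportError   bool
	ReportRunning bool
}

// blockingDetach announces that a call is in flight, then blocks until the
// test decides its outcome via the Signal it sends back.
func blockingDetach(ready chan chan Signal) error {
	execute := make(chan Signal)
	ready <- execute
	val := <-execute
	if val.ReportError {
		return fmt.Errorf("forced mock error")
	}
	return nil
}

func main() {
	ready := make(chan chan Signal, 1)

	// Test side: fail the first in-flight call, let the second succeed.
	go func() {
		(<-ready) <- Signal{ReportError: true}
		(<-ready) <- Signal{}
	}()

	fmt.Println(blockingDetach(ready)) // forced mock error
	fmt.Println(blockingDetach(ready)) // <nil>
}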

pkg/gce-pd-csi-driver/controller.go

+89 −93
@@ -31,14 +31,18 @@ import (
 	"google.golang.org/grpc/status"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/util/workqueue"
+	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/klog"
 
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
 )
 
+const (
+	errorBackoffInitialDuration = 200 * time.Millisecond
+	errorBackoffMaxDuration     = 5 * time.Minute
+)
+
 type GCEControllerServer struct {
 	Driver        *GCEDriver
 	CloudProvider gce.GCECompute
@@ -54,16 +58,45 @@ type GCEControllerServer struct {
 	// Aborted error
 	volumeLocks *common.VolumeLocks
 
-	// queue is a rate limited work queue for Controller Publish/Unpublish
-	// Volume calls
-	queue workqueue.RateLimitingInterface
+	// There are several kinds of errors that are immediately retried by either
+	// the CSI sidecars or the k8s control plane. The retries consume GCP API
+	// quota, e.g. by doing ListVolumes, and so backoff needs to be used to
+	// prevent quota exhaustion.
+	//
+	// Examples of these errors are the per-instance GCE operation queue getting
+	// full (typically only 32 operations in flight at a time are allowed), and
+	// disks being deleted out from under a PV causing unpublish errors.
+	//
+	// While we need to backoff, we also need some semblance of fairness. In
+	// particular, volume unpublish retries happen very quickly, and with
+	// a single backoff per node these retries can prevent any other operation
+	// from making progress, even if it would succeed. Hence we track errors on
+	// node and disk pairs, backing off only for calls matching such a
+	// pair.
+	//
+	// An implication is that in the full operation queue situation, requests
+	// for new disks will not backoff the first time. This is acceptable as a
+	// single spurious call will not cause problems for quota exhaustion or make
+	// the operation queue problem worse. This is well compensated by giving
+	// disks where no problems are occurring a chance to be processed.
+	//
+	// errorBackoff keeps track of any active backoff condition on a given node,
+	// and the time when retry of controller publish/unpublish is permissible. A
+	// node and disk pair is marked with backoff when any error is encountered
+	// by the driver during controller publish/unpublish calls. If the
+	// controller eventually allows controller publish/unpublish requests for
+	// volumes (because the backoff time expired), and those requests fail, the
+	// next backoff retry time will be updated on every failure and capped at
+	// 'errorBackoffMaxDuration'. Also, any successful controller
+	// publish/unpublish call will clear the backoff condition for a node and
+	// disk.
+	errorBackoff *csiErrorBackoff
+}
 
-	// publishErrorsSeenOnNode is a list of nodes with attach/detach
-	// operation failures so those nodes shall be rate limited for all
-	// the attach/detach operations until there is an attach / detach
-	// operation succeeds
-	publishErrorsSeenOnNode map[string]bool
+type csiErrorBackoff struct {
+	backoff *flowcontrol.Backoff
 }
+type csiErrorBackoffId string
 
 type workItem struct {
 	ctx context.Context
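
The comment block introduced above argues for keying backoff by node and volume rather than by node alone. A minimal sketch of that distinction follows, using k8s.io/client-go/util/flowcontrol directly; the "node:volume" key format mirrors the backoffId helper added at the end of this file, the durations are the same constants as above, and the node/volume names are illustrative.

// Sketch: backoff keyed by node:volume pair, using the same constants as the
// const block added above. A failure for vol-a on node-1 throttles only that
// pair; vol-b on the same node is unaffected.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	backoff := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)

	// An unpublish error for node-1:vol-a starts a backoff for that pair only.
	backoff.Next("node-1:vol-a", backoff.Clock.Now())

	fmt.Println(backoff.IsInBackOffSinceUpdate("node-1:vol-a", backoff.Clock.Now())) // true: this pair must wait
	fmt.Println(backoff.IsInBackOffSinceUpdate("node-1:vol-b", backoff.Clock.Now())) // false: other volumes proceed
}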
@@ -336,73 +369,27 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
-// Run starts the GCEControllerServer.
-func (gceCS *GCEControllerServer) Run() {
-	go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop)
-}
-
-func (gceCS *GCEControllerServer) worker() {
-	// Runs until workqueue is shut down
-	for gceCS.processNextWorkItem() {
-	}
-}
-
-func (gceCS *GCEControllerServer) processNextWorkItem() bool {
-	item, quit := gceCS.queue.Get()
-	if quit {
-		return false
-	}
-	defer gceCS.queue.Done(item)
-
-	workItem, ok := item.(*workItem)
-	if !ok {
-		gceCS.queue.AddRateLimited(item)
-		return true
-	}
-
-	if workItem.publishReq != nil {
-		_, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq)
-
-		if err != nil {
-			klog.Errorf("ControllerPublishVolume failed with error: %v", err)
-		}
-	}
-
-	if workItem.unpublishReq != nil {
-		_, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq)
-
-		if err != nil {
-			klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
-		}
-	}
-
-	gceCS.queue.Forget(item)
-	return true
-}
-
 func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
-	// Only valid requests will be queued
+	// Only valid requests will be accepted
 	_, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)
-
 	if err != nil {
 		return nil, err
 	}
 
-	// If the node is not marked, proceed the request
-	if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
-		return gceCS.executeControllerPublishVolume(ctx, req)
+	backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
+	if gceCS.errorBackoff.blocking(backoffId) {
+		return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff condition", req.NodeId)
 	}
 
-	// Node is marked so queue up the request. Note the original gRPC context may get canceled,
-	// so a new one is created here.
-	//
-	// Note that the original context probably has a timeout (see csiAttach in external-attacher),
-	// which is ignored.
-	gceCS.queue.AddRateLimited(&workItem{
-		ctx:        context.Background(),
-		publishReq: req,
-	})
-	return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
+	resp, err := gceCS.executeControllerPublishVolume(ctx, req)
+	if err != nil {
+		klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err)
+		gceCS.errorBackoff.next(backoffId)
+	} else {
+		klog.Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId)
+		gceCS.errorBackoff.reset(backoffId)
+	}
+	return resp, err
 }
 
 func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
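
ControllerPublishVolume above and ControllerUnpublishVolume in the next hunk now share the same shape: reject with codes.Unavailable while the node:volume pair is in backoff, otherwise execute and record the outcome. Below is a condensed, runnable rendering of that control flow assuming only the flowcontrol package; gate and doOperation are illustrative names, not identifiers from this repository, and the real driver returns a gRPC status error where the sketch returns a plain error.

// Sketch of the shared publish/unpublish control flow: refuse the call while
// the node:volume pair is backing off, otherwise run it and update the backoff
// state from the result.
package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func gate(b *flowcontrol.Backoff, nodeID, volumeID string, doOperation func() error) error {
	id := nodeID + ":" + volumeID
	if b.IsInBackOffSinceUpdate(id, b.Clock.Now()) {
		// The real driver returns status.Errorf(codes.Unavailable, ...) here.
		return fmt.Errorf("not permitted on node %q due to backoff condition", nodeID)
	}
	err := doOperation()
	if err != nil {
		b.Next(id, b.Clock.Now()) // start or extend the backoff window
	} else {
		b.Reset(id) // success clears the condition for this pair
	}
	return err
}

func main() {
	b := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)
	fail := func() error { return errors.New("operation queue full") }
	ok := func() error { return nil }

	fmt.Println(gate(b, "node-1", "vol-a", fail)) // operation runs, fails, backoff starts
	fmt.Println(gate(b, "node-1", "vol-a", ok))   // rejected: still inside the 200ms window
	time.Sleep(250 * time.Millisecond)
	fmt.Println(gate(b, "node-1", "vol-a", ok))   // window expired; runs, succeeds, resets
}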
@@ -514,39 +501,33 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
 
 	err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName)
 	if err != nil {
-		// Mark the node and rate limit all the following attach/detach
-		// operations for this node
-		gceCS.publishErrorsSeenOnNode[nodeID] = true
 		return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
 	}
-
-	// Attach succeeds so unmark the node
-	delete(gceCS.publishErrorsSeenOnNode, nodeID)
-
 	klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID)
 	return pubVolResp, nil
 }
 
 func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
 	// Only valid requests will be queued
 	_, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
-
 	if err != nil {
 		return nil, err
 	}
 
-	// If the node is not marked, proceed the request
-	if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
-		return gceCS.executeControllerUnpublishVolume(ctx, req)
+	backoffId := gceCS.errorBackoff.backoffId(req.NodeId, req.VolumeId)
+	if gceCS.errorBackoff.blocking(backoffId) {
+		return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
 	}
 
-	// Node is marked so queue up the request
-	gceCS.queue.AddRateLimited(&workItem{
-		ctx:          context.Background(),
-		unpublishReq: req,
-	})
-
-	return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
+	resp, err := gceCS.executeControllerUnpublishVolume(ctx, req)
+	if err != nil {
+		klog.Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
+		gceCS.errorBackoff.next(backoffId)
+	} else {
+		klog.Infof("For node %s clear backoff due to successful unpublish of volume %v", req.NodeId, req.VolumeId)
+		gceCS.errorBackoff.reset(backoffId)
+	}
+	return resp, err
 }
 
 func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
@@ -622,15 +603,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
 
 	err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
 	if err != nil {
-		// Mark the node and rate limit all the following attach/detach
-		// operations for this node
-		gceCS.publishErrorsSeenOnNode[nodeID] = true
 		return nil, status.Error(codes.Internal, fmt.Sprintf("unknown detach error: %v", err))
 	}
 
-	// Detach succeeds so unmark the node
-	delete(gceCS.publishErrorsSeenOnNode, nodeID)
-
 	klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
 	return &csi.ControllerUnpublishVolumeResponse{}, nil
 }
@@ -1587,3 +1562,24 @@ func pickRandAndConsecutive(slice []string, n int) ([]string, error) {
 	}
 	return ret, nil
 }
+
+func newCsiErrorBackoff() *csiErrorBackoff {
+	return &csiErrorBackoff{flowcontrol.NewBackOff(errorBackoffInitialDuration, errorBackoffMaxDuration)}
+}
+
+func (_ *csiErrorBackoff) backoffId(nodeId, volumeId string) csiErrorBackoffId {
+	return csiErrorBackoffId(fmt.Sprintf("%s:%s", nodeId, volumeId))
+}
+
+func (b *csiErrorBackoff) blocking(id csiErrorBackoffId) bool {
+	blk := b.backoff.IsInBackOffSinceUpdate(string(id), b.backoff.Clock.Now())
+	return blk
+}
+
+func (b *csiErrorBackoff) next(id csiErrorBackoffId) {
+	b.backoff.Next(string(id), b.backoff.Clock.Now())
+}
+
+func (b *csiErrorBackoff) reset(id csiErrorBackoffId) {
+	b.backoff.Reset(string(id))
+}
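
For reference, the helpers above are thin wrappers over flowcontrol.Backoff: Next roughly doubles the per-key delay on each call (starting from errorBackoffInitialDuration) and caps it at errorBackoffMaxDuration, while Reset clears the key. A small standalone sketch of that growth follows; Get is the library's accessor for the current per-key delay, and the loop count is arbitrary.

// Sketch of how the per-key delay grows under repeated Next calls and is
// cleared by Reset; same parameters as newCsiErrorBackoff above.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	b := flowcontrol.NewBackOff(200*time.Millisecond, 5*time.Minute)
	id := "node-1:vol-a"

	for i := 0; i < 14; i++ {
		b.Next(id, b.Clock.Now())
		fmt.Printf("failure %2d: next retry delay %v\n", i+1, b.Get(id))
	}
	// Prints 200ms, 400ms, 800ms, ... doubling until capped at 5m0s.

	b.Reset(id)
	fmt.Println("after reset:", b.Get(id)) // 0s: the pair is no longer throttled
}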
