Skip to content

Commit 919c8c0

Browse files
committed
1. use flowcontrol backoff lib for publish/unpublish backoff
2. backoff on resource exhausted error
1 parent 11e2389 commit 919c8c0

File tree

9 files changed

+307
-295
lines changed

9 files changed

+307
-295
lines changed

pkg/gce-cloud-provider/compute/fake-gce.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package gcecloudprovider
1717
import (
1818
"context"
1919
"fmt"
20+
"net/http"
2021
"strconv"
2122
"strings"
2223

@@ -419,8 +420,9 @@ func (cloud *FakeCloudProvider) UpdateDiskStatus(s string) {
419420
}
420421

421422
type Signal struct {
422-
ReportError bool
423-
ReportRunning bool
423+
ReportError bool
424+
ReportRunning bool
425+
ReportTooManyRequestsError bool
424426
}
425427

426428
type FakeBlockingCloudProvider struct {
@@ -446,6 +448,9 @@ func (cloud *FakeBlockingCloudProvider) WaitForZonalOp(ctx context.Context, proj
446448
if val.ReportError {
447449
return fmt.Errorf("force mock error of zonal op %s", opName)
448450
}
451+
if val.ReportTooManyRequestsError {
452+
return tooManyRequestsError()
453+
}
449454
return nil
450455
}
451456

@@ -483,6 +488,12 @@ func invalidError() *googleapi.Error {
483488
}
484489
}
485490

491+
func tooManyRequestsError() *googleapi.Error {
492+
return &googleapi.Error{
493+
Code: http.StatusTooManyRequests,
494+
}
495+
}
496+
486497
func (cloud *FakeCloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta.Key, readWrite, diskType, project, location, instanceName string) (*computev1.Operation, error) {
487498
source := cloud.GetDiskSourceURI(project, volKey)
488499
attachedDiskV1 := &computev1.AttachedDisk{

pkg/gce-cloud-provider/compute/gce-compute.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ func (cloud *CloudProvider) WaitForAttach(ctx context.Context, project string, v
716716
klog.V(6).Infof("Polling for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
717717
disk, err := cloud.GetDisk(ctx, project, volKey, GCEAPIVersionV1)
718718
if err != nil {
719-
return false, fmt.Errorf("GetDisk failed to get disk: %v", err)
719+
return false, err
720720
}
721721

722722
if disk == nil {
@@ -935,6 +935,7 @@ func (cloud *CloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta.
935935

936936
op, err := cloud.service.Instances.AttachDisk(project, location, instanceName, attachedDiskV1).Context(ctx).Do()
937937
if err != nil {
938+
klog.Errorf("failed to start attach op for disk %s, instance %s, err: %v", deviceName, instanceName, err)
938939
return nil, fmt.Errorf("failed cloud service attach disk call: %v", err)
939940
}
940941
return op, nil
@@ -943,6 +944,7 @@ func (cloud *CloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta.
943944
func (cloud *CloudProvider) StartDetachDiskOp(ctx context.Context, project, location, deviceName, instanceName string) (*computev1.Operation, error) {
944945
op, err := cloud.service.Instances.DetachDisk(project, location, instanceName, deviceName).Context(ctx).Do()
945946
if err != nil {
947+
klog.Errorf("failed to start detach op for disk %s, instance %s, err %v", deviceName, instanceName, err)
946948
return nil, fmt.Errorf("failed cloud service detach disk call: %v", err)
947949
}
948950
return op, nil
@@ -953,7 +955,8 @@ func (cloud *CloudProvider) CheckZonalOpDoneStatus(ctx context.Context, project,
953955
lastKnownOp, err := cloud.service.ZoneOperations.Get(project, location, opId).Context(ctx).Do()
954956
if err != nil {
955957
if !IsGCENotFoundError(err) {
956-
return false, fmt.Errorf("failed to get operation %s: %v", opId, err)
958+
klog.Errorf("failed to check status for op %s, err: %v", opId, err)
959+
return false, err
957960
}
958961
return true, nil
959962
}

pkg/gce-cloud-provider/compute/gce.go

+9
Original file line numberDiff line numberDiff line change
@@ -263,3 +263,12 @@ func IsGCENotFoundError(err error) bool {
263263
func IsGCEInvalidError(err error) bool {
264264
return IsGCEError(err, "invalid")
265265
}
266+
267+
// IsTooManyRequestError returns true if the error is a googleapi.Error with
268+
// the HTTP 429 (Too Many Requests) status code.
269+
func IsTooManyRequestError(err error) bool {
270+
if apierr, ok := err.(*googleapi.Error); ok && apierr.Code == http.StatusTooManyRequests {
271+
return true
272+
}
273+
return false
274+
}

pkg/gce-pd-csi-driver/controller.go

+60-90
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,18 @@ import (
3030
"google.golang.org/grpc/status"
3131
"k8s.io/apimachinery/pkg/util/sets"
3232
"k8s.io/apimachinery/pkg/util/uuid"
33-
"k8s.io/apimachinery/pkg/util/wait"
34-
"k8s.io/client-go/util/workqueue"
33+
"k8s.io/client-go/util/flowcontrol"
3534
"k8s.io/klog"
3635

3736
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
3837
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
3938
)
4039

40+
const (
41+
nodeBackoffInitialDuration = 200 * time.Millisecond
42+
nodeBackoffMaxDuration = 5 * time.Minute
43+
)
44+
4145
type GCEControllerServer struct {
4246
Driver *GCEDriver
4347
CloudProvider gce.GCECompute
@@ -50,17 +54,9 @@ type GCEControllerServer struct {
5054
// Aborted error
5155
volumeLocks *common.VolumeLocks
5256

53-
// queue is a rate limited work queue for Controller Publish/Unpublish
54-
// Volume calls
55-
queue workqueue.RateLimitingInterface
56-
57-
// publishErrorsSeenOnNode is a list of nodes with attach/detach
58-
// operation failures so those nodes shall be rate limited for all
59-
// the attach/detach operations until there is an attach / detach
60-
// operation succeeds
61-
publishErrorsSeenOnNode map[string]bool
62-
63-
opsManager *OpsManager
57+
// nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible.
58+
nodeBackoff *flowcontrol.Backoff
59+
opsManager *OpsManager
6460
}
6561

6662
type workItem struct {
@@ -337,75 +333,34 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
337333
// Run starts the GCEControllerServer.
338334
func (gceCS *GCEControllerServer) Run() {
339335
go gceCS.opsManager.HydrateOpsCache()
340-
go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop)
341-
}
342-
343-
func (gceCS *GCEControllerServer) worker() {
344-
// Runs until workqueue is shut down
345-
for gceCS.processNextWorkItem() {
346-
}
347-
}
348-
349-
func (gceCS *GCEControllerServer) processNextWorkItem() bool {
350-
item, quit := gceCS.queue.Get()
351-
if quit {
352-
return false
353-
}
354-
defer gceCS.queue.Done(item)
355-
356-
workItem, ok := item.(*workItem)
357-
if !ok {
358-
gceCS.queue.AddRateLimited(item)
359-
return true
360-
}
361-
362-
if workItem.publishReq != nil {
363-
_, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq)
364-
365-
if err != nil {
366-
klog.Errorf("ControllerPublishVolume failed with error: %v", err)
367-
}
368-
}
369-
370-
if workItem.unpublishReq != nil {
371-
_, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq)
372-
373-
if err != nil {
374-
klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
375-
}
376-
}
377-
378-
gceCS.queue.Forget(item)
379-
return true
380336
}
381337

382338
func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
383339
if !gceCS.opsManager.IsReady() {
384340
return nil, status.Errorf(codes.Aborted, "Cache not ready")
385341
}
386342

387-
// Only valid requests will be queued
343+
// Only valid requests will be accepted
388344
_, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)
389-
390345
if err != nil {
391346
return nil, err
392347
}
393348

394-
// If the node is not marked, proceed the request
395-
if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
396-
return gceCS.executeControllerPublishVolume(ctx, req)
349+
if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
350+
return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff", req.NodeId)
397351
}
398352

399-
// Node is marked so queue up the request. Note the original gRPC context may get canceled,
400-
// so a new one is created here.
401-
//
402-
// Note that the original context probably has a timeout (see csiAttach in external-attacher),
403-
// which is ignored.
404-
gceCS.queue.AddRateLimited(&workItem{
405-
ctx: context.Background(),
406-
publishReq: req,
407-
})
408-
return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node")
353+
resp, err := gceCS.executeControllerPublishVolume(ctx, req)
354+
backoff := isResourceExhaustedError(err)
355+
if backoff && !gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
356+
klog.V(5).Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId)
357+
gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
358+
} else if err == nil {
359+
klog.V(5).Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId)
360+
gceCS.nodeBackoff.Reset(req.NodeId)
361+
}
362+
363+
return resp, err
409364
}
410365

411366
func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
@@ -438,7 +393,6 @@ func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx con
438393

439394
func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
440395
project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)
441-
442396
if err != nil {
443397
return nil, err
444398
}
@@ -457,6 +411,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
457411
if gce.IsGCENotFoundError(err) {
458412
return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err)
459413
}
414+
if gce.IsTooManyRequestError(err) {
415+
return nil, status.Errorf(codes.ResourceExhausted, "ControllerPublishVolume error repairing underspecified volume key: %v", err)
416+
}
460417
return nil, status.Errorf(codes.Internal, "ControllerPublishVolume error repairing underspecified volume key: %v", err)
461418
}
462419

@@ -473,6 +430,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
473430
if gce.IsGCENotFoundError(err) {
474431
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find disk %v: %v", volKey.String(), err))
475432
}
433+
if gce.IsTooManyRequestError(err) {
434+
return nil, status.Errorf(codes.ResourceExhausted, "get disk error: %v", err)
435+
}
476436
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get disk error: %v", err))
477437
}
478438
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
@@ -484,6 +444,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
484444
if gce.IsGCENotFoundError(err) {
485445
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find instance %v: %v", nodeID, err))
486446
}
447+
if gce.IsTooManyRequestError(err) {
448+
return nil, status.Errorf(codes.ResourceExhausted, "get instance error: %v", err)
449+
}
487450
return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get instance error: %v", err))
488451
}
489452

@@ -523,20 +486,20 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
523486
InstanceName: instanceName,
524487
})
525488
if err != nil {
489+
if gce.IsTooManyRequestError(err) {
490+
return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute attach operation, error: %v", err)
491+
}
526492
return nil, err
527493
}
528494

529495
err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName)
530496
if err != nil {
531-
// Mark the node and rate limit all the following attach/detach
532-
// operations for this node
533-
gceCS.publishErrorsSeenOnNode[nodeID] = true
497+
if gce.IsTooManyRequestError(err) {
498+
return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute wait for attach operation, error: %v", err)
499+
}
534500
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err))
535501
}
536502

537-
// Attach succeeds so unmark the node
538-
delete(gceCS.publishErrorsSeenOnNode, nodeID)
539-
540503
klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID)
541504
return pubVolResp, nil
542505
}
@@ -546,25 +509,25 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
546509
return nil, status.Errorf(codes.Aborted, "Cache not ready")
547510
}
548511

549-
// Only valid requests will be queued
512+
// Only valid requests will be accepted.
550513
_, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
551-
552514
if err != nil {
553515
return nil, err
554516
}
555517

556-
// If the node is not marked, proceed the request
557-
if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
558-
return gceCS.executeControllerUnpublishVolume(ctx, req)
518+
if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
519+
return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff", req.NodeId)
559520
}
560-
561-
// Node is marked so queue up the request
562-
gceCS.queue.AddRateLimited(&workItem{
563-
ctx: ctx,
564-
unpublishReq: req,
565-
})
566-
567-
return &csi.ControllerUnpublishVolumeResponse{}, nil
521+
resp, err := gceCS.executeControllerUnpublishVolume(ctx, req)
522+
backoff := isResourceExhaustedError(err)
523+
if backoff && !gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) {
524+
klog.V(5).Infof("For node %s adding backoff due to unpublish error for volume %s", req.NodeId, req.VolumeId)
525+
gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now())
526+
} else if err == nil {
527+
klog.V(5).Infof("For node %s clear backoff due to succesful unpublish of volume %s", req.NodeId, req.VolumeId)
528+
gceCS.nodeBackoff.Reset(req.NodeId)
529+
}
530+
return resp, err
568531
}
569532

570533
func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
@@ -588,7 +551,6 @@ func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx c
588551

589552
func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
590553
project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
591-
592554
if err != nil {
593555
return nil, err
594556
}
@@ -600,6 +562,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
600562
if gce.IsGCENotFoundError(err) {
601563
return nil, status.Errorf(codes.NotFound, "ControllerUnpublishVolume could not find volume with ID %v: %v", volumeID, err)
602564
}
565+
if gce.IsTooManyRequestError(err) {
566+
return nil, status.Errorf(codes.ResourceExhausted, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
567+
}
603568
return nil, status.Errorf(codes.Internal, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
604569
}
605570

@@ -622,6 +587,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
622587
klog.Warningf("Treating volume %v as unpublished because node %v could not be found", volKey.String(), instanceName)
623588
return &csi.ControllerUnpublishVolumeResponse{}, nil
624589
}
590+
if gce.IsTooManyRequestError(err) {
591+
return nil, status.Errorf(codes.ResourceExhausted, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err)
592+
}
625593
return nil, status.Error(codes.Internal, fmt.Sprintf("error getting instance: %v", err))
626594
}
627595

@@ -645,10 +613,12 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
645613
InstanceName: instanceName,
646614
})
647615
if err != nil {
616+
if gce.IsTooManyRequestError(err) {
617+
return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute detach operation, error: %v", err)
618+
}
648619
return nil, err
649620
}
650621

651-
delete(gceCS.publishErrorsSeenOnNode, nodeID)
652622
klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
653623
return &csi.ControllerUnpublishVolumeResponse{}, nil
654624
}

0 commit comments

Comments
 (0)