diff --git a/pkg/gce-cloud-provider/compute/fake-gce.go b/pkg/gce-cloud-provider/compute/fake-gce.go index 876eba310..505dff475 100644 --- a/pkg/gce-cloud-provider/compute/fake-gce.go +++ b/pkg/gce-cloud-provider/compute/fake-gce.go @@ -17,6 +17,7 @@ package gcecloudprovider import ( "context" "fmt" + "net/http" "strconv" "strings" @@ -419,8 +420,9 @@ func (cloud *FakeCloudProvider) UpdateDiskStatus(s string) { } type Signal struct { - ReportError bool - ReportRunning bool + ReportError bool + ReportRunning bool + ReportTooManyRequestsError bool } type FakeBlockingCloudProvider struct { @@ -446,6 +448,9 @@ func (cloud *FakeBlockingCloudProvider) WaitForZonalOp(ctx context.Context, proj if val.ReportError { return fmt.Errorf("force mock error of zonal op %s", opName) } + if val.ReportTooManyRequestsError { + return tooManyRequestsError() + } return nil } @@ -483,6 +488,12 @@ func invalidError() *googleapi.Error { } } +func tooManyRequestsError() *googleapi.Error { + return &googleapi.Error{ + Code: http.StatusTooManyRequests, + } +} + func (cloud *FakeCloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta.Key, readWrite, diskType, project, location, instanceName string) (*computev1.Operation, error) { source := cloud.GetDiskSourceURI(project, volKey) attachedDiskV1 := &computev1.AttachedDisk{ diff --git a/pkg/gce-cloud-provider/compute/gce-compute.go b/pkg/gce-cloud-provider/compute/gce-compute.go index 1f604da32..943fee1f7 100644 --- a/pkg/gce-cloud-provider/compute/gce-compute.go +++ b/pkg/gce-cloud-provider/compute/gce-compute.go @@ -716,7 +716,7 @@ func (cloud *CloudProvider) WaitForAttach(ctx context.Context, project string, v klog.V(6).Infof("Polling for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start)) disk, err := cloud.GetDisk(ctx, project, volKey, GCEAPIVersionV1) if err != nil { - return false, fmt.Errorf("GetDisk failed to get disk: %v", err) + return false, err } if disk == nil { @@ -935,6 
+935,7 @@ func (cloud *CloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta. op, err := cloud.service.Instances.AttachDisk(project, location, instanceName, attachedDiskV1).Context(ctx).Do() if err != nil { + klog.Errorf("failed to start attach op for disk %s, instance %s, err: %v", deviceName, instanceName, err) return nil, fmt.Errorf("failed cloud service attach disk call: %v", err) } return op, nil @@ -943,6 +944,7 @@ func (cloud *CloudProvider) StartAttachDiskOp(ctx context.Context, volKey *meta. func (cloud *CloudProvider) StartDetachDiskOp(ctx context.Context, project, location, deviceName, instanceName string) (*computev1.Operation, error) { op, err := cloud.service.Instances.DetachDisk(project, location, instanceName, deviceName).Context(ctx).Do() if err != nil { + klog.Errorf("failed to start detach op for disk %s, instance %s, err %v", deviceName, instanceName, err) return nil, fmt.Errorf("failed cloud service detach disk call: %v", err) } return op, nil @@ -953,7 +955,8 @@ func (cloud *CloudProvider) CheckZonalOpDoneStatus(ctx context.Context, project, lastKnownOp, err := cloud.service.ZoneOperations.Get(project, location, opId).Context(ctx).Do() if err != nil { if !IsGCENotFoundError(err) { - return false, fmt.Errorf("failed to get operation %s: %v", opId, err) + klog.Errorf("failed to check status for op %s, err: %v", opId, err) + return false, err } return true, nil } diff --git a/pkg/gce-cloud-provider/compute/gce.go b/pkg/gce-cloud-provider/compute/gce.go index c699c9132..0838c271c 100644 --- a/pkg/gce-cloud-provider/compute/gce.go +++ b/pkg/gce-cloud-provider/compute/gce.go @@ -258,8 +258,17 @@ func IsGCENotFoundError(err error) bool { return IsGCEError(err, "notFound") } -// IsInvalidError returns true if the error is a googleapi.Error with +// IsGCEInvalidError returns true if the error is a googleapi.Error with // invalid reason func IsGCEInvalidError(err error) bool { return IsGCEError(err, "invalid") } + +// IsTooManyRequestError 
returns true if the error is a googleapi.Error with +// resource exhausted error code. +func IsTooManyRequestError(err error) bool { + if apierr, ok := err.(*googleapi.Error); ok && apierr.Code == http.StatusTooManyRequests { + return true + } + return false +} diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 70fdbf338..e9d184d5d 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -30,14 +30,18 @@ import ( "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/workqueue" + "k8s.io/client-go/util/flowcontrol" "k8s.io/klog" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" ) +const ( + nodeBackoffInitialDuration = 200 * time.Millisecond + nodeBackoffMaxDuration = 5 * time.Minute +) + type GCEControllerServer struct { Driver *GCEDriver CloudProvider gce.GCECompute @@ -50,17 +54,9 @@ type GCEControllerServer struct { // Aborted error volumeLocks *common.VolumeLocks - // queue is a rate limited work queue for Controller Publish/Unpublish - // Volume calls - queue workqueue.RateLimitingInterface - - // publishErrorsSeenOnNode is a list of nodes with attach/detach - // operation failures so those nodes shall be rate limited for all - // the attach/detach operations until there is an attach / detach - // operation succeeds - publishErrorsSeenOnNode map[string]bool - - opsManager *OpsManager + // nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible. + nodeBackoff *flowcontrol.Backoff + opsManager *OpsManager } type workItem struct { @@ -337,46 +333,6 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del // Run starts the GCEControllerServer. 
func (gceCS *GCEControllerServer) Run() { go gceCS.opsManager.HydrateOpsCache() - go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop) -} - -func (gceCS *GCEControllerServer) worker() { - // Runs until workqueue is shut down - for gceCS.processNextWorkItem() { - } -} - -func (gceCS *GCEControllerServer) processNextWorkItem() bool { - item, quit := gceCS.queue.Get() - if quit { - return false - } - defer gceCS.queue.Done(item) - - workItem, ok := item.(*workItem) - if !ok { - gceCS.queue.AddRateLimited(item) - return true - } - - if workItem.publishReq != nil { - _, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq) - - if err != nil { - klog.Errorf("ControllerPublishVolume failed with error: %v", err) - } - } - - if workItem.unpublishReq != nil { - _, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq) - - if err != nil { - klog.Errorf("ControllerUnpublishVolume failed with error: %v", err) - } - } - - gceCS.queue.Forget(item) - return true } func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { @@ -384,28 +340,27 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r return nil, status.Errorf(codes.Aborted, "Cache not ready") } - // Only valid requests will be queued + // Only valid requests will be accepted _, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) - if err != nil { return nil, err } - // If the node is not marked, proceed the request - if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found { - return gceCS.executeControllerPublishVolume(ctx, req) + if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) { + return nil, status.Errorf(codes.Unavailable, "ControllerPublish not permitted on node %q due to backoff", req.NodeId) } - // Node is marked so queue up the request. 
Note the original gRPC context may get canceled, - // so a new one is created here. - // - // Note that the original context probably has a timeout (see csiAttach in external-attacher), - // which is ignored. - gceCS.queue.AddRateLimited(&workItem{ - ctx: context.Background(), - publishReq: req, - }) - return nil, status.Error(codes.Unavailable, "Request queued due to error condition on node") + resp, err := gceCS.executeControllerPublishVolume(ctx, req) + backoff := isResourceExhaustedError(err) + if backoff && !gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) { + klog.V(5).Infof("For node %s adding backoff due to error for volume %s", req.NodeId, req.VolumeId) + gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now()) + } else if err == nil { + klog.V(5).Infof("For node %s clear backoff due to successful publish of volume %v", req.NodeId, req.VolumeId) + gceCS.nodeBackoff.Reset(req.NodeId) + } + + return resp, err } func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) { @@ -438,7 +393,6 @@ func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx con func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) - if err != nil { return nil, err } @@ -457,6 +411,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err) } + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "ControllerPublishVolume error repairing underspecified volume key: %v", err) + } return nil, 
status.Errorf(codes.Internal, "ControllerPublishVolume error repairing underspecified volume key: %v", err) } @@ -473,6 +430,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con if gce.IsGCENotFoundError(err) { return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find disk %v: %v", volKey.String(), err)) } + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "get disk error: %v", err) + } return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get disk error: %v", err)) } instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID) @@ -484,6 +444,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con if gce.IsGCENotFoundError(err) { return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find instance %v: %v", nodeID, err)) } + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "get instance error: %v", err) + } return nil, status.Error(codes.Internal, fmt.Sprintf("Unknown get instance error: %v", err)) } @@ -523,20 +486,20 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con InstanceName: instanceName, }) if err != nil { + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute attach operation, error: %v", err) + } return nil, err } err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { - // Mark the node and rate limit all the following attach/detach - // operations for this node - gceCS.publishErrorsSeenOnNode[nodeID] = true + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute wait for attach operation, error: %v", err) + } return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err)) } - // Attach succeeds so unmark the node - delete(gceCS.publishErrorsSeenOnNode, 
nodeID) - klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID) return pubVolResp, nil } @@ -546,25 +509,25 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, "Cache not ready") } - // Only valid requests will be queued + // Only valid requests will be accepted. _, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) - if err != nil { return nil, err } - // If the node is not marked, proceed the request - if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found { - return gceCS.executeControllerUnpublishVolume(ctx, req) + if gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) { + return nil, status.Errorf(codes.Unavailable, "ControllerUnpublish not permitted on node %q due to backoff", req.NodeId) } - - // Node is marked so queue up the request - gceCS.queue.AddRateLimited(&workItem{ - ctx: ctx, - unpublishReq: req, - }) - - return &csi.ControllerUnpublishVolumeResponse{}, nil + resp, err := gceCS.executeControllerUnpublishVolume(ctx, req) + backoff := isResourceExhaustedError(err) + if backoff && !gceCS.nodeBackoff.IsInBackOffSinceUpdate(req.NodeId, gceCS.nodeBackoff.Clock.Now()) { + klog.V(5).Infof("For node %s adding backoff due to unpublish error for volume %s", req.NodeId, req.VolumeId) + gceCS.nodeBackoff.Next(req.NodeId, gceCS.nodeBackoff.Clock.Now()) + } else if err == nil { + klog.V(5).Infof("For node %s clear backoff due to successful unpublish of volume %s", req.NodeId, req.VolumeId) + gceCS.nodeBackoff.Reset(req.NodeId) + } + return resp, err } func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) { @@ -588,7 +551,6 @@ func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx c func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx 
context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) - if err != nil { return nil, err } @@ -600,6 +562,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "ControllerUnpublishVolume could not find volume with ID %v: %v", volumeID, err) } + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err) + } return nil, status.Errorf(codes.Internal, "ControllerUnpublishVolume error repairing underspecified volume key: %v", err) } @@ -622,6 +587,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C klog.Warningf("Treating volume %v as unpublished because node %v could not be found", volKey.String(), instanceName) return &csi.ControllerUnpublishVolumeResponse{}, nil } + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "error getting instance: %v", err) + } return nil, status.Error(codes.Internal, fmt.Sprintf("error getting instance: %v", err)) } @@ -645,10 +613,12 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C InstanceName: instanceName, }) if err != nil { + if gce.IsTooManyRequestError(err) { + return nil, status.Errorf(codes.ResourceExhausted, "Failed to execute detach operation, error: %v", err) + } return nil, err } - delete(gceCS.publishErrorsSeenOnNode, nodeID) klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID) return &csi.ControllerUnpublishVolumeResponse{}, nil } diff --git a/pkg/gce-pd-csi-driver/controller_test.go b/pkg/gce-pd-csi-driver/controller_test.go index 7322719dd..398598f9b 100644 --- 
a/pkg/gce-pd-csi-driver/controller_test.go +++ b/pkg/gce-pd-csi-driver/controller_test.go @@ -22,19 +22,21 @@ import ( "testing" "time" + "context" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/golang/protobuf/ptypes" - "context" - compute "google.golang.org/api/compute/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/retry" csi "github.com/container-storage-interface/spec/lib/go/csi" + "k8s.io/apimachinery/pkg/util/clock" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" gcecloudprovider "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" @@ -1872,188 +1874,6 @@ func TestCreateVolumeDiskReady(t *testing.T) { } } -func TestControllerPublishUnpublishVolume(t *testing.T) { - testCases := []struct { - name string - seedDisks []*gce.CloudDisk - pubReq *csi.ControllerPublishVolumeRequest - unpubReq *csi.ControllerUnpublishVolumeRequest - errorSeenOnNode bool - fakeCloudProvider bool - }{ - { - name: "queue up publish requests if node has publish error", - seedDisks: []*gce.CloudDisk{ - createZonalCloudDisk(name), - }, - pubReq: &csi.ControllerPublishVolumeRequest{ - VolumeId: testVolumeID, - NodeId: testNodeID, - VolumeCapability: &csi.VolumeCapability{ - AccessType: &csi.VolumeCapability_Mount{ - Mount: &csi.VolumeCapability_MountVolume{}, - }, - AccessMode: &csi.VolumeCapability_AccessMode{ - Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - }, - }, - }, - errorSeenOnNode: true, - fakeCloudProvider: false, - }, - { - name: "queue up and process publish requests if node has publish error", - seedDisks: []*gce.CloudDisk{ - createZonalCloudDisk(name), - }, - pubReq: &csi.ControllerPublishVolumeRequest{ - VolumeId: 
testVolumeID, - NodeId: testNodeID, - VolumeCapability: &csi.VolumeCapability{ - AccessType: &csi.VolumeCapability_Mount{ - Mount: &csi.VolumeCapability_MountVolume{}, - }, - AccessMode: &csi.VolumeCapability_AccessMode{ - Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - }, - }, - }, - errorSeenOnNode: true, - fakeCloudProvider: true, - }, - { - name: "do not queue up publish requests if node doesn't have publish error", - seedDisks: []*gce.CloudDisk{ - createZonalCloudDisk(name), - }, - pubReq: &csi.ControllerPublishVolumeRequest{ - VolumeId: testVolumeID, - NodeId: testNodeID, - VolumeCapability: &csi.VolumeCapability{ - AccessType: &csi.VolumeCapability_Mount{ - Mount: &csi.VolumeCapability_MountVolume{}, - }, - AccessMode: &csi.VolumeCapability_AccessMode{ - Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - }, - }, - }, - errorSeenOnNode: false, - fakeCloudProvider: false, - }, - { - name: "queue up unpublish requests if node has publish error", - seedDisks: []*gce.CloudDisk{ - createZonalCloudDisk(name), - }, - unpubReq: &csi.ControllerUnpublishVolumeRequest{ - VolumeId: testVolumeID, - NodeId: testNodeID, - }, - errorSeenOnNode: true, - fakeCloudProvider: false, - }, - { - name: "queue up and process unpublish requests if node has publish error", - seedDisks: []*gce.CloudDisk{ - createZonalCloudDisk(name), - }, - unpubReq: &csi.ControllerUnpublishVolumeRequest{ - VolumeId: testVolumeID, - NodeId: testNodeID, - }, - errorSeenOnNode: true, - fakeCloudProvider: true, - }, - { - name: "do not queue up unpublish requests if node doesn't have publish error", - seedDisks: []*gce.CloudDisk{ - createZonalCloudDisk(name), - }, - unpubReq: &csi.ControllerUnpublishVolumeRequest{ - VolumeId: testVolumeID, - NodeId: testNodeID, - }, - errorSeenOnNode: false, - fakeCloudProvider: false, - }, - } - for _, tc := range testCases { - t.Logf("test case: %s", tc.name) - - var gceDriver *GCEDriver - - if tc.fakeCloudProvider { - fcp, err := 
gce.CreateFakeCloudProvider(project, zone, tc.seedDisks) - if err != nil { - t.Fatalf("Failed to create fake cloud provider: %v", err) - } - - instance := &compute.Instance{ - Name: node, - Disks: []*compute.AttachedDisk{}, - } - fcp.InsertInstance(instance, zone, node) - - // Setup new driver each time so no interference - gceDriver = initGCEDriverWithCloudProvider(t, fcp) - } else { - gceDriver = initGCEDriver(t, tc.seedDisks) - } - gceDriver.cs.opsManager.ready = true - // mark the node in the map - if tc.errorSeenOnNode { - gceDriver.cs.publishErrorsSeenOnNode[testNodeID] = true - } - - requestCount := 50 - for i := 0; i < requestCount; i++ { - if tc.pubReq != nil { - gceDriver.cs.ControllerPublishVolume(context.Background(), tc.pubReq) - } - - if tc.unpubReq != nil { - gceDriver.cs.ControllerUnpublishVolume(context.Background(), tc.unpubReq) - } - } - - queued := false - - if tc.errorSeenOnNode { - if err := wait.Poll(10*time.Nanosecond, 1*time.Second, func() (bool, error) { - if gceDriver.cs.queue.Len() > 0 { - queued = true - - if tc.fakeCloudProvider { - gceDriver.cs.Run() - } - } - - // Items are queued up and eventually all processed - if tc.fakeCloudProvider { - return queued && gceDriver.cs.queue.Len() == 0, nil - } - - return gceDriver.cs.queue.Len() == requestCount, nil - }); err != nil { - if tc.fakeCloudProvider { - t.Fatalf("%v requests not processed for node has seen error", gceDriver.cs.queue.Len()) - } else { - t.Fatalf("Only %v requests queued up for node has seen error", gceDriver.cs.queue.Len()) - } - } - } - - if !tc.errorSeenOnNode { - if err := wait.Poll(10*time.Nanosecond, 10*time.Millisecond, func() (bool, error) { - return gceDriver.cs.queue.Len() != 0, nil - }); err == nil { - t.Fatalf("%v requests queued up for node hasn't seen error", gceDriver.cs.queue.Len()) - } - } - } -} - func TestControllerPublishInterop(t *testing.T) { readyToExecute := make(chan chan gcecloudprovider.Signal, 1) disk1 := name + "1" @@ -3112,3 +2932,168 @@ func 
TestHydrateCache(t *testing.T) { }) } } + +func isUnavailableError(err error) bool { + if err == nil { + return false + } + + st, ok := status.FromError(err) + if !ok { + return false + } + + return st.Code().String() == "Unavailable" +} + +func testBackoffHelper(t *testing.T, testControllerPublish bool) { + readyToExecute := make(chan chan gcecloudprovider.Signal, 1) + disk1 := name + "1" + cloudDisks := []*gce.CloudDisk{ + createZonalCloudDisk(disk1), + } + fcp, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks) + if err != nil { + t.Fatalf("Failed to create fake cloud provider: %v", err) + } + fcpBlocking := &gce.FakeBlockingCloudProvider{ + FakeCloudProvider: fcp, + ReadyToExecute: readyToExecute, + } + instance := &compute.Instance{ + Name: node, + Disks: []*compute.AttachedDisk{}, + } + if !testControllerPublish { + instance.Disks = append(instance.Disks, &compute.AttachedDisk{DeviceName: disk1}) + } + fcp.InsertInstance(instance, zone, node) + + driver := GetGCEDriver() + tc := clock.NewFakeClock(time.Now()) + driver.cs = &GCEControllerServer{ + Driver: driver, + CloudProvider: fcpBlocking, + seen: map[string]int{}, + volumeLocks: common.NewVolumeLocks(), + nodeBackoff: flowcontrol.NewFakeBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration, tc), + opsManager: NewOpsManager(fcpBlocking), + } + driver.cs.opsManager.ready = true + + key := testNodeID + step := 1 * time.Millisecond + // Mock an active backoff condition on the node. This will setup a backoff duration of the 'nodeBackoffInitialDuration'. 
+ driver.cs.nodeBackoff.Next(key, tc.Now()) + pubreq := &csi.ControllerPublishVolumeRequest{ + VolumeId: testVolumeID + "1", + NodeId: testNodeID, + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + } + unpubreq := &csi.ControllerUnpublishVolumeRequest{ + VolumeId: testVolumeID + "1", + NodeId: testNodeID, + } + // For the first 199 ms, the backoff condition is true. All controller publish request will be denied with 'Unavailable' error code. + for i := 0; i < 199; i++ { + tc.Step(step) + var err error + if testControllerPublish { + _, err = driver.cs.ControllerPublishVolume(context.Background(), pubreq) + } else { + _, err = driver.cs.ControllerUnpublishVolume(context.Background(), unpubreq) + } + if !isUnavailableError(err) { + t.Errorf("unexpected error %v", err) + } + } + + // Mock clock tick for the 200th millisecond. So backoff condition is no longer true. + tc.Step(step) + // Mock a ControllerPublish error due to failure to poll for attach disk op. 
+ runPublishRequest := func(req *csi.ControllerPublishVolumeRequest) <-chan error { + response := make(chan error) + go func() { + _, err := driver.cs.ControllerPublishVolume(context.Background(), req) + response <- err + }() + return response + } + runUnpublishRequest := func(req *csi.ControllerUnpublishVolumeRequest) <-chan error { + response := make(chan error) + go func() { + _, err := driver.cs.ControllerUnpublishVolume(context.Background(), req) + response <- err + }() + return response + } + + var respPublish <-chan error + var respUnpublish <-chan error + if testControllerPublish { + respPublish = runPublishRequest(pubreq) + } else { + respUnpublish = runUnpublishRequest(unpubreq) + } + execute := <-readyToExecute + s1 := gcecloudprovider.Signal{ReportTooManyRequestsError: true} + execute <- s1 + if testControllerPublish { + if err := <-respPublish; err == nil { + t.Errorf("expected error") + } + } else { + if err := <-respUnpublish; err == nil { + t.Errorf("expected error") + } + } + + // The above failure should cause driver to call Backoff.Next() again and a backoff duration of 400 ms is set starting at the 200th millisecond. + // For the 200-599 ms, the backoff condition is true, and new controller publish requests will be denied. + for i := 0; i < 399; i++ { + tc.Step(step) + var err error + if testControllerPublish { + _, err = driver.cs.ControllerPublishVolume(context.Background(), pubreq) + } else { + _, err = driver.cs.ControllerUnpublishVolume(context.Background(), unpubreq) + } + + if !isUnavailableError(err) { + t.Errorf("unexpected error %v", err) + } + } + + // Mock clock tick for the 600th millisecond. So backoff condition is no longer true. + tc.Step(step) + // Now mock a successful ControllerPublish request. 
+ if testControllerPublish { + respPublish = runPublishRequest(pubreq) + if err := <-respPublish; err != nil { + t.Errorf("unexpected error") + } + } else { + respUnpublish = runUnpublishRequest(unpubreq) + if err := <-respUnpublish; err != nil { + t.Errorf("unexpected error") + } + } + + // Driver is expected to remove the node key from the backoff map. + t1 := driver.cs.nodeBackoff.Get(key) + if t1 != 0 { + t.Error("unexpected delay") + } +} + +func TestControllerPublishUnpublishBackoff(t *testing.T) { + testBackoffHelper(t, true /* publish */) + testBackoffHelper(t, false /* unpublish */) +} diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index 67054ff3c..2537fca9f 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -20,7 +20,7 @@ import ( csi "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/client-go/util/workqueue" + "k8s.io/client-go/util/flowcontrol" "k8s.io/klog" "k8s.io/mount-utils" common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" @@ -151,13 +151,13 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute) *GCEControllerServer { return &GCEControllerServer{ - Driver: gceDriver, - CloudProvider: cloudProvider, - seen: map[string]int{}, - volumeLocks: common.NewVolumeLocks(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controllerserver"), - publishErrorsSeenOnNode: map[string]bool{}, - opsManager: NewOpsManager(cloudProvider), + Driver: gceDriver, + CloudProvider: cloudProvider, + seen: map[string]int{}, + volumeLocks: common.NewVolumeLocks(), + // flowcontrol uses an exponential backoff policy with a factor of 2 + nodeBackoff: flowcontrol.NewBackOff(nodeBackoffInitialDuration, nodeBackoffMaxDuration), + 
opsManager: NewOpsManager(cloudProvider), } } diff --git a/pkg/gce-pd-csi-driver/ops_manager.go b/pkg/gce-pd-csi-driver/ops_manager.go index 1e2eebcb3..617ab6021 100644 --- a/pkg/gce-pd-csi-driver/ops_manager.go +++ b/pkg/gce-pd-csi-driver/ops_manager.go @@ -83,7 +83,7 @@ func (o *OpsManager) ExecuteAttachDisk(ctx context.Context, opts *AttachDiskOpts // Lock released, wait for the op to complete. err = o.cloudProvider.WaitForZonalOp(ctx, opts.Project, op.Name, opts.Location) if err != nil { - return status.Error(codes.Internal, fmt.Sprintf("%v", err)) + return err } // Op succeeded, acquire cache lock and clear op. @@ -102,7 +102,7 @@ func (o *OpsManager) ExecuteDetachDisk(ctx context.Context, opts *DetachDiskOpts // Lock released, wait for the op to complete. err = o.cloudProvider.WaitForZonalOp(ctx, opts.Project, op.Name, opts.Location) if err != nil { - return status.Error(codes.Internal, fmt.Sprintf("%v", err)) + return err } // Op succeeded, acquire cache lock and clear op. @@ -117,13 +117,15 @@ func (o *OpsManager) checkAndUpdateLastKnownInstanceOps(ctx context.Context, pro } for _, op := range ops { - klog.V(5).Infof("Found last known op name %q (type %q) for instance %q", op.Name, op.Type, instanceName) + klog.V(5).Infof("Found last known op name %q (type %q) on node %q", op.Name, op.Type, instanceName) done, err := o.cloudProvider.CheckZonalOpDoneStatus(ctx, project, location, op.Name) if err != nil { - return status.Error(codes.Internal, fmt.Sprintf("%v", err)) + return err } if !done { + msg := fmt.Sprintf("operation %q (type %q) on node %s is still in progress", op.Name, op.Type, instanceName) + klog.V(5).Info(msg) return status.Error(codes.Aborted, fmt.Sprintf("operation %q (type %q) is still in progress", op.Name, op.Type)) } @@ -139,14 +141,16 @@ func (o *OpsManager) checkAndUpdateLastKnownDiskInstanceOp(ctx context.Context, return nil } - klog.V(5).Infof("Found last known op name %q (type %q) for instance %q", opInfo.Name, opInfo.Type, 
instanceName) + klog.V(5).Infof("Found last known op name %q (type %q) on node %q", opInfo.Name, opInfo.Type, instanceName) done, err := o.cloudProvider.CheckZonalOpDoneStatus(ctx, project, location, opInfo.Name) if err != nil { - return status.Error(codes.Internal, fmt.Sprintf("%v", err)) + return err } if !done { - return status.Error(codes.Aborted, fmt.Sprintf("operation %q (type %q) is still in progress", opInfo.Name, opInfo.Type)) + msg := fmt.Sprintf("operation %q (type %q) for disk %s on node %s is still in progress", opInfo.Name, opInfo.Type, deviceName, instanceName) + klog.V(5).Info(msg) + return status.Error(codes.Aborted, msg) } o.opsCache.DiskInstanceOps.ClearOp(common.CreateDiskInstanceKey(project, location, deviceName, instanceName), opInfo.Name) @@ -169,7 +173,7 @@ func (o *OpsManager) checkCacheAndStartAttachDiskOp(ctx context.Context, volKey op, err := o.cloudProvider.StartAttachDiskOp(ctx, volKey, readWrite, diskType, project, instanceZone, instanceName) if err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("%v", err)) + return nil, err } klog.V(5).Infof("AttachDisk operation %q for disk %q started on instance %q", op.Name, deviceName, instanceName) @@ -194,13 +198,14 @@ func (o *OpsManager) checkCacheAndStartDetachDiskOp(ctx context.Context, project op, err := o.cloudProvider.StartDetachDiskOp(ctx, project, instanceZone, deviceName, instanceName) if err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("%v", err)) + return nil, err } klog.V(5).Infof("DetachDisk operation %q for disk %q started on instance %q", op.Name, deviceName, instanceName) o.opsCache.DiskInstanceOps.AddOp(common.CreateDiskInstanceKey(project, instanceZone, deviceName, instanceName), common.OpInfo{Name: op.Name, Type: op.OperationType}) return op, nil } + func (o *OpsManager) HydrateOpsCache() { var ops []*compute.Operation retry.OnError(backoff, func(err error) bool { return true }, func() error { diff --git a/pkg/gce-pd-csi-driver/utils.go 
b/pkg/gce-pd-csi-driver/utils.go index 5478971a7..40c342539 100644 --- a/pkg/gce-pd-csi-driver/utils.go +++ b/pkg/gce-pd-csi-driver/utils.go @@ -23,7 +23,9 @@ import ( "context" csi "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + "google.golang.org/grpc/status" "k8s.io/klog" ) @@ -199,3 +201,29 @@ func collectMountOptions(fsType string, mntFlags []string) []string { } return options } + +func isInternalError(err error) bool { + if err == nil { + return false + } + + st, ok := status.FromError(err) + if !ok { + return false + } + + return st.Code().String() == "Internal" +} + +func isResourceExhaustedError(err error) bool { + if err == nil { + return false + } + + st, ok := status.FromError(err) + if !ok { + return false + } + + return st.Code().String() == "ResourceExhausted" +} diff --git a/vendor/modules.txt b/vendor/modules.txt index dbba4200c..e2cc0de5e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -18,6 +18,7 @@ github.com/beorn7/perks/quantile # github.com/blang/semver v3.5.1+incompatible ## explicit github.com/blang/semver +## explicit; go 1.13 # github.com/cespare/xxhash/v2 v2.1.1 ## explicit; go 1.11 github.com/cespare/xxhash/v2