diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go
index 7b1434075..e0a831ec0 100644
--- a/pkg/gce-pd-csi-driver/controller.go
+++ b/pkg/gce-pd-csi-driver/controller.go
@@ -30,6 +30,8 @@ import (
 	"google.golang.org/grpc/status"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -47,6 +49,22 @@ type GCEControllerServer struct {
 	// operations for that same volume (as defined by Volume Key) return an
 	// Aborted error
 	volumeLocks *common.VolumeLocks
+
+	// queue is a rate limited work queue for Controller Publish/Unpublish
+	// Volume calls
+	queue workqueue.RateLimitingInterface
+
+	// publishErrorsSeenOnNode tracks nodes that have seen attach/detach
+	// operation failures; attach/detach operations for a marked node are
+	// rate limited until an attach or detach operation on that node
+	// succeeds
+	publishErrorsSeenOnNode map[string]bool
+}
+
+type workItem struct {
+	ctx          context.Context
+	publishReq   *csi.ControllerPublishVolumeRequest
+	unpublishReq *csi.ControllerUnpublishVolumeRequest
 }
 
 var _ csi.ControllerServer = &GCEControllerServer{}
@@ -280,25 +298,113 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
+// Run starts the GCEControllerServer.
+func (gceCS *GCEControllerServer) Run() {
+	go wait.Until(gceCS.worker, 1*time.Second, wait.NeverStop)
+}
+
+func (gceCS *GCEControllerServer) worker() {
+	// Runs until the workqueue is shut down
+	for gceCS.processNextWorkItem() {
+	}
+}
+
+func (gceCS *GCEControllerServer) processNextWorkItem() bool {
+	item, quit := gceCS.queue.Get()
+	if quit {
+		return false
+	}
+	defer gceCS.queue.Done(item)
+
+	workItem, ok := item.(*workItem)
+	if !ok {
+		gceCS.queue.AddRateLimited(item)
+		return true
+	}
+
+	if workItem.publishReq != nil {
+		_, err := gceCS.executeControllerPublishVolume(workItem.ctx, workItem.publishReq)
+
+		if err != nil {
+			klog.Errorf("ControllerPublishVolume failed with error: %v", err)
+		}
+	}
+
+	if workItem.unpublishReq != nil {
+		_, err := gceCS.executeControllerUnpublishVolume(workItem.ctx, workItem.unpublishReq)
+
+		if err != nil {
+			klog.Errorf("ControllerUnpublishVolume failed with error: %v", err)
+		}
+	}
+
+	gceCS.queue.Forget(item)
+	return true
+}
+
 func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
+	// Only valid requests will be queued
+	_, _, err := gceCS.validateControllerPublishVolumeRequest(ctx, req)
+
+	if err != nil {
+		return nil, err
+	}
+
+	// If the node is not marked, proceed with the request
+	if _, found := gceCS.publishErrorsSeenOnNode[req.NodeId]; !found {
+		return gceCS.executeControllerPublishVolume(ctx, req)
+	}
+
+	// Node is marked, so queue up the request
+	gceCS.queue.AddRateLimited(&workItem{
+		ctx:        ctx,
+		publishReq: req,
+	})
+	return &csi.ControllerPublishVolumeResponse{}, nil
+}
+
+func (gceCS *GCEControllerServer) validateControllerPublishVolumeRequest(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (string, *meta.Key, error) {
 	// Validate arguments
 	volumeID := req.GetVolumeId()
-	readOnly := req.GetReadonly()
 	nodeID := req.GetNodeId()
 	volumeCapability := req.GetVolumeCapability()
 	if len(volumeID) == 0 {
-		return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
Volume ID must be provided") + return "", nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided") } if len(nodeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Node ID must be provided") + return "", nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Node ID must be provided") } if volumeCapability == nil { - return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume capability must be provided") + return "", nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume capability must be provided") } project, volKey, err := common.VolumeIDToKey(volumeID) if err != nil { - return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("ControllerPublishVolume volume ID is invalid: %v", err)) + return "", nil, status.Error(codes.InvalidArgument, fmt.Sprintf("ControllerPublishVolume volume ID is invalid: %v", err)) + } + + // TODO(#253): Check volume capability matches for ALREADY_EXISTS + if err = validateVolumeCapability(volumeCapability); err != nil { + return "", nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err)) + } + + return project, volKey, nil +} + +func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { + project, volKey, err := gceCS.validateControllerPublishVolumeRequest(ctx, req) + + if err != nil { + return nil, err + } + + volumeID := req.GetVolumeId() + readOnly := req.GetReadonly() + nodeID := req.GetNodeId() + volumeCapability := req.GetVolumeCapability() + + pubVolResp := &csi.ControllerPublishVolumeResponse{ + PublishContext: nil, } project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) @@ -317,15 +423,6 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r } defer gceCS.volumeLocks.Release(lockingVolumeID) - // TODO(#253): Check volume capability matches for ALREADY_EXISTS - if err = validateVolumeCapability(volumeCapability); err != nil { - return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err)) - } - - pubVolResp := &csi.ControllerPublishVolumeResponse{ - PublishContext: nil, - } - _, err = gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) if err != nil { if gce.IsGCENotFoundError(err) { @@ -375,29 +472,69 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r err = gceCS.CloudProvider.WaitForAttach(ctx, project, volKey, instanceZone, instanceName) if err != nil { + // Mark the node and rate limit all the following attach/detach + // operations for this node + gceCS.publishErrorsSeenOnNode[nodeID] = true return nil, status.Error(codes.Internal, fmt.Sprintf("unknown WaitForAttach error: %v", err)) } + // Attach succeeds so unmark the node + delete(gceCS.publishErrorsSeenOnNode, nodeID) + klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v", volKey, nodeID) return pubVolResp, nil } func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { + // Only valid requests will be queued + _, _, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req) + + if err != nil { + return nil, err + } + + // If the node is not marked, proceed the request + if _, found := 
+		return gceCS.executeControllerUnpublishVolume(ctx, req)
+	}
+
+	// Node is marked, so queue up the request
+	gceCS.queue.AddRateLimited(&workItem{
+		ctx:          ctx,
+		unpublishReq: req,
+	})
+
+	return &csi.ControllerUnpublishVolumeResponse{}, nil
+}
+
+func (gceCS *GCEControllerServer) validateControllerUnpublishVolumeRequest(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (string, *meta.Key, error) {
 	// Validate arguments
 	volumeID := req.GetVolumeId()
 	nodeID := req.GetNodeId()
 	if len(volumeID) == 0 {
-		return nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Volume ID must be provided")
+		return "", nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Volume ID must be provided")
 	}
 	if len(nodeID) == 0 {
-		return nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Node ID must be provided")
+		return "", nil, status.Error(codes.InvalidArgument, "ControllerUnpublishVolume Node ID must be provided")
 	}
 
 	project, volKey, err := common.VolumeIDToKey(volumeID)
 	if err != nil {
-		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("ControllerUnpublishVolume Volume ID is invalid: %v", err))
+		return "", nil, status.Error(codes.InvalidArgument, fmt.Sprintf("ControllerUnpublishVolume Volume ID is invalid: %v", err))
 	}
+	return project, volKey, nil
+}
+
+func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
+	project, volKey, err := gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
+
+	if err != nil {
+		return nil, err
+	}
+
+	volumeID := req.GetVolumeId()
+	nodeID := req.GetNodeId()
 
 	project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
 	if err != nil {
 		if gce.IsGCENotFoundError(err) {
@@ -443,9 +580,15 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
 	err = gceCS.CloudProvider.DetachDisk(ctx, project, deviceName, instanceZone, instanceName)
 	if err != nil {
+		// Mark the node and rate limit all subsequent attach/detach
+		// operations for this node
+		gceCS.publishErrorsSeenOnNode[nodeID] = true
 		return nil, status.Error(codes.Internal, fmt.Sprintf("unknown detach error: %v", err))
 	}
 
+	// The detach succeeded, so unmark the node
+	delete(gceCS.publishErrorsSeenOnNode, nodeID)
+
 	klog.V(4).Infof("ControllerUnpublishVolume succeeded for disk %v from node %v", volKey, nodeID)
 	return &csi.ControllerUnpublishVolumeResponse{}, nil
 }
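For readers unfamiliar with the produce/consume pattern adopted in controller.go above, the following sketch shows the same k8s.io/client-go/util/workqueue and k8s.io/apimachinery/pkg/util/wait calls in isolation. It is a minimal standalone example, not code from this change; the request type, the queue name "example", and the print-based handler are hypothetical stand-ins for the driver's *workItem and execute functions.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// request is a hypothetical work item; the change above wraps the CSI
// publish/unpublish requests in a *workItem instead.
type request struct {
	name string
}

func main() {
	// Same constructor the change uses in NewControllerServer: a named queue
	// backed by the default controller rate limiter.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")

	// Consumer, mirroring worker/processNextWorkItem: loop until the queue is
	// shut down, and Forget+Done each item after handling it.
	go wait.Until(func() {
		for {
			item, shutdown := queue.Get()
			if shutdown {
				return
			}
			fmt.Printf("processing %s\n", item.(*request).name)
			queue.Forget(item)
			queue.Done(item)
		}
	}, 1*time.Second, wait.NeverStop)

	// Producer, mirroring the queued ControllerPublishVolume path:
	// AddRateLimited enqueues each item after the limiter's delay.
	for i := 0; i < 5; i++ {
		queue.AddRateLimited(&request{name: fmt.Sprintf("req-%d", i)})
	}

	time.Sleep(2 * time.Second)
	queue.ShutDown()
}

Because every item here is distinct, the per-item backoff stays at its base delay; the backoff only grows when the same item is re-added, which is what keeps retries from running in a tight loop.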
diff --git a/pkg/gce-pd-csi-driver/controller_test.go b/pkg/gce-pd-csi-driver/controller_test.go
index fe671ba78..eb8693db8 100644
--- a/pkg/gce-pd-csi-driver/controller_test.go
+++ b/pkg/gce-pd-csi-driver/controller_test.go
@@ -31,6 +31,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -63,6 +64,7 @@ var (
 	region, _      = common.GetRegionFromZones([]string{zone})
 	testRegionalID = fmt.Sprintf("projects/%s/regions/%s/disks/%s", project, region, name)
 	testSnapshotID = fmt.Sprintf("projects/%s/global/snapshots/%s", project, name)
+	testNodeID     = fmt.Sprintf("projects/%s/zones/%s/instances/%s", project, zone, node)
 )
 
 func TestCreateSnapshotArguments(t *testing.T) {
@@ -1716,3 +1718,185 @@ func TestCreateVolumeDiskReady(t *testing.T) {
 		})
 	}
 }
+
+func TestControllerPublishUnpublishVolume(t *testing.T) {
+	testCases := []struct {
+		name              string
+		seedDisks         []*gce.CloudDisk
+		pubReq            *csi.ControllerPublishVolumeRequest
+		unpubReq          *csi.ControllerUnpublishVolumeRequest
+		errorSeenOnNode   bool
+		fakeCloudProvider bool
+	}{
+		{
+			name: "queue up publish requests if node has publish error",
+			seedDisks: []*gce.CloudDisk{
+				createZonalCloudDisk(name),
+			},
+			pubReq: &csi.ControllerPublishVolumeRequest{
+				VolumeId: testVolumeID,
+				NodeId:   testNodeID,
+				VolumeCapability: &csi.VolumeCapability{
+					AccessType: &csi.VolumeCapability_Mount{
+						Mount: &csi.VolumeCapability_MountVolume{},
+					},
+					AccessMode: &csi.VolumeCapability_AccessMode{
+						Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
+					},
+				},
+			},
+			errorSeenOnNode:   true,
+			fakeCloudProvider: false,
+		},
+		{
+			name: "queue up and process publish requests if node has publish error",
+			seedDisks: []*gce.CloudDisk{
+				createZonalCloudDisk(name),
+			},
+			pubReq: &csi.ControllerPublishVolumeRequest{
+				VolumeId: testVolumeID,
+				NodeId:   testNodeID,
+				VolumeCapability: &csi.VolumeCapability{
+					AccessType: &csi.VolumeCapability_Mount{
+						Mount: &csi.VolumeCapability_MountVolume{},
+					},
+					AccessMode: &csi.VolumeCapability_AccessMode{
+						Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
+					},
+				},
+			},
+			errorSeenOnNode:   true,
+			fakeCloudProvider: true,
+		},
+		{
+			name: "do not queue up publish requests if node doesn't have publish error",
+			seedDisks: []*gce.CloudDisk{
+				createZonalCloudDisk(name),
+			},
+			pubReq: &csi.ControllerPublishVolumeRequest{
+				VolumeId: testVolumeID,
+				NodeId:   testNodeID,
+				VolumeCapability: &csi.VolumeCapability{
+					AccessType: &csi.VolumeCapability_Mount{
+						Mount: &csi.VolumeCapability_MountVolume{},
+					},
+					AccessMode: &csi.VolumeCapability_AccessMode{
+						Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
+					},
+				},
+			},
+			errorSeenOnNode:   false,
+			fakeCloudProvider: false,
+		},
+		{
+			name: "queue up unpublish requests if node has publish error",
+			seedDisks: []*gce.CloudDisk{
+				createZonalCloudDisk(name),
+			},
+			unpubReq: &csi.ControllerUnpublishVolumeRequest{
+				VolumeId: testVolumeID,
+				NodeId:   testNodeID,
+			},
+			errorSeenOnNode:   true,
+			fakeCloudProvider: false,
+		},
+		{
+			name: "queue up and process unpublish requests if node has publish error",
+			seedDisks: []*gce.CloudDisk{
+				createZonalCloudDisk(name),
+			},
+			unpubReq: &csi.ControllerUnpublishVolumeRequest{
+				VolumeId: testVolumeID,
+				NodeId:   testNodeID,
+			},
+			errorSeenOnNode:   true,
+			fakeCloudProvider: true,
+		},
+		{
+			name: "do not queue up unpublish requests if node doesn't have publish error",
+			seedDisks: []*gce.CloudDisk{
+				createZonalCloudDisk(name),
+			},
+			unpubReq: &csi.ControllerUnpublishVolumeRequest{
+				VolumeId: testVolumeID,
+				NodeId:   testNodeID,
+			},
+			errorSeenOnNode:   false,
+			fakeCloudProvider: false,
+		},
+	}
+	for _, tc := range testCases {
+		t.Logf("test case: %s", tc.name)
+
+		var gceDriver *GCEDriver
+
+		if tc.fakeCloudProvider {
+			fcp, err := gce.CreateFakeCloudProvider(project, zone, tc.seedDisks)
+			if err != nil {
+				t.Fatalf("Failed to create fake cloud provider: %v", err)
+			}
+
+			instance := &compute.Instance{
+				Name:  node,
+				Disks: []*compute.AttachedDisk{},
+			}
+			fcp.InsertInstance(instance, zone, node)
+
+			// Setup new driver each time so no interference
+			gceDriver = initGCEDriverWithCloudProvider(t, fcp)
+		} else {
+			gceDriver = initGCEDriver(t, tc.seedDisks)
+		}
+
+		// mark the node in the map
+		if tc.errorSeenOnNode {
+			gceDriver.cs.publishErrorsSeenOnNode[testNodeID] = true
+		}
+
+		requestCount := 50
+		for i := 0; i < requestCount; i++ {
+			if tc.pubReq != nil {
+				gceDriver.cs.ControllerPublishVolume(context.Background(), tc.pubReq)
+			}
+
+			if tc.unpubReq != nil {
+				gceDriver.cs.ControllerUnpublishVolume(context.Background(), tc.unpubReq)
+			}
+		}
+
+		queued := false
+
+		if tc.errorSeenOnNode {
+			if err := wait.Poll(10*time.Nanosecond, 1*time.Second, func() (bool, error) {
+				if gceDriver.cs.queue.Len() > 0 {
+					queued = true
+
+					if tc.fakeCloudProvider {
+						gceDriver.cs.Run()
+					}
+				}
+
+				// Items are queued up and eventually all processed
+				if tc.fakeCloudProvider {
+					return queued && gceDriver.cs.queue.Len() == 0, nil
+				}
+
+				return gceDriver.cs.queue.Len() == requestCount, nil
+			}); err != nil {
+				if tc.fakeCloudProvider {
+					t.Fatalf("%v requests were not processed for a node that has seen an error", gceDriver.cs.queue.Len())
+				} else {
+					t.Fatalf("Only %v requests were queued up for a node that has seen an error", gceDriver.cs.queue.Len())
+				}
+			}
+		}
+
+		if !tc.errorSeenOnNode {
+			if err := wait.Poll(10*time.Nanosecond, 10*time.Millisecond, func() (bool, error) {
+				return gceDriver.cs.queue.Len() != 0, nil
+			}); err == nil {
+				t.Fatalf("%v requests were queued up for a node that has not seen an error", gceDriver.cs.queue.Len())
+			}
+		}
+	}
+}
diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go
index 62189861a..e4f48f652 100644
--- a/pkg/gce-pd-csi-driver/gce-pd-driver.go
+++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go
@@ -20,6 +20,7 @@ import (
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	"k8s.io/mount-utils"
 	common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -149,16 +150,22 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
 func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute) *GCEControllerServer {
 	return &GCEControllerServer{
-		Driver:        gceDriver,
-		CloudProvider: cloudProvider,
-		seen:          map[string]int{},
-		volumeLocks:   common.NewVolumeLocks(),
+		Driver:                  gceDriver,
+		CloudProvider:           cloudProvider,
+		seen:                    map[string]int{},
+		volumeLocks:             common.NewVolumeLocks(),
+		queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controllerserver"),
+		publishErrorsSeenOnNode: map[string]bool{},
 	}
 }
 
 func (gceDriver *GCEDriver) Run(endpoint string) {
 	klog.V(4).Infof("Driver: %v", gceDriver.name)
 
+	if gceDriver.cs != nil {
+		gceDriver.cs.Run()
+	}
+
 	//Start the nonblocking GRPC
 	s := NewNonBlockingGRPCServer()
 	// TODO(#34): Only start specific servers based on a flag.
@@ -166,5 +173,6 @@
 	// The schema for that was in util. basically it was just s.start but with some nil servers.
 
 	s.Start(endpoint, gceDriver.ids, gceDriver.cs, gceDriver.ns)
+	s.Wait()
 }
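A note on the limiter chosen in NewControllerServer above: at the time of writing, workqueue.DefaultControllerRateLimiter combines a per-item exponential backoff (5ms base, 1000s cap) with an overall token bucket (10 qps, burst 100). The sketch below only probes that limiter's delays; the "node-1" item and the loop are hypothetical and not part of this change.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// The same limiter NewControllerServer passes to NewNamedRateLimitingQueue.
	rl := workqueue.DefaultControllerRateLimiter()

	// "node-1" is a hypothetical item. Re-adding the same item grows the
	// per-item exponential backoff (roughly 5ms, 10ms, 20ms, ...).
	item := "node-1"
	for i := 0; i < 5; i++ {
		fmt.Printf("attempt %d: delay %v\n", i+1, rl.When(item))
	}

	// Forget clears the per-item failure history, as processNextWorkItem
	// does via queue.Forget after a request is handled.
	rl.Forget(item)
	fmt.Printf("after Forget: delay %v\n", rl.When(item))
}

Because each queued *workItem in the diff is a freshly allocated pointer, the per-item backoff starts from its base delay for every request, and it is the shared token bucket that spaces out repeated attach/detach attempts against a marked node.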