From 3540af4927e56d198e6598615d15d66e6a9d58ba Mon Sep 17 00:00:00 2001 From: "Hantao (Will) Wang" Date: Fri, 14 Jun 2019 17:35:11 -0700 Subject: [PATCH] parallelize node operations by returning an error if the volume is in use --- pkg/gce-pd-csi-driver/node.go | 36 ++++++++--- pkg/gce-pd-csi-driver/node_test.go | 88 ++++++++++++++++++++++++++ pkg/mount-manager/fake-safe-mounter.go | 88 +++++++++++++++++++------- 3 files changed, 179 insertions(+), 33 deletions(-) diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index db218e06a..025c176e1 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -36,8 +36,10 @@ type GCENodeServer struct { Mounter *mount.SafeFormatAndMount DeviceUtils mountmanager.DeviceUtils MetadataService metadataservice.MetadataService - // TODO: Only lock mutually exclusive calls and make locking more fine grained - mux sync.Mutex + + // A map storing all volumes with ongoing operations so that additional operations + // for that same volume (as defined by VolumeID) return an Aborted error + volumes sync.Map } var _ csi.NodeServer = &GCENodeServer{} @@ -51,8 +53,6 @@ const ( ) func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { - ns.mux.Lock() - defer ns.mux.Unlock() klog.V(4).Infof("NodePublishVolume called with req: %#v", req) // Validate Arguments @@ -74,6 +74,11 @@ func (ns *GCENodeServer) NodePub return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided") } + if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists { return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID)) } + defer ns.volumes.Delete(volumeID) + if err := validateVolumeCapability(volumeCapability); err != nil { return nil, status.Error(codes.InvalidArgument, 
fmt.Sprintf("VolumeCapability is invalid: %v", err)) } @@ -180,9 +185,8 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub } func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { - ns.mux.Lock() - defer ns.mux.Unlock() klog.V(4).Infof("NodeUnpublishVolume called with args: %v", req) + // Validate Arguments targetPath := req.GetTargetPath() volID := req.GetVolumeId() @@ -193,6 +197,11 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided") } + if _, alreadyExists := ns.volumes.LoadOrStore(volID, true); alreadyExists { + return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volID)) + } + defer ns.volumes.Delete(volID) + err := mount.CleanupMountPoint(targetPath, ns.Mounter.Interface, false /* bind mount */) if err != nil { return nil, status.Error(codes.Internal, fmt.Sprintf("Unmount failed: %v\nUnmounting arguments: %s\n", err, targetPath)) @@ -202,8 +211,6 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU } func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - ns.mux.Lock() - defer ns.mux.Unlock() klog.V(4).Infof("NodeStageVolume called with req: %#v", req) // Validate Arguments @@ -220,6 +227,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided") } + if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists { + return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID)) + } + defer ns.volumes.Delete(volumeID) + if err := 
validateVolumeCapability(volumeCapability); err != nil { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err)) } @@ -297,9 +309,8 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage } func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - ns.mux.Lock() - defer ns.mux.Unlock() klog.V(4).Infof("NodeUnstageVolume called with req: %#v", req) + // Validate arguments volumeID := req.GetVolumeId() stagingTargetPath := req.GetStagingTargetPath() @@ -310,6 +321,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume Staging Target Path must be provided") } + if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists { + return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID)) + } + defer ns.volumes.Delete(volumeID) + err := mount.CleanupMountPoint(stagingTargetPath, ns.Mounter.Interface, false /* bind mount */) if err != nil { return nil, status.Error(codes.Internal, fmt.Sprintf("NodeUnstageVolume failed to unmount at path %s: %v", stagingTargetPath, err)) diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index d52092d72..6ff01f73e 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -38,6 +38,15 @@ func getTestGCEDriver(t *testing.T) *GCEDriver { return gceDriver } +func getTestBlockingGCEDriver(t *testing.T, mountToRun chan mountmanager.MountSourceAndTarget, readyToMount chan struct{}) *GCEDriver { + gceDriver := GetGCEDriver() + err := gceDriver.SetupGCEDriver(nil, mountmanager.NewFakeSafeBlockingMounter(mountToRun, readyToMount), mountmanager.NewFakeDeviceUtils(), metadataservice.NewFakeService(), driver, "test-vendor") + if err != nil { + t.Fatalf("Failed 
to setup GCE Driver: %v", err) + } + return gceDriver +} + func TestNodeGetVolumeLimits(t *testing.T) { gceDriver := getTestGCEDriver(t) @@ -378,3 +387,82 @@ func TestNodeGetCapabilities(t *testing.T) { t.Fatalf("Unexpedted error: %v", err) } } + +func TestConcurrentNodeOperations(t *testing.T) { + mountToRun := make(chan mountmanager.MountSourceAndTarget, 3) + readyToMount := make(chan struct{}, 2) + reqFinished := make(chan error, 2) + + gceDriver := getTestBlockingGCEDriver(t, mountToRun, readyToMount) + ns := gceDriver.ns + vol1PublishTargetAReq := &csi.NodePublishVolumeRequest{ + VolumeId: defaultVolumeID + "1", + TargetPath: defaultTargetPath + "a", + StagingTargetPath: defaultStagingPath + "1", + Readonly: false, + VolumeCapability: stdVolCap, + } + vol1PublishTargetBReq := &csi.NodePublishVolumeRequest{ + VolumeId: defaultVolumeID + "1", + TargetPath: defaultTargetPath + "b", + StagingTargetPath: defaultStagingPath + "1", + Readonly: false, + VolumeCapability: stdVolCap, + } + vol2PublishTargetCReq := &csi.NodePublishVolumeRequest{ + VolumeId: defaultVolumeID + "2", + TargetPath: defaultTargetPath + "c", + StagingTargetPath: defaultStagingPath + "2", + Readonly: false, + VolumeCapability: stdVolCap, + } + + runRequestInBackground := func(req *csi.NodePublishVolumeRequest) { + _, err := ns.NodePublishVolume(context.Background(), req) + reqFinished <- err + } + + // Start first valid request vol1PublishTargetAReq and block until it reaches the Mount + go runRequestInBackground(vol1PublishTargetAReq) + <-readyToMount + + // Check that vol1PublishTargetBReq is rejected, due to same volume ID + // Also allow vol1PublishTargetBReq to complete, in case it is allowed to Mount + mountToRun <- mountmanager.MountSourceAndTarget{ + Source: vol1PublishTargetBReq.StagingTargetPath, + Target: vol1PublishTargetBReq.TargetPath, + } + _, err := ns.NodePublishVolume(context.Background(), vol1PublishTargetBReq) + if err != nil { + serverError, ok := status.FromError(err) + if 
!ok { + t.Fatalf("Could not get error status code from err: %v", err) + } + if serverError.Code() != codes.Aborted { + t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err) + } + } else { + t.Fatalf("Expected error: %v, got no error", codes.Aborted) + } + + // Start second valid request vol2PublishTargetCReq + go runRequestInBackground(vol2PublishTargetCReq) + + // Allow the vol2PublishTargetCReq to complete, which it can concurrently with vol1PublishTargetAReq + mountToRun <- mountmanager.MountSourceAndTarget{ + Source: vol2PublishTargetCReq.StagingTargetPath, + Target: vol2PublishTargetCReq.TargetPath, + } + if err = <-reqFinished; err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // To clean up, allow the vol1PublishTargetAReq to complete + mountToRun <- mountmanager.MountSourceAndTarget{ + Source: vol1PublishTargetAReq.StagingTargetPath, + Target: vol1PublishTargetAReq.TargetPath, + } + if err = <-reqFinished; err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} diff --git a/pkg/mount-manager/fake-safe-mounter.go b/pkg/mount-manager/fake-safe-mounter.go index b75ea6eab..9216cd5e3 100644 --- a/pkg/mount-manager/fake-safe-mounter.go +++ b/pkg/mount-manager/fake-safe-mounter.go @@ -16,32 +16,74 @@ package mountmanager import "k8s.io/kubernetes/pkg/util/mount" -func NewFakeSafeMounter() *mount.SafeFormatAndMount { - execCallback := func(cmd string, args ...string) ([]byte, error) { - return nil, nil - // TODO(#48): Fill out exec callback for errors - /* - if len(test.execScripts) <= execCallCount { - t.Errorf("Unexpected command: %s %v", cmd, args) - return nil, nil - } - script := test.execScripts[execCallCount] - execCallCount++ - if script.command != cmd { - t.Errorf("Unexpected command %s. Expecting %s", cmd, script.command) - } - for j := range args { - if args[j] != script.args[j] { - t.Errorf("Unexpected args %v. 
Expecting %v", args, script.args) } +var ( + fakeMounter = &mount.FakeMounter{MountPoints: []mount.MountPoint{}, Log: []mount.FakeAction{}} + fakeExec = mount.NewFakeExec(execCallback) +) + +func execCallback(cmd string, args ...string) ([]byte, error) { + return nil, nil + // TODO(#48): Fill out exec callback for errors + /* + if len(test.execScripts) <= execCallCount { + t.Errorf("Unexpected command: %s %v", cmd, args) + return nil, nil + } + script := test.execScripts[execCallCount] + execCallCount++ + if script.command != cmd { + t.Errorf("Unexpected command %s. Expecting %s", cmd, script.command) + } + for j := range args { + if args[j] != script.args[j] { + t.Errorf("Unexpected args %v. Expecting %v", args, script.args) } - return []byte(script.output), script.err - */ - } - fakeMounter := &mount.FakeMounter{MountPoints: []mount.MountPoint{}, Log: []mount.FakeAction{}} - fakeExec := mount.NewFakeExec(execCallback) + } + return []byte(script.output), script.err + */ +} + +func NewFakeSafeMounter() *mount.SafeFormatAndMount { return &mount.SafeFormatAndMount{ Interface: fakeMounter, Exec: fakeExec, } } + +type FakeBlockingMounter struct { + *mount.FakeMounter + mountToRun chan MountSourceAndTarget + readyToMount chan struct{} +} + +type MountSourceAndTarget struct { + Source string + Target string +} + +// FakeBlockingMounter's method adds two channels to the Mount process in order to provide functionality to finely +// control the order of execution of Mount calls. readyToMount signals that a Mount operation has been called. +// Then it cycles through the mountToRun channel, waiting for permission to actually make the mount operation. 
+func (mounter *FakeBlockingMounter) Mount(source string, target string, fstype string, options []string) error { + mounter.readyToMount <- struct{}{} + for mountToRun := range mounter.mountToRun { + if mountToRun.Source == source && mountToRun.Target == target { + break + } else { + mounter.mountToRun <- mountToRun + } + } + return mounter.FakeMounter.Mount(source, target, fstype, options) +} + +func NewFakeSafeBlockingMounter(mountToRun chan MountSourceAndTarget, readyToMount chan struct{}) *mount.SafeFormatAndMount { + fakeBlockingMounter := &FakeBlockingMounter{ + FakeMounter: fakeMounter, + mountToRun: mountToRun, + readyToMount: readyToMount, + } + return &mount.SafeFormatAndMount{ + Interface: fakeBlockingMounter, + Exec: fakeExec, + } +}