return an error if the volume ID is already in use #303

Merged
36 changes: 26 additions & 10 deletions pkg/gce-pd-csi-driver/node.go
@@ -36,8 +36,10 @@ type GCENodeServer struct {
Mounter *mount.SafeFormatAndMount
DeviceUtils mountmanager.DeviceUtils
MetadataService metadataservice.MetadataService
// TODO: Only lock mutually exclusive calls and make locking more fine grained
mux sync.Mutex

// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
volumes sync.Map
}

var _ csi.NodeServer = &GCENodeServer{}
@@ -51,8 +53,6 @@ const (
)

func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
ns.mux.Lock()
defer ns.mux.Unlock()
klog.V(4).Infof("NodePublishVolume called with req: %#v", req)

// Validate Arguments
@@ -74,6 +74,11 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
}

if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
}
defer ns.volumes.Delete(volumeID)

if err := validateVolumeCapability(volumeCapability); err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
}
@@ -180,9 +185,8 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
}

func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
ns.mux.Lock()
defer ns.mux.Unlock()
klog.V(4).Infof("NodeUnpublishVolume called with args: %v", req)

// Validate Arguments
targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
@@ -193,6 +197,11 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
}

if _, alreadyExists := ns.volumes.LoadOrStore(volID, true); alreadyExists {
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volID))
}
defer ns.volumes.Delete(volID)

err := mount.CleanupMountPoint(targetPath, ns.Mounter.Interface, false /* bind mount */)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Unmount failed: %v\nUnmounting arguments: %s\n", err, targetPath))
@@ -202,8 +211,6 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
}

func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
ns.mux.Lock()
defer ns.mux.Unlock()
klog.V(4).Infof("NodeStageVolume called with req: %#v", req)

// Validate Arguments
@@ -220,6 +227,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
}

if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
}
defer ns.volumes.Delete(volumeID)

if err := validateVolumeCapability(volumeCapability); err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
}
@@ -297,9 +309,8 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
}

func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
ns.mux.Lock()
defer ns.mux.Unlock()
klog.V(4).Infof("NodeUnstageVolume called with req: %#v", req)

// Validate arguments
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
@@ -310,6 +321,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume Staging Target Path must be provided")
}

if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
}
defer ns.volumes.Delete(volumeID)

err := mount.CleanupMountPoint(stagingTargetPath, ns.Mounter.Interface, false /* bind mount */)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("NodeUnstageVolume failed to unmount at path %s: %v", stagingTargetPath, err))
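The core of this change is the `LoadOrStore` guard that each node method now opens with: the first caller for a given volume ID claims it, concurrent callers for the same ID get `codes.Aborted`, and a `defer` releases the ID when the operation finishes. Unlike the server-wide mutex it replaces, this serializes only calls that target the same volume, so operations on different volumes proceed concurrently. A minimal standalone sketch of the pattern (the `doWork` helper is a hypothetical stand-in for the real mount logic, not part of this PR):

```go
package main

import (
	"fmt"
	"sync"
)

// volumes tracks volume IDs with in-flight operations. LoadOrStore is
// atomic, so exactly one goroutine can claim a given ID at a time.
var volumes sync.Map

// doWork is a hypothetical stand-in for the real mount/unmount logic.
func doWork(volumeID string) error { return nil }

func doNodeOperation(volumeID string) error {
	if _, alreadyExists := volumes.LoadOrStore(volumeID, true); alreadyExists {
		return fmt.Errorf("an operation with the given Volume ID %s already exists", volumeID)
	}
	// Release the ID whether the operation succeeds or fails.
	defer volumes.Delete(volumeID)

	return doWork(volumeID)
}

func main() {
	fmt.Println(doNodeOperation("vol-1")) // <nil>
}
```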
88 changes: 88 additions & 0 deletions pkg/gce-pd-csi-driver/node_test.go
@@ -38,6 +38,15 @@ func getTestGCEDriver(t *testing.T) *GCEDriver {
return gceDriver
}

func getTestBlockingGCEDriver(t *testing.T, mountToRun chan mountmanager.MountSourceAndTarget, readyToMount chan struct{}) *GCEDriver {
gceDriver := GetGCEDriver()
err := gceDriver.SetupGCEDriver(nil, mountmanager.NewFakeSafeBlockingMounter(mountToRun, readyToMount), mountmanager.NewFakeDeviceUtils(), metadataservice.NewFakeService(), driver, "test-vendor")
if err != nil {
t.Fatalf("Failed to setup GCE Driver: %v", err)
}
return gceDriver
}

func TestNodeGetVolumeLimits(t *testing.T) {

gceDriver := getTestGCEDriver(t)
@@ -378,3 +387,82 @@ func TestNodeGetCapabilities(t *testing.T) {
t.Fatalf("Unexpedted error: %v", err)
}
}

func TestConcurrentNodeOperations(t *testing.T) {
mountToRun := make(chan mountmanager.MountSourceAndTarget, 3)
readyToMount := make(chan struct{}, 2)
reqFinished := make(chan error, 2)

gceDriver := getTestBlockingGCEDriver(t, mountToRun, readyToMount)
ns := gceDriver.ns
vol1PublishTargetAReq := &csi.NodePublishVolumeRequest{
VolumeId: defaultVolumeID + "1",
TargetPath: defaultTargetPath + "a",
StagingTargetPath: defaultStagingPath + "1",
Readonly: false,
VolumeCapability: stdVolCap,
}
vol1PublishTargetBReq := &csi.NodePublishVolumeRequest{
VolumeId: defaultVolumeID + "1",
TargetPath: defaultTargetPath + "b",
StagingTargetPath: defaultStagingPath + "1",
Readonly: false,
VolumeCapability: stdVolCap,
}
vol2PublishTargetCReq := &csi.NodePublishVolumeRequest{
VolumeId: defaultVolumeID + "2",
TargetPath: defaultTargetPath + "c",
StagingTargetPath: defaultStagingPath + "2",
Readonly: false,
VolumeCapability: stdVolCap,
}

runRequestInBackground := func(req *csi.NodePublishVolumeRequest) {
_, err := ns.NodePublishVolume(context.Background(), req)
reqFinished <- err
}

// Start the first valid request, vol1PublishTargetAReq, and block until it reaches the Mount call
go runRequestInBackground(vol1PublishTargetAReq)
<-readyToMount

// Check that vol1PublishTargetBReq is rejected because it uses the same volume ID
// Also allow vol1PublishTargetBReq to complete, in case it is erroneously allowed to Mount
mountToRun <- mountmanager.MountSourceAndTarget{
Source: vol1PublishTargetBReq.StagingTargetPath,
Target: vol1PublishTargetBReq.TargetPath,
}
_, err := ns.NodePublishVolume(context.Background(), vol1PublishTargetBReq)
if err != nil {
serverError, ok := status.FromError(err)
if !ok {
t.Fatalf("Could not get error status code from err: %v", err)
}
if serverError.Code() != codes.Aborted {
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
}
} else {
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
}

// Start second valid request vol2PublishTargetCReq
go runRequestInBackground(vol2PublishTargetCReq)

// Allow vol2PublishTargetCReq to complete, which it can do concurrently with vol1PublishTargetAReq
mountToRun <- mountmanager.MountSourceAndTarget{
Source: vol2PublishTargetCReq.StagingTargetPath,
Target: vol2PublishTargetCReq.TargetPath,
}
if err = <-reqFinished; err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// To clean up, allow the vol1PublishTargetAReq to complete
mountToRun <- mountmanager.MountSourceAndTarget{
Source: vol1PublishTargetAReq.StagingTargetPath,
Target: vol1PublishTargetAReq.TargetPath,
}
if err = <-reqFinished; err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
88 changes: 65 additions & 23 deletions pkg/mount-manager/fake-safe-mounter.go
@@ -16,32 +16,74 @@ package mountmanager

import "k8s.io/kubernetes/pkg/util/mount"

var (
fakeMounter = &mount.FakeMounter{MountPoints: []mount.MountPoint{}, Log: []mount.FakeAction{}}
fakeExec = mount.NewFakeExec(execCallback)
)

func execCallback(cmd string, args ...string) ([]byte, error) {
return nil, nil
// TODO(#48): Fill out exec callback for errors
/*
if len(test.execScripts) <= execCallCount {
t.Errorf("Unexpected command: %s %v", cmd, args)
return nil, nil
}
script := test.execScripts[execCallCount]
execCallCount++
if script.command != cmd {
t.Errorf("Unexpected command %s. Expecting %s", cmd, script.command)
}
for j := range args {
if args[j] != script.args[j] {
t.Errorf("Unexpected args %v. Expecting %v", args, script.args)
}
}
return []byte(script.output), script.err
*/
}

func NewFakeSafeMounter() *mount.SafeFormatAndMount {
return &mount.SafeFormatAndMount{
Interface: fakeMounter,
Exec: fakeExec,
}
}

type FakeBlockingMounter struct {
*mount.FakeMounter
mountToRun chan MountSourceAndTarget
readyToMount chan struct{}
}

type MountSourceAndTarget struct {
Source string
Target string
}

// FakeBlockingMounter's Mount method uses two channels to finely control the order in which
// concurrent Mount calls execute. readyToMount signals that a Mount operation has been called;
// the method then cycles through the mountToRun channel, re-queuing permissions meant for other
// source/target pairs, until it receives permission to actually perform its own mount.
func (mounter *FakeBlockingMounter) Mount(source string, target string, fstype string, options []string) error {
mounter.readyToMount <- struct{}{}
for mountToRun := range mounter.mountToRun {
if mountToRun.Source == source && mountToRun.Target == target {
break
} else {
mounter.mountToRun <- mountToRun
}
}
return mounter.FakeMounter.Mount(source, target, fstype, options)
}

func NewFakeSafeBlockingMounter(mountToRun chan MountSourceAndTarget, readyToMount chan struct{}) *mount.SafeFormatAndMount {
fakeBlockingMounter := &FakeBlockingMounter{
FakeMounter: fakeMounter,
mountToRun: mountToRun,
readyToMount: readyToMount,
}
return &mount.SafeFormatAndMount{
Interface: fakeBlockingMounter,
Exec: fakeExec,
}
}
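To make the handshake concrete, here is a condensed sketch of how a test drives a single blocked Mount call (illustrative only; `t` and `req` are assumed to be set up as in TestConcurrentNodeOperations above):

```go
// Illustrative sketch, not part of the PR: park one NodePublishVolume
// call inside Mount, then release it.
mountToRun := make(chan mountmanager.MountSourceAndTarget, 3)
readyToMount := make(chan struct{}, 2)
gceDriver := getTestBlockingGCEDriver(t, mountToRun, readyToMount)

go func() {
	_, _ = gceDriver.ns.NodePublishVolume(context.Background(), req)
}()

<-readyToMount // the request has reached Mount and is now parked
mountToRun <- mountmanager.MountSourceAndTarget{ // grant it permission to proceed
	Source: req.StagingTargetPath,
	Target: req.TargetPath,
}
```

Because Mount pushes non-matching permissions back onto mountToRun, several parked calls can be released in any order, which is what lets the test complete vol2PublishTargetCReq before vol1PublishTargetAReq.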