Skip to content

Commit 3540af4

Browse files
author
Hantao (Will) Wang
committed
parallelize node operations by returning an error if the volume is in use
1 parent dfdc0fb commit 3540af4

File tree

3 files changed

+179
-33
lines changed

3 files changed

+179
-33
lines changed

pkg/gce-pd-csi-driver/node.go

+26-10
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ type GCENodeServer struct {
3636
Mounter *mount.SafeFormatAndMount
3737
DeviceUtils mountmanager.DeviceUtils
3838
MetadataService metadataservice.MetadataService
39-
// TODO: Only lock mutually exclusive calls and make locking more fine grained
40-
mux sync.Mutex
39+
40+
// A map storing all volumes with ongoing operations so that additional operations
41+
// for that same volume (as defined by VolumeID) return an Aborted error
42+
volumes sync.Map
4143
}
4244

4345
var _ csi.NodeServer = &GCENodeServer{}
@@ -51,8 +53,6 @@ const (
5153
)
5254

5355
func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
54-
ns.mux.Lock()
55-
defer ns.mux.Unlock()
5656
klog.V(4).Infof("NodePublishVolume called with req: %#v", req)
5757

5858
// Validate Arguments
@@ -74,6 +74,11 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
7474
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
7575
}
7676

77+
if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
78+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
79+
}
80+
defer ns.volumes.Delete(volumeID)
81+
7782
if err := validateVolumeCapability(volumeCapability); err != nil {
7883
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
7984
}
@@ -180,9 +185,8 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
180185
}
181186

182187
func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
183-
ns.mux.Lock()
184-
defer ns.mux.Unlock()
185188
klog.V(4).Infof("NodeUnpublishVolume called with args: %v", req)
189+
186190
// Validate Arguments
187191
targetPath := req.GetTargetPath()
188192
volID := req.GetVolumeId()
@@ -193,6 +197,11 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
193197
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
194198
}
195199

200+
if _, alreadyExists := ns.volumes.LoadOrStore(volID, true); alreadyExists {
201+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volID))
202+
}
203+
defer ns.volumes.Delete(volID)
204+
196205
err := mount.CleanupMountPoint(targetPath, ns.Mounter.Interface, false /* bind mount */)
197206
if err != nil {
198207
return nil, status.Error(codes.Internal, fmt.Sprintf("Unmount failed: %v\nUnmounting arguments: %s\n", err, targetPath))
@@ -202,8 +211,6 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
202211
}
203212

204213
func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
205-
ns.mux.Lock()
206-
defer ns.mux.Unlock()
207214
klog.V(4).Infof("NodeStageVolume called with req: %#v", req)
208215

209216
// Validate Arguments
@@ -220,6 +227,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
220227
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
221228
}
222229

230+
if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
231+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
232+
}
233+
defer ns.volumes.Delete(volumeID)
234+
223235
if err := validateVolumeCapability(volumeCapability); err != nil {
224236
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
225237
}
@@ -297,9 +309,8 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
297309
}
298310

299311
func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
300-
ns.mux.Lock()
301-
defer ns.mux.Unlock()
302312
klog.V(4).Infof("NodeUnstageVolume called with req: %#v", req)
313+
303314
// Validate arguments
304315
volumeID := req.GetVolumeId()
305316
stagingTargetPath := req.GetStagingTargetPath()
@@ -310,6 +321,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
310321
return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume Staging Target Path must be provided")
311322
}
312323

324+
if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
325+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
326+
}
327+
defer ns.volumes.Delete(volumeID)
328+
313329
err := mount.CleanupMountPoint(stagingTargetPath, ns.Mounter.Interface, false /* bind mount */)
314330
if err != nil {
315331
return nil, status.Error(codes.Internal, fmt.Sprintf("NodeUnstageVolume failed to unmount at path %s: %v", stagingTargetPath, err))

pkg/gce-pd-csi-driver/node_test.go

+88
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ func getTestGCEDriver(t *testing.T) *GCEDriver {
3838
return gceDriver
3939
}
4040

41+
func getTestBlockingGCEDriver(t *testing.T, mountToRun chan mountmanager.MountSourceAndTarget, readyToMount chan struct{}) *GCEDriver {
42+
gceDriver := GetGCEDriver()
43+
err := gceDriver.SetupGCEDriver(nil, mountmanager.NewFakeSafeBlockingMounter(mountToRun, readyToMount), mountmanager.NewFakeDeviceUtils(), metadataservice.NewFakeService(), driver, "test-vendor")
44+
if err != nil {
45+
t.Fatalf("Failed to setup GCE Driver: %v", err)
46+
}
47+
return gceDriver
48+
}
49+
4150
func TestNodeGetVolumeLimits(t *testing.T) {
4251

4352
gceDriver := getTestGCEDriver(t)
@@ -378,3 +387,82 @@ func TestNodeGetCapabilities(t *testing.T) {
378387
t.Fatalf("Unexpedted error: %v", err)
379388
}
380389
}
390+
391+
func TestConcurrentNodeOperations(t *testing.T) {
392+
mountToRun := make(chan mountmanager.MountSourceAndTarget, 3)
393+
readyToMount := make(chan struct{}, 2)
394+
reqFinished := make(chan error, 2)
395+
396+
gceDriver := getTestBlockingGCEDriver(t, mountToRun, readyToMount)
397+
ns := gceDriver.ns
398+
vol1PublishTargetAReq := &csi.NodePublishVolumeRequest{
399+
VolumeId: defaultVolumeID + "1",
400+
TargetPath: defaultTargetPath + "a",
401+
StagingTargetPath: defaultStagingPath + "1",
402+
Readonly: false,
403+
VolumeCapability: stdVolCap,
404+
}
405+
vol1PublishTargetBReq := &csi.NodePublishVolumeRequest{
406+
VolumeId: defaultVolumeID + "1",
407+
TargetPath: defaultTargetPath + "b",
408+
StagingTargetPath: defaultStagingPath + "1",
409+
Readonly: false,
410+
VolumeCapability: stdVolCap,
411+
}
412+
vol2PublishTargetCReq := &csi.NodePublishVolumeRequest{
413+
VolumeId: defaultVolumeID + "2",
414+
TargetPath: defaultTargetPath + "c",
415+
StagingTargetPath: defaultStagingPath + "2",
416+
Readonly: false,
417+
VolumeCapability: stdVolCap,
418+
}
419+
420+
runRequestInBackground := func(req *csi.NodePublishVolumeRequest) {
421+
_, err := ns.NodePublishVolume(context.Background(), req)
422+
reqFinished <- err
423+
}
424+
425+
// Start first valid request vol1PublishTargetAReq and block until it reaches the Mount
426+
go runRequestInBackground(vol1PublishTargetAReq)
427+
<-readyToMount
428+
429+
// Check that vol1PublishTargetBReq is rejected, due to same volume ID
430+
// Also allow vol1PublishTargetBReq to complete, in case it is allowed to Mount
431+
mountToRun <- mountmanager.MountSourceAndTarget{
432+
Source: vol1PublishTargetBReq.StagingTargetPath,
433+
Target: vol1PublishTargetBReq.TargetPath,
434+
}
435+
_, err := ns.NodePublishVolume(context.Background(), vol1PublishTargetBReq)
436+
if err != nil {
437+
serverError, ok := status.FromError(err)
438+
if !ok {
439+
t.Fatalf("Could not get error status code from err: %v", err)
440+
}
441+
if serverError.Code() != codes.Aborted {
442+
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
443+
}
444+
} else {
445+
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
446+
}
447+
448+
// Start second valid request vol2PublishTargetCReq
449+
go runRequestInBackground(vol2PublishTargetCReq)
450+
451+
// Allow the vol2PublishTargetCReq to complete, which it can concurrently with vol1PublishTargetAReq
452+
mountToRun <- mountmanager.MountSourceAndTarget{
453+
Source: vol2PublishTargetCReq.StagingTargetPath,
454+
Target: vol2PublishTargetCReq.TargetPath,
455+
}
456+
if err = <-reqFinished; err != nil {
457+
t.Fatalf("Unexpected error: %v", err)
458+
}
459+
460+
// To clean up, allow the vol1PublishTargetAReq to complete
461+
mountToRun <- mountmanager.MountSourceAndTarget{
462+
Source: vol1PublishTargetAReq.StagingTargetPath,
463+
Target: vol1PublishTargetAReq.TargetPath,
464+
}
465+
if err = <-reqFinished; err != nil {
466+
t.Fatalf("Unexpected error: %v", err)
467+
}
468+
}

pkg/mount-manager/fake-safe-mounter.go

+65-23
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,74 @@ package mountmanager
1616

1717
import "k8s.io/kubernetes/pkg/util/mount"
1818

19-
func NewFakeSafeMounter() *mount.SafeFormatAndMount {
20-
execCallback := func(cmd string, args ...string) ([]byte, error) {
21-
return nil, nil
22-
// TODO(#48): Fill out exec callback for errors
23-
/*
24-
if len(test.execScripts) <= execCallCount {
25-
t.Errorf("Unexpected command: %s %v", cmd, args)
26-
return nil, nil
27-
}
28-
script := test.execScripts[execCallCount]
29-
execCallCount++
30-
if script.command != cmd {
31-
t.Errorf("Unexpected command %s. Expecting %s", cmd, script.command)
32-
}
33-
for j := range args {
34-
if args[j] != script.args[j] {
35-
t.Errorf("Unexpected args %v. Expecting %v", args, script.args)
36-
}
19+
var (
20+
fakeMounter = &mount.FakeMounter{MountPoints: []mount.MountPoint{}, Log: []mount.FakeAction{}}
21+
fakeExec = mount.NewFakeExec(execCallback)
22+
)
23+
24+
func execCallback(cmd string, args ...string) ([]byte, error) {
25+
return nil, nil
26+
// TODO(#48): Fill out exec callback for errors
27+
/*
28+
if len(test.execScripts) <= execCallCount {
29+
t.Errorf("Unexpected command: %s %v", cmd, args)
30+
return nil, nil
31+
}
32+
script := test.execScripts[execCallCount]
33+
execCallCount++
34+
if script.command != cmd {
35+
t.Errorf("Unexpected command %s. Expecting %s", cmd, script.command)
36+
}
37+
for j := range args {
38+
if args[j] != script.args[j] {
39+
t.Errorf("Unexpected args %v. Expecting %v", args, script.args)
3740
}
38-
return []byte(script.output), script.err
39-
*/
40-
}
41-
fakeMounter := &mount.FakeMounter{MountPoints: []mount.MountPoint{}, Log: []mount.FakeAction{}}
42-
fakeExec := mount.NewFakeExec(execCallback)
41+
}
42+
return []byte(script.output), script.err
43+
*/
44+
}
45+
46+
func NewFakeSafeMounter() *mount.SafeFormatAndMount {
4347
return &mount.SafeFormatAndMount{
4448
Interface: fakeMounter,
4549
Exec: fakeExec,
4650
}
4751
}
52+
53+
type FakeBlockingMounter struct {
54+
*mount.FakeMounter
55+
mountToRun chan MountSourceAndTarget
56+
readyToMount chan struct{}
57+
}
58+
59+
type MountSourceAndTarget struct {
60+
Source string
61+
Target string
62+
}
63+
64+
// FakeBlockingMounter's method adds two channels to the Mount process in order to provide functionality to finely
65+
// control the order of execution of Mount calls. readToMount signals that a Mount operation has been called.
66+
// Then it cycles through the mountToRun channel, waiting for permission to actually make the mount operation.
67+
func (mounter *FakeBlockingMounter) Mount(source string, target string, fstype string, options []string) error {
68+
mounter.readyToMount <- struct{}{}
69+
for mountToRun := range mounter.mountToRun {
70+
if mountToRun.Source == source && mountToRun.Target == target {
71+
break
72+
} else {
73+
mounter.mountToRun <- mountToRun
74+
}
75+
}
76+
return mounter.FakeMounter.Mount(source, target, fstype, options)
77+
}
78+
79+
func NewFakeSafeBlockingMounter(mountToRun chan MountSourceAndTarget, readyToMount chan struct{}) *mount.SafeFormatAndMount {
80+
fakeBlockingMounter := &FakeBlockingMounter{
81+
FakeMounter: fakeMounter,
82+
mountToRun: mountToRun,
83+
readyToMount: readyToMount,
84+
}
85+
return &mount.SafeFormatAndMount{
86+
Interface: fakeBlockingMounter,
87+
Exec: fakeExec,
88+
}
89+
}

0 commit comments

Comments
 (0)