
Commit 6b58903

Author: Hantao (Will) Wang (committed)

parallelize node operations by returning an error if the volume is in use
1 parent 6e8691f commit 6b58903

File tree

5 files changed: +184 −40 lines changed

deploy/kubernetes/overlays/dev/kustomization.yaml

+4 −4
@@ -2,11 +2,11 @@ apiVersion: kustomize.config.k8s.io/v1beta1
 kind: Kustomization
 bases:
 - ../alpha
-patches:
+patchesStrategicMerge:
 - controller_always_pull.yaml
 - node_always_pull.yaml
-images:
 # Replace this with your private image names and tags
+images:
 - name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
-  newName: gcr.io/REPLACEME/gcp-compute-persistent-disk-csi-driver
-  newTag: "latest"
+  newName: gcr.io/hww-gke-dev/gcp-pd-csi-driver-test
+  newTag: 13d60568-92cf-11e9-8b87-1ca0b875cd88

pkg/gce-pd-csi-driver/node.go

+24 −10
@@ -37,8 +37,10 @@ type GCENodeServer struct {
 	Mounter         *mount.SafeFormatAndMount
 	DeviceUtils     mountmanager.DeviceUtils
 	MetadataService metadataservice.MetadataService
-	// TODO: Only lock mutually exclusive calls and make locking more fine grained
-	mux sync.Mutex
+
+	// A map storing all volumes with ongoing operations so that additional operations
+	// for that same volume (as defined by VolumeID) return an Aborted error
+	volumes sync.Map
 }

 var _ csi.NodeServer = &GCENodeServer{}
@@ -52,8 +54,6 @@ const (
 )

 func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
-	ns.mux.Lock()
-	defer ns.mux.Unlock()
 	glog.V(4).Infof("NodePublishVolume called with req: %#v", req)

 	// Validate Arguments
@@ -75,6 +75,11 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
 		return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
 	}

+	if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
+		return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
+	}
+	defer ns.volumes.Delete(volumeID)
+
 	if err := validateVolumeCapability(volumeCapability); err != nil {
 		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
 	}
@@ -181,8 +186,6 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
 }

 func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
-	ns.mux.Lock()
-	defer ns.mux.Unlock()
 	glog.V(4).Infof("NodeUnpublishVolume called with args: %v", req)
 	// Validate Arguments
 	targetPath := req.GetTargetPath()
@@ -194,6 +197,11 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
 		return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
 	}

+	if _, alreadyExists := ns.volumes.LoadOrStore(volID, true); alreadyExists {
+		return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volID))
+	}
+	defer ns.volumes.Delete(volID)
+
 	err := volumeutils.UnmountMountPoint(targetPath, ns.Mounter.Interface, false /* bind mount */)
 	if err != nil {
 		return nil, status.Error(codes.Internal, fmt.Sprintf("Unmount failed: %v\nUnmounting arguments: %s\n", err, targetPath))
@@ -203,8 +211,6 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
 }

 func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
-	ns.mux.Lock()
-	defer ns.mux.Unlock()
 	glog.V(4).Infof("NodeStageVolume called with req: %#v", req)

 	// Validate Arguments
@@ -221,6 +227,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
 		return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
 	}

+	if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
+		return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
+	}
+	defer ns.volumes.Delete(volumeID)
+
 	if err := validateVolumeCapability(volumeCapability); err != nil {
 		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
 	}
@@ -298,8 +309,6 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
 }

 func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
-	ns.mux.Lock()
-	defer ns.mux.Unlock()
 	glog.V(4).Infof("NodeUnstageVolume called with req: %#v", req)
 	// Validate arguments
 	volumeID := req.GetVolumeId()
@@ -311,6 +320,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
 		return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume Staging Target Path must be provided")
 	}

+	if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
+		return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
+	}
+	defer ns.volumes.Delete(volumeID)
+
 	err := volumeutils.UnmountMountPoint(stagingTargetPath, ns.Mounter.Interface, false /* bind mount */)
 	if err != nil {
 		return nil, status.Error(codes.Internal, fmt.Sprintf("NodeUnstageVolume failed to unmount at path %s: %v", stagingTargetPath, err))
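
The guard added to each node method is the same small pattern: sync.Map.LoadOrStore claims the VolumeID (returning Aborted if another call already holds it) and a deferred Delete releases it when the operation finishes. A minimal standalone sketch of that idea is below; volumeLocks and withVolumeLock are illustrative names for this note, not part of the driver.

// Sketch of the per-volume guard used in node.go, outside the driver.
package main

import (
	"fmt"
	"sync"
	"time"
)

type volumeLocks struct {
	volumes sync.Map
}

// withVolumeLock runs op while holding the per-volume slot. If another
// operation already claimed the same volumeID, it returns an error
// immediately instead of blocking, mirroring the Aborted behavior above.
func (l *volumeLocks) withVolumeLock(volumeID string, op func() error) error {
	if _, alreadyExists := l.volumes.LoadOrStore(volumeID, true); alreadyExists {
		return fmt.Errorf("an operation with the given Volume ID %s already exists", volumeID)
	}
	defer l.volumes.Delete(volumeID)
	return op()
}

func main() {
	locks := &volumeLocks{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// All three requests target the same volume, so while the first
			// is still "mounting", the others typically get an error back.
			err := locks.withVolumeLock("vol-1", func() error {
				time.Sleep(10 * time.Millisecond) // simulate a slow mount
				return nil
			})
			fmt.Printf("request %d: err=%v\n", n, err)
		}(i)
	}
	wg.Wait()
}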

pkg/gce-pd-csi-driver/node_test.go

+88 −0
@@ -38,6 +38,15 @@ func getTestGCEDriver(t *testing.T) *GCEDriver {
 	return gceDriver
 }

+func getTestBlockingGCEDriver(t *testing.T, mountToRun chan mountmanager.MountSourceAndTarget, readyToMount chan struct{}) *GCEDriver {
+	gceDriver := GetGCEDriver()
+	err := gceDriver.SetupGCEDriver(nil, mountmanager.NewFakeSafeBlockingMounter(mountToRun, readyToMount), mountmanager.NewFakeDeviceUtils(), metadataservice.NewFakeService(), driver, "test-vendor")
+	if err != nil {
+		t.Fatalf("Failed to setup GCE Driver: %v", err)
+	}
+	return gceDriver
+}
+
 func TestNodeGetVolumeLimits(t *testing.T) {

 	gceDriver := getTestGCEDriver(t)
@@ -378,3 +387,82 @@ func TestNodeGetCapabilities(t *testing.T) {
 		t.Fatalf("Unexpedted error: %v", err)
 	}
 }
+
+func TestConcurrentNodeOperations(t *testing.T) {
+	mountToRun := make(chan mountmanager.MountSourceAndTarget, 3)
+	readyToMount := make(chan struct{}, 2)
+	reqFinished := make(chan error, 2)
+
+	gceDriver := getTestBlockingGCEDriver(t, mountToRun, readyToMount)
+	ns := gceDriver.ns
+	vol1PublishTargetAReq := &csi.NodePublishVolumeRequest{
+		VolumeId:          defaultVolumeID + "1",
+		TargetPath:        defaultTargetPath + "a",
+		StagingTargetPath: defaultStagingPath + "1",
+		Readonly:          false,
+		VolumeCapability:  stdVolCap,
+	}
+	vol1PublishTargetBReq := &csi.NodePublishVolumeRequest{
+		VolumeId:          defaultVolumeID + "1",
+		TargetPath:        defaultTargetPath + "b",
+		StagingTargetPath: defaultStagingPath + "1",
+		Readonly:          false,
+		VolumeCapability:  stdVolCap,
+	}
+	vol2PublishTargetCReq := &csi.NodePublishVolumeRequest{
+		VolumeId:          defaultVolumeID + "2",
+		TargetPath:        defaultTargetPath + "c",
+		StagingTargetPath: defaultStagingPath + "2",
+		Readonly:          false,
+		VolumeCapability:  stdVolCap,
+	}
+
+	runRequestInBackground := func(req *csi.NodePublishVolumeRequest) {
+		_, err := ns.NodePublishVolume(context.Background(), req)
+		reqFinished <- err
+	}
+
+	// Start first valid request vol1PublishTargetAReq and block until it reaches the Mount
+	go runRequestInBackground(vol1PublishTargetAReq)
+	<-readyToMount
+
+	// Check that vol1PublishTargetBReq is rejected, due to same volume ID
+	// Also allow vol1PublishTargetBReq to complete, in case it is allowed to Mount
+	mountToRun <- mountmanager.MountSourceAndTarget{
+		Source: vol1PublishTargetBReq.StagingTargetPath,
+		Target: vol1PublishTargetBReq.TargetPath,
+	}
+	_, err := ns.NodePublishVolume(context.Background(), vol1PublishTargetBReq)
+	if err != nil {
+		serverError, ok := status.FromError(err)
+		if !ok {
+			t.Fatalf("Could not get error status code from err: %v", err)
+		}
+		if serverError.Code() != codes.Aborted {
+			t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
+		}
+	} else {
+		t.Fatalf("Expected error: %v, got no error", codes.Aborted)
+	}
+
+	// Start second valid request vol2PublishTargetCReq
+	go runRequestInBackground(vol2PublishTargetCReq)
+
+	// Allow the vol2PublishTargetCReq to complete, which it can concurrently with vol1PublishTargetAReq
+	mountToRun <- mountmanager.MountSourceAndTarget{
+		Source: vol2PublishTargetCReq.StagingTargetPath,
+		Target: vol2PublishTargetCReq.TargetPath,
+	}
+	if err = <-reqFinished; err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	// To clean up, allow the vol1PublishTargetAReq to complete
+	mountToRun <- mountmanager.MountSourceAndTarget{
+		Source: vol1PublishTargetAReq.StagingTargetPath,
+		Target: vol1PublishTargetAReq.TargetPath,
+	}
+	if err = <-reqFinished; err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+}

pkg/mount-manager/fake-safe-mounter.go

+65 −23
@@ -16,32 +16,74 @@ package mountmanager

 import "k8s.io/kubernetes/pkg/util/mount"

-func NewFakeSafeMounter() *mount.SafeFormatAndMount {
-	execCallback := func(cmd string, args ...string) ([]byte, error) {
-		return nil, nil
-		// TODO(#48): Fill out exec callback for errors
-		/*
-			if len(test.execScripts) <= execCallCount {
-				t.Errorf("Unexpected command: %s %v", cmd, args)
-				return nil, nil
-			}
-			script := test.execScripts[execCallCount]
-			execCallCount++
-			if script.command != cmd {
-				t.Errorf("Unexpected command %s. Expecting %s", cmd, script.command)
-			}
-			for j := range args {
-				if args[j] != script.args[j] {
-					t.Errorf("Unexpected args %v. Expecting %v", args, script.args)
-				}
+var (
+	fakeMounter = &mount.FakeMounter{MountPoints: []mount.MountPoint{}, Log: []mount.FakeAction{}}
+	fakeExec    = mount.NewFakeExec(execCallback)
+)
+
+func execCallback(cmd string, args ...string) ([]byte, error) {
+	return nil, nil
+	// TODO(#48): Fill out exec callback for errors
+	/*
+		if len(test.execScripts) <= execCallCount {
+			t.Errorf("Unexpected command: %s %v", cmd, args)
+			return nil, nil
+		}
+		script := test.execScripts[execCallCount]
+		execCallCount++
+		if script.command != cmd {
+			t.Errorf("Unexpected command %s. Expecting %s", cmd, script.command)
+		}
+		for j := range args {
+			if args[j] != script.args[j] {
+				t.Errorf("Unexpected args %v. Expecting %v", args, script.args)
 			}
-			return []byte(script.output), script.err
-		*/
-	}
-	fakeMounter := &mount.FakeMounter{MountPoints: []mount.MountPoint{}, Log: []mount.FakeAction{}}
-	fakeExec := mount.NewFakeExec(execCallback)
+		}
+		return []byte(script.output), script.err
+	*/
+}
+
+func NewFakeSafeMounter() *mount.SafeFormatAndMount {
 	return &mount.SafeFormatAndMount{
 		Interface: fakeMounter,
 		Exec:      fakeExec,
 	}
 }
+
+type FakeBlockingMounter struct {
+	*mount.FakeMounter
+	mountToRun   chan MountSourceAndTarget
+	readyToMount chan struct{}
+}
+
+type MountSourceAndTarget struct {
+	Source string
+	Target string
+}
+
+// FakeBlockingMounter's Mount method adds two channels to the Mount process in order to provide functionality to finely
+// control the order of execution of Mount calls. readyToMount signals that a Mount operation has been called.
+// Then it cycles through the mountToRun channel, waiting for permission to actually make the mount operation.
+func (mounter *FakeBlockingMounter) Mount(source string, target string, fstype string, options []string) error {
+	mounter.readyToMount <- struct{}{}
+	for mountToRun := range mounter.mountToRun {
+		if mountToRun.Source == source && mountToRun.Target == target {
+			break
+		} else {
+			mounter.mountToRun <- mountToRun
+		}
+	}
+	return mounter.FakeMounter.Mount(source, target, fstype, options)
+}
+
+func NewFakeSafeBlockingMounter(mountToRun chan MountSourceAndTarget, readyToMount chan struct{}) *mount.SafeFormatAndMount {
+	fakeBlockingMounter := &FakeBlockingMounter{
+		FakeMounter:  fakeMounter,
+		mountToRun:   mountToRun,
+		readyToMount: readyToMount,
+	}
+	return &mount.SafeFormatAndMount{
+		Interface: fakeBlockingMounter,
+		Exec:      fakeExec,
+	}
+}
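
The blocking Mount override above is a channel handshake: it announces itself on readyToMount, then loops over mountToRun until the test hands it its own source/target pair, requeueing anything meant for another in-flight call. A minimal sketch of the same handshake with plain strings, independent of this package (blockingWorker and the token names are illustrative only):

// Sketch of the readyToMount / mountToRun handshake with plain strings.
package main

import (
	"fmt"
	"sync"
)

// blockingWorker signals on ready as soon as it is invoked, then loops over
// the work channel until it sees its own token, requeueing anything else so
// other goroutines can still be released. The controlling code therefore
// decides exactly which blocked call finishes, and in what order.
func blockingWorker(token string, work chan string, ready chan struct{}) {
	ready <- struct{}{}
	for w := range work {
		if w == token {
			break
		}
		work <- w // not ours; put it back for the other worker
	}
	fmt.Println("finished:", token)
}

func main() {
	work := make(chan string, 2)
	ready := make(chan struct{}, 2)
	var wg sync.WaitGroup

	for _, token := range []string{"mount-a", "mount-b"} {
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			blockingWorker(t, work, ready)
		}(token)
	}

	// Wait until both calls are blocked, then release them in a chosen order.
	<-ready
	<-ready
	work <- "mount-b"
	work <- "mount-a"
	wg.Wait()
}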

test/k8s-integration/main.go

+3 −3
@@ -82,9 +82,9 @@ func main() {
 		glog.Fatalf("deploy-overlay-name is a required flag")
 	}

-	if len(*storageClassFile) == 0 && !*migrationTest {
-		glog.Fatalf("One of storageclass-file and migration-test must be set")
-	}
+	//if len(*storageClassFile) == 0 && !*migrationTest {
+	//	glog.Fatalf("One of storageclass-file and migration-test must be set")
+	//}

 	if len(*storageClassFile) != 0 && *migrationTest {
 		glog.Fatalf("storage-class-file and migration-test cannot both be set")
