@@ -37,8 +37,10 @@ type GCENodeServer struct {
37
37
Mounter * mount.SafeFormatAndMount
38
38
DeviceUtils mountmanager.DeviceUtils
39
39
MetadataService metadataservice.MetadataService
40
- // TODO: Only lock mutually exclusive calls and make locking more fine grained
41
- mux sync.Mutex
40
+
41
+ // A map storing all volumes with ongoing operations so that additional operations
42
+ // for that same volume (as defined by VolumeID) return an Aborted error
43
+ volumes sync.Map
42
44
}
43
45
44
46
var _ csi.NodeServer = & GCENodeServer {}
@@ -52,8 +54,6 @@ const (
52
54
)
53
55
54
56
func (ns * GCENodeServer ) NodePublishVolume (ctx context.Context , req * csi.NodePublishVolumeRequest ) (* csi.NodePublishVolumeResponse , error ) {
55
- ns .mux .Lock ()
56
- defer ns .mux .Unlock ()
57
57
glog .V (4 ).Infof ("NodePublishVolume called with req: %#v" , req )
58
58
59
59
// Validate Arguments
@@ -75,6 +75,11 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
75
75
return nil , status .Error (codes .InvalidArgument , "NodePublishVolume Volume Capability must be provided" )
76
76
}
77
77
78
+ if _ , alreadyExists := ns .volumes .LoadOrStore (volumeID , true ); alreadyExists {
79
+ return nil , status .Error (codes .Aborted , fmt .Sprintf ("An operation with the given Volume ID %s already exists" , volumeID ))
80
+ }
81
+ defer ns .volumes .Delete (volumeID )
82
+
78
83
if err := validateVolumeCapability (volumeCapability ); err != nil {
79
84
return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("VolumeCapability is invalid: %v" , err ))
80
85
}
@@ -181,8 +186,6 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
181
186
}
182
187
183
188
func (ns * GCENodeServer ) NodeUnpublishVolume (ctx context.Context , req * csi.NodeUnpublishVolumeRequest ) (* csi.NodeUnpublishVolumeResponse , error ) {
184
- ns .mux .Lock ()
185
- defer ns .mux .Unlock ()
186
189
glog .V (4 ).Infof ("NodeUnpublishVolume called with args: %v" , req )
187
190
// Validate Arguments
188
191
targetPath := req .GetTargetPath ()
@@ -194,6 +197,11 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
194
197
return nil , status .Error (codes .InvalidArgument , "NodeUnpublishVolume Target Path must be provided" )
195
198
}
196
199
200
+ if _ , alreadyExists := ns .volumes .LoadOrStore (volID , true ); alreadyExists {
201
+ return nil , status .Error (codes .Aborted , fmt .Sprintf ("An operation with the given Volume ID %s already exists" , volID ))
202
+ }
203
+ defer ns .volumes .Delete (volID )
204
+
197
205
err := volumeutils .UnmountMountPoint (targetPath , ns .Mounter .Interface , false /* bind mount */ )
198
206
if err != nil {
199
207
return nil , status .Error (codes .Internal , fmt .Sprintf ("Unmount failed: %v\n Unmounting arguments: %s\n " , err , targetPath ))
@@ -203,8 +211,6 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
203
211
}
204
212
205
213
func (ns * GCENodeServer ) NodeStageVolume (ctx context.Context , req * csi.NodeStageVolumeRequest ) (* csi.NodeStageVolumeResponse , error ) {
206
- ns .mux .Lock ()
207
- defer ns .mux .Unlock ()
208
214
glog .V (4 ).Infof ("NodeStageVolume called with req: %#v" , req )
209
215
210
216
// Validate Arguments
@@ -221,6 +227,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
221
227
return nil , status .Error (codes .InvalidArgument , "NodeStageVolume Volume Capability must be provided" )
222
228
}
223
229
230
+ if _ , alreadyExists := ns .volumes .LoadOrStore (volumeID , true ); alreadyExists {
231
+ return nil , status .Error (codes .Aborted , fmt .Sprintf ("An operation with the given Volume ID %s already exists" , volumeID ))
232
+ }
233
+ defer ns .volumes .Delete (volumeID )
234
+
224
235
if err := validateVolumeCapability (volumeCapability ); err != nil {
225
236
return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("VolumeCapability is invalid: %v" , err ))
226
237
}
@@ -298,8 +309,6 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
298
309
}
299
310
300
311
func (ns * GCENodeServer ) NodeUnstageVolume (ctx context.Context , req * csi.NodeUnstageVolumeRequest ) (* csi.NodeUnstageVolumeResponse , error ) {
301
- ns .mux .Lock ()
302
- defer ns .mux .Unlock ()
303
312
glog .V (4 ).Infof ("NodeUnstageVolume called with req: %#v" , req )
304
313
// Validate arguments
305
314
volumeID := req .GetVolumeId ()
@@ -311,6 +320,11 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
311
320
return nil , status .Error (codes .InvalidArgument , "NodeUnstageVolume Staging Target Path must be provided" )
312
321
}
313
322
323
+ if _ , alreadyExists := ns .volumes .LoadOrStore (volumeID , true ); alreadyExists {
324
+ return nil , status .Error (codes .Aborted , fmt .Sprintf ("An operation with the given Volume ID %s already exists" , volumeID ))
325
+ }
326
+ defer ns .volumes .Delete (volumeID )
327
+
314
328
err := volumeutils .UnmountMountPoint (stagingTargetPath , ns .Mounter .Interface , false /* bind mount */ )
315
329
if err != nil {
316
330
return nil , status .Error (codes .Internal , fmt .Sprintf ("NodeUnstageVolume failed to unmount at path %s: %v" , stagingTargetPath , err ))
0 commit comments