Skip to content

Commit 29311bd

Browse files
author
Hantao (Will) Wang
committed
use the volume locks for node operations too
1 parent 8f91f4e commit 29311bd

File tree

2 files changed

+13
-13
lines changed

2 files changed

+13
-13
lines changed

pkg/gce-pd-csi-driver/gce-pd-driver.go

+1
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
137137
Mounter: mounter,
138138
DeviceUtils: deviceUtils,
139139
MetadataService: meta,
140+
volumes: common.NewVolumeLocks(),
140141
}
141142
}
142143

pkg/gce-pd-csi-driver/node.go

+12-13
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"fmt"
1919
"os"
2020
"strings"
21-
"sync"
2221

2322
"context"
2423
csi "github.com/container-storage-interface/spec/lib/go/csi"
@@ -39,7 +38,7 @@ type GCENodeServer struct {
3938

4039
// A map storing all volumes with ongoing operations so that additional operations
4140
// for that same volume (as defined by VolumeID) return an Aborted error
42-
volumes sync.Map
41+
volumes *common.VolumeLocks
4342
}
4443

4544
var _ csi.NodeServer = &GCENodeServer{}
@@ -74,10 +73,10 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
7473
return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
7574
}
7675

77-
if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
76+
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
7877
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
7978
}
80-
defer ns.volumes.Delete(volumeID)
79+
defer ns.volumes.Release(volumeID)
8180

8281
if err := validateVolumeCapability(volumeCapability); err != nil {
8382
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
@@ -189,18 +188,18 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
189188

190189
// Validate Arguments
191190
targetPath := req.GetTargetPath()
192-
volID := req.GetVolumeId()
193-
if len(volID) == 0 {
191+
volumeID := req.GetVolumeId()
192+
if len(volumeID) == 0 {
194193
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Volume ID must be provided")
195194
}
196195
if len(targetPath) == 0 {
197196
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
198197
}
199198

200-
if _, alreadyExists := ns.volumes.LoadOrStore(volID, true); alreadyExists {
201-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volID))
199+
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
200+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
202201
}
203-
defer ns.volumes.Delete(volID)
202+
defer ns.volumes.Release(volumeID)
204203

205204
err := mount.CleanupMountPoint(targetPath, ns.Mounter.Interface, false /* bind mount */)
206205
if err != nil {
@@ -227,10 +226,10 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
227226
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume Volume Capability must be provided")
228227
}
229228

230-
if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
229+
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
231230
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
232231
}
233-
defer ns.volumes.Delete(volumeID)
232+
defer ns.volumes.Release(volumeID)
234233

235234
if err := validateVolumeCapability(volumeCapability); err != nil {
236235
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", err))
@@ -321,10 +320,10 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
321320
return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume Staging Target Path must be provided")
322321
}
323322

324-
if _, alreadyExists := ns.volumes.LoadOrStore(volumeID, true); alreadyExists {
323+
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
325324
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume ID %s already exists", volumeID))
326325
}
327-
defer ns.volumes.Delete(volumeID)
326+
defer ns.volumes.Release(volumeID)
328327

329328
err := mount.CleanupMountPoint(stagingTargetPath, ns.Mounter.Interface, false /* bind mount */)
330329
if err != nil {

0 commit comments

Comments
 (0)