Skip to content

Commit e9dbd30

Browse files
author
Hantao (Will) Wang
committed
use volume lock instead of volume key everywhere
1 parent 2108d29 commit e9dbd30

File tree

6 files changed

+69
-36
lines changed

6 files changed

+69
-36
lines changed

pkg/common/utils.go

+11
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,17 @@ func VolumeIDToKey(id string) (*meta.Key, error) {
7373
}
7474
}
7575

76+
func KeyToVolumeID(volKey *meta.Key, project string) (string, error) {
77+
switch volKey.Type() {
78+
case meta.Zonal:
79+
return fmt.Sprintf(volIDZonalFmt, project, volKey.Zone, volKey.Zone), nil
80+
case meta.Regional:
81+
return fmt.Sprintf(volIDZonalFmt, project, volKey.Region, volKey.Zone), nil
82+
default:
83+
return "", fmt.Errorf("volume key %v neither zonal nor regional", volKey.Name)
84+
}
85+
}
86+
7687
func GenerateUnderspecifiedVolumeID(diskName string, isZonal bool) string {
7788
if isZonal {
7889
return fmt.Sprintf(volIDZonalFmt, UnspecifiedValue, UnspecifiedValue, diskName)

pkg/common/volume_lock.go

+16-8
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,39 @@ limitations under the License.
1616

1717
package common
1818

19-
import "sync"
19+
import (
20+
"sync"
21+
22+
"k8s.io/apimachinery/pkg/util/sets"
23+
)
24+
25+
const (
26+
VolumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
27+
)
2028

2129
type VolumeLocks struct {
22-
locks map[string]interface{}
30+
locks sets.String
2331
mux sync.Mutex
2432
}
2533

2634
func NewVolumeLocks() *VolumeLocks {
2735
return &VolumeLocks{
28-
locks: make(map[string]interface{}),
36+
locks: sets.NewString(),
2937
}
3038
}
3139

32-
func (vl *VolumeLocks) TryAcquire(lock string) bool {
40+
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
3341
vl.mux.Lock()
3442
defer vl.mux.Unlock()
35-
if _, exists := vl.locks[lock]; exists {
43+
if vl.locks.Has(volumeID) {
3644
return false
3745
}
38-
vl.locks[lock] = nil
46+
vl.locks.Insert(volumeID)
3947
return true
4048
}
4149

42-
func (vl *VolumeLocks) Release(lock string) {
50+
func (vl *VolumeLocks) Release(volumeID string) {
4351
vl.mux.Lock()
4452
defer vl.mux.Unlock()
45-
delete(vl.locks, lock)
53+
vl.locks.Delete(volumeID)
4654
}

pkg/gce-cloud-provider/compute/fake-gce.go

+4
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
6363
return fcp, nil
6464
}
6565

66+
func (cloud *FakeCloudProvider) GetProject() string {
67+
return cloud.project
68+
}
69+
6670
func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
6771
switch volumeKey.Type() {
6872
case meta.Zonal:

pkg/gce-cloud-provider/compute/gce-compute.go

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ type GCECompute interface {
6060
GetSnapshot(ctx context.Context, snapshotName string) (*compute.Snapshot, error)
6161
CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error)
6262
DeleteSnapshot(ctx context.Context, snapshotName string) error
63+
// Metadata
64+
GetProject() string
65+
}
66+
67+
func (cloud *CloudProvider) GetProject() string {
68+
return cloud.project
6369
}
6470

6571
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified

pkg/gce-pd-csi-driver/controller.go

+27-23
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
112112
// Resource names (e.g. "keyRings", "cryptoKeys", etc.) are case sensitive, so do not change case
113113
diskEncryptionKmsKey = v
114114
default:
115-
return nil, status.Errorf(codes.InvalidArgument,"CreateVolume invalid option %q", k)
115+
return nil, status.Errorf(codes.InvalidArgument, "CreateVolume invalid option %q", k)
116116
}
117117
}
118118
// Determine the zone or zones+region of the disk
@@ -122,10 +122,10 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
122122
case replicationTypeNone:
123123
zones, err = pickZones(gceCS, req.GetAccessibilityRequirements(), 1)
124124
if err != nil {
125-
return nil, status.Errorf(codes.InvalidArgument,"CreateVolume failed to pick zones for disk: %v", err)
125+
return nil, status.Errorf(codes.InvalidArgument, "CreateVolume failed to pick zones for disk: %v", err)
126126
}
127127
if len(zones) != 1 {
128-
return nil, status.Errorf(codes.Internal,"Failed to pick exactly 1 zone for zonal disk, got %v instead", len(zones))
128+
return nil, status.Errorf(codes.Internal, "Failed to pick exactly 1 zone for zonal disk, got %v instead", len(zones))
129129
}
130130
volKey = meta.ZonalKey(name, zones[0])
131131

@@ -143,10 +143,14 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
143143
return nil, status.Errorf(codes.InvalidArgument, "CreateVolume replication type '%s' is not supported", replicationType)
144144
}
145145

146-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
147-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume Key %s already exists", volKey.String())
146+
volumeID, err := common.KeyToVolumeID(volKey, gceCS.CloudProvider.GetProject())
147+
if err != nil {
148+
return nil, status.Errorf(codes.Internal, "Failed to convert volume key to volume ID: %v", err)
149+
}
150+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
151+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
148152
}
149-
defer gceCS.volumes.Release(volKey.String())
153+
defer gceCS.volumes.Release(volumeID)
150154

151155
// Validate if disk already exists
152156
existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, volKey)
@@ -231,10 +235,10 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
231235
return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volumeID, err)
232236
}
233237

234-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
235-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume Key %s already exists", volKey.String())
238+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
239+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
236240
}
237-
defer gceCS.volumes.Release(volKey.String())
241+
defer gceCS.volumes.Release(volumeID)
238242

239243
err = gceCS.CloudProvider.DeleteDisk(ctx, volKey)
240244
if err != nil {
@@ -272,11 +276,11 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
272276
return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volumeID, err)
273277
}
274278

275-
volKeyStr := volKey.String()
276-
if acquired := gceCS.volumes.TryAcquire(nodeID + "/" + volKeyStr); !acquired {
277-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume Key %s or Node ID %s already exists", volKeyStr, nodeID)
279+
lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
280+
if acquired := gceCS.volumes.TryAcquire(lockingVolumeID); !acquired {
281+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
278282
}
279-
defer gceCS.volumes.Release(nodeID + "/" + volKeyStr)
283+
defer gceCS.volumes.Release(lockingVolumeID)
280284

281285
// TODO(#253): Check volume capability matches for ALREADY_EXISTS
282286
if err = validateVolumeCapability(volumeCapability); err != nil {
@@ -363,11 +367,11 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
363367
return nil, err
364368
}
365369

366-
volKeyStr := volKey.String()
367-
if acquired := gceCS.volumes.TryAcquire(nodeID + "/" + volKeyStr); !acquired {
368-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume Key %s or Node ID %s already exists", volKeyStr, nodeID)
370+
lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
371+
if acquired := gceCS.volumes.TryAcquire(lockingVolumeID); !acquired {
372+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
369373
}
370-
defer gceCS.volumes.Release(nodeID + "/" + volKeyStr)
374+
defer gceCS.volumes.Release(lockingVolumeID)
371375

372376
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
373377
if err != nil {
@@ -416,10 +420,10 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
416420
return nil, status.Errorf(codes.NotFound, "Volume ID is of improper format, got %v", volumeID)
417421
}
418422

419-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
420-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume Key %s already exists", volKey.String())
423+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
424+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
421425
}
422-
defer gceCS.volumes.Release(volKey.String())
426+
defer gceCS.volumes.Release(volumeID)
423427

424428
_, err = gceCS.CloudProvider.GetDisk(ctx, volKey)
425429
if err != nil {
@@ -528,10 +532,10 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C
528532
return nil, status.Errorf(codes.NotFound, "Could not find volume with ID %v: %v", volumeID, err)
529533
}
530534

531-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
532-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume Key %s already exists", volKey.String())
535+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
536+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
533537
}
534-
defer gceCS.volumes.Release(volKey.String())
538+
defer gceCS.volumes.Release(volumeID)
535539

536540
// Check if snapshot already exists
537541
var snapshot *compute.Snapshot

pkg/gce-pd-csi-driver/node.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePub
7474
}
7575

7676
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
77-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume ID %s already exists", volumeID)
77+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
7878
}
7979
defer ns.volumes.Release(volumeID)
8080

@@ -197,7 +197,7 @@ func (ns *GCENodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
197197
}
198198

199199
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
200-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume ID %s already exists", volumeID)
200+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
201201
}
202202
defer ns.volumes.Release(volumeID)
203203

@@ -227,7 +227,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
227227
}
228228

229229
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
230-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume ID %s already exists", volumeID)
230+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
231231
}
232232
defer ns.volumes.Release(volumeID)
233233

@@ -301,7 +301,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
301301
if err != nil {
302302
return nil, status.Errorf(codes.Internal,
303303
"Failed to format and mount device from (%q) to (%q) with fstype (%q) and options (%q): %v",
304-
devicePath, stagingTargetPath, fstype, options, err)
304+
devicePath, stagingTargetPath, fstype, options, err)
305305
}
306306

307307
return &csi.NodeStageVolumeResponse{}, nil
@@ -321,7 +321,7 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
321321
}
322322

323323
if acquired := ns.volumes.TryAcquire(volumeID); !acquired {
324-
return nil, status.Errorf(codes.Aborted, "An operation with the given Volume ID %s already exists", volumeID)
324+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
325325
}
326326
defer ns.volumes.Release(volumeID)
327327

0 commit comments

Comments
 (0)