Skip to content

Commit f478fad

Browse files
author
Hantao (Will) Wang
committed
use volume lock instead of volume key everywhere
1 parent 658896f commit f478fad

File tree

6 files changed

+62
-29
lines changed

6 files changed

+62
-29
lines changed

pkg/common/utils.go

+11
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,17 @@ func VolumeIDToKey(id string) (*meta.Key, error) {
7373
}
7474
}
7575

76+
func KeyToVolumeID(volKey *meta.Key, project string) (string, error) {
77+
switch volKey.Type() {
78+
case meta.Zonal:
79+
return fmt.Sprintf(volIDZonalFmt, project, volKey.Zone, volKey.Zone), nil
80+
case meta.Regional:
81+
return fmt.Sprintf(volIDZonalFmt, project, volKey.Region, volKey.Zone), nil
82+
default:
83+
return "", fmt.Errorf("volume key %v neither zonal nor regional", volKey.Name)
84+
}
85+
}
86+
7687
func GenerateUnderspecifiedVolumeID(diskName string, isZonal bool) string {
7788
if isZonal {
7889
return fmt.Sprintf(volIDZonalFmt, UnspecifiedValue, UnspecifiedValue, diskName)

pkg/common/volume_lock.go

+16-8
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,39 @@ limitations under the License.
1616

1717
package common
1818

19-
import "sync"
19+
import (
20+
"sync"
21+
22+
"k8s.io/apimachinery/pkg/util/sets"
23+
)
24+
25+
const (
26+
VolumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
27+
)
2028

2129
type VolumeLocks struct {
22-
locks map[string]interface{}
30+
locks sets.String
2331
mux sync.Mutex
2432
}
2533

2634
func NewVolumeLocks() *VolumeLocks {
2735
return &VolumeLocks{
28-
locks: make(map[string]interface{}),
36+
locks: sets.NewString(),
2937
}
3038
}
3139

32-
func (vl *VolumeLocks) TryAcquire(lock string) bool {
40+
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
3341
vl.mux.Lock()
3442
defer vl.mux.Unlock()
35-
if _, exists := vl.locks[lock]; exists {
43+
if vl.locks.Has(volumeID) {
3644
return false
3745
}
38-
vl.locks[lock] = nil
46+
vl.locks.Insert(volumeID)
3947
return true
4048
}
4149

42-
func (vl *VolumeLocks) Release(lock string) {
50+
func (vl *VolumeLocks) Release(volumeID string) {
4351
vl.mux.Lock()
4452
defer vl.mux.Unlock()
45-
delete(vl.locks, lock)
53+
vl.locks.Delete(volumeID)
4654
}

pkg/gce-cloud-provider/compute/fake-gce.go

+4
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
6363
return fcp, nil
6464
}
6565

66+
func (cloud *FakeCloudProvider) GetProject() string {
67+
return cloud.project
68+
}
69+
6670
func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
6771
switch volumeKey.Type() {
6872
case meta.Zonal:

pkg/gce-cloud-provider/compute/gce-compute.go

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ type GCECompute interface {
6060
GetSnapshot(ctx context.Context, snapshotName string) (*compute.Snapshot, error)
6161
CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error)
6262
DeleteSnapshot(ctx context.Context, snapshotName string) error
63+
// Metadata
64+
GetProject() string
65+
}
66+
67+
func (cloud *CloudProvider) GetProject() string {
68+
return cloud.project
6369
}
6470

6571
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified

pkg/gce-pd-csi-driver/controller.go

+24-20
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,14 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
143143
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", replicationType))
144144
}
145145

146-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
147-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
146+
volumeID, err := common.KeyToVolumeID(volKey, gceCS.CloudProvider.GetProject())
147+
if err != nil {
148+
return nil, status.Errorf(codes.Internal, "Failed to convert volume key to volume ID: %v", err)
149+
}
150+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
151+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
148152
}
149-
defer gceCS.volumes.Release(volKey.String())
153+
defer gceCS.volumes.Release(volumeID)
150154

151155
// Validate if disk already exists
152156
existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, volKey)
@@ -231,10 +235,10 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
231235
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
232236
}
233237

234-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
235-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
238+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
239+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
236240
}
237-
defer gceCS.volumes.Release(volKey.String())
241+
defer gceCS.volumes.Release(volumeID)
238242

239243
err = gceCS.CloudProvider.DeleteDisk(ctx, volKey)
240244
if err != nil {
@@ -272,11 +276,11 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
272276
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
273277
}
274278

275-
volKeyStr := volKey.String()
276-
if acquired := gceCS.volumes.TryAcquire(nodeID + "/" + volKeyStr); !acquired {
277-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s or Node ID %s already exists", volKeyStr, nodeID))
279+
lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
280+
if acquired := gceCS.volumes.TryAcquire(lockingVolumeID); !acquired {
281+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
278282
}
279-
defer gceCS.volumes.Release(nodeID + "/" + volKeyStr)
283+
defer gceCS.volumes.Release(lockingVolumeID)
280284

281285
// TODO(#253): Check volume capability matches for ALREADY_EXISTS
282286
if err = validateVolumeCapability(volumeCapability); err != nil {
@@ -363,11 +367,11 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
363367
return nil, err
364368
}
365369

366-
volKeyStr := volKey.String()
367-
if acquired := gceCS.volumes.TryAcquire(nodeID + "/" + volKeyStr); !acquired {
368-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s or Node ID %s already exists", volKeyStr, nodeID))
370+
lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
371+
if acquired := gceCS.volumes.TryAcquire(lockingVolumeID); !acquired {
372+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
369373
}
370-
defer gceCS.volumes.Release(nodeID + "/" + volKeyStr)
374+
defer gceCS.volumes.Release(lockingVolumeID)
371375

372376
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
373377
if err != nil {
@@ -416,10 +420,10 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
416420
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume ID is of improper format, got %v", volumeID))
417421
}
418422

419-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
420-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
423+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
424+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
421425
}
422-
defer gceCS.volumes.Release(volKey.String())
426+
defer gceCS.volumes.Release(volumeID)
423427

424428
_, err = gceCS.CloudProvider.GetDisk(ctx, volKey)
425429
if err != nil {
@@ -528,10 +532,10 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C
528532
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
529533
}
530534

531-
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
532-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
535+
if acquired := gceCS.volumes.TryAcquire(volumeID); !acquired {
536+
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
533537
}
534-
defer gceCS.volumes.Release(volKey.String())
538+
defer gceCS.volumes.Release(volumeID)
535539

536540
// Check if snapshot already exists
537541
var snapshot *compute.Snapshot

pkg/gce-pd-csi-driver/node.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
301301
if err != nil {
302302
return nil, status.Error(codes.Internal,
303303
fmt.Sprintf("Failed to format and mount device from (%q) to (%q) with fstype (%q) and options (%q): %v",
304-
devicePath, stagingTargetPath, fstype, options, err))
304+
devicePath, stagingTargetPath, fstype, options, err))e
305305
}
306306

307307
return &csi.NodeStageVolumeResponse{}, nil

0 commit comments

Comments
 (0)