Commit 41f103f

Merge pull request kubernetes-sigs#316 from hantaowang/seralize-attach-ops
Add volume level serialization for controller operations
2 parents 965ba6a + 53a286c

11 files changed: +291 −91 lines

pkg/common/utils.go (+11)

@@ -73,6 +73,17 @@ func VolumeIDToKey(id string) (*meta.Key, error) {
 	}
 }
 
+func KeyToVolumeID(volKey *meta.Key, project string) (string, error) {
+	switch volKey.Type() {
+	case meta.Zonal:
+		return fmt.Sprintf(volIDZonalFmt, project, volKey.Zone, volKey.Name), nil
+	case meta.Regional:
+		return fmt.Sprintf(volIDRegionalFmt, project, volKey.Region, volKey.Name), nil
+	default:
+		return "", fmt.Errorf("volume key %v neither zonal nor regional", volKey.Name)
+	}
+}
+
 func GenerateUnderspecifiedVolumeID(diskName string, isZonal bool) string {
 	if isZonal {
 		return fmt.Sprintf(volIDZonalFmt, UnspecifiedValue, UnspecifiedValue, diskName)
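
Note: KeyToVolumeID is the inverse of the existing VolumeIDToKey. A minimal sketch of the round trip, assuming volIDZonalFmt and volIDRegionalFmt expand to the usual "projects/{project}/zones/{zone}/disks/{name}" and "projects/{project}/regions/{region}/disks/{name}" formats; the project and disk names below are made up:

    package main

    import (
    	"fmt"

    	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
    	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
    )

    func main() {
    	// Zonal key -> "projects/my-project/zones/us-central1-a/disks/my-pd"
    	id, err := common.KeyToVolumeID(meta.ZonalKey("my-pd", "us-central1-a"), "my-project")
    	fmt.Println(id, err)

    	// Regional key -> "projects/my-project/regions/us-central1/disks/my-pd"
    	id, err = common.KeyToVolumeID(meta.RegionalKey("my-pd", "us-central1"), "my-project")
    	fmt.Println(id, err)
    }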

pkg/common/volume_lock.go (+58, new file)

@@ -0,0 +1,58 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package common
+
+import (
+	"sync"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+)
+
+const (
+	VolumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
+)
+
+// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
+// with an ongoing operation.
+type VolumeLocks struct {
+	locks sets.String
+	mux   sync.Mutex
+}
+
+func NewVolumeLocks() *VolumeLocks {
+	return &VolumeLocks{
+		locks: sets.NewString(),
+	}
+}
+
+// TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
+// If another operation is already using volumeID, returns false.
+func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
+	vl.mux.Lock()
+	defer vl.mux.Unlock()
+	if vl.locks.Has(volumeID) {
+		return false
+	}
+	vl.locks.Insert(volumeID)
+	return true
+}
+
+func (vl *VolumeLocks) Release(volumeID string) {
+	vl.mux.Lock()
+	defer vl.mux.Unlock()
+	vl.locks.Delete(volumeID)
+}
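
Note: the intended usage pattern, which every controller call site below follows, is TryAcquire at the top of the RPC and a deferred Release. A minimal sketch with a hypothetical server type (exampleServer and doVolumeOp are illustrative, not part of this commit):

    package example

    import (
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"

    	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
    )

    // exampleServer stands in for GCEControllerServer; illustration only.
    type exampleServer struct {
    	volumeLocks *common.VolumeLocks
    }

    func (s *exampleServer) doVolumeOp(volumeID string) error {
    	if acquired := s.volumeLocks.TryAcquire(volumeID); !acquired {
    		// Another operation on this volume is in flight; CSI prescribes
    		// Aborted for a pending operation, so the caller retries later.
    		return status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
    	}
    	// The deferred Release runs even on error paths, so the lock cannot leak.
    	defer s.volumeLocks.Release(volumeID)

    	// ... perform the cloud operation for volumeID ...
    	return nil
    }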

pkg/gce-cloud-provider/compute/fake-gce.go (+17 −2)

@@ -49,7 +49,7 @@ type FakeCloudProvider struct {
 
 var _ GCECompute = &FakeCloudProvider{}
 
-func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
+func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
 	fcp := &FakeCloudProvider{
 		project: project,
 		zone:    zone,
@@ -61,7 +61,6 @@ func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
 		fcp.disks[d.GetName()] = d
 	}
 	return fcp, nil
-
 }
 
 func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
@@ -422,6 +421,22 @@ func (cloud *FakeCloudProvider) getGlobalSnapshotURI(snapshotName string) string
 		snapshotName)
 }
 
+type FakeBlockingCloudProvider struct {
+	*FakeCloudProvider
+	ReadyToExecute chan chan struct{}
+}
+
+// FakeBlockingCloudProvider's CreateSnapshot adds the ability to finely control the order of
+// execution of concurrent CreateSnapshot calls. Upon starting a CreateSnapshot, it passes a
+// chan 'executeCreateSnapshot' into ReadyToExecute, then blocks on executeCreateSnapshot.
+// The calling test can receive from ReadyToExecute to confirm that the operation has started,
+// and can then allow the CreateSnapshot to continue by sending a struct into executeCreateSnapshot.
+func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error) {
+	executeCreateSnapshot := make(chan struct{})
+	cloud.ReadyToExecute <- executeCreateSnapshot
+	<-executeCreateSnapshot
+	return cloud.FakeCloudProvider.CreateSnapshot(ctx, volKey, snapshotName)
+}
+
 func notFoundError() *googleapi.Error {
 	return &googleapi.Error{
 		Errors: []googleapi.ErrorItem{
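
Note: the chan chan struct{} handshake used above is a general pattern: the blocked worker announces itself by sending a private release channel, and the coordinator decides when to let it proceed. A self-contained sketch (names are illustrative):

    package main

    import "fmt"

    func main() {
    	readyToExecute := make(chan chan struct{}, 1)
    	done := make(chan struct{})

    	// Worker: announce a private release channel, then block until released.
    	go func() {
    		release := make(chan struct{})
    		readyToExecute <- release
    		<-release // blocks here until the coordinator signals
    		fmt.Println("operation ran")
    		close(done)
    	}()

    	// Coordinator: receiving the channel proves the worker has started;
    	// sending on it unblocks the worker at a moment of our choosing.
    	release := <-readyToExecute
    	release <- struct{}{}
    	<-done
    }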

pkg/gce-pd-csi-driver/controller.go (+45)

@@ -42,6 +42,10 @@ type GCEControllerServer struct {
 	Driver          *GCEDriver
 	CloudProvider   gce.GCECompute
 	MetadataService metadataservice.MetadataService
+
+	// A map storing all volumes with ongoing operations so that additional operations
+	// for that same volume (as defined by Volume Key) return an Aborted error
+	volumeLocks *common.VolumeLocks
 }
 
 var _ csi.ControllerServer = &GCEControllerServer{}
@@ -139,6 +143,15 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
 		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", replicationType))
 	}
 
+	volumeID, err := common.KeyToVolumeID(volKey, gceCS.MetadataService.GetProject())
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "Failed to convert volume key to volume ID: %v", err)
+	}
+	if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
+		return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
+	}
+	defer gceCS.volumeLocks.Release(volumeID)
+
 	// Validate if disk already exists
 	existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, volKey)
 	if err != nil {
@@ -222,6 +235,11 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
 	}
 
+	if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
+		return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
+	}
+	defer gceCS.volumeLocks.Release(volumeID)
+
 	err = gceCS.CloudProvider.DeleteDisk(ctx, volKey)
 	if err != nil {
 		return nil, status.Error(codes.Internal, fmt.Sprintf("unknown Delete disk error: %v", err))
@@ -258,6 +276,14 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
 	}
 
+	// Acquires the lock for the volume on that node only, because we need to support the ability
+	// to publish the same volume onto different nodes concurrently
+	lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
+	if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
+		return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
+	}
+	defer gceCS.volumeLocks.Release(lockingVolumeID)
+
 	// TODO(#253): Check volume capability matches for ALREADY_EXISTS
 	if err = validateVolumeCapability(volumeCapability); err != nil {
 		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err))
@@ -343,6 +369,14 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
 		return nil, err
 	}
 
+	// Acquires the lock for the volume on that node only, because we need to support the ability
+	// to unpublish the same volume from different nodes concurrently
+	lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
+	if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
+		return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
+	}
+	defer gceCS.volumeLocks.Release(lockingVolumeID)
+
 	instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
 	if err != nil {
 		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))
@@ -389,6 +423,12 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
 	if err != nil {
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume ID is of improper format, got %v", volumeID))
 	}
+
+	if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
+		return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
+	}
+	defer gceCS.volumeLocks.Release(volumeID)
+
 	_, err = gceCS.CloudProvider.GetDisk(ctx, volKey)
 	if err != nil {
 		if gce.IsGCEError(err, "notFound") {
@@ -496,6 +536,11 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C
 		return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
 	}
 
+	if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
+		return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
+	}
+	defer gceCS.volumeLocks.Release(volumeID)
+
 	// Check if snapshot already exists
 	var snapshot *compute.Snapshot
 	snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, req.Name)
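
Note: publish and unpublish deliberately lock on a composite nodeID/volumeID key rather than the bare volume ID, so the same volume can be attached to or detached from different nodes concurrently while duplicate operations against the same (node, volume) pair still abort. A small sketch with made-up IDs:

    package main

    import "fmt"

    func main() {
    	// Hypothetical IDs; real values arrive in the CSI requests.
    	volumeID := "projects/my-project/zones/us-central1-a/disks/my-pd"
    	nodeA := "my-project/us-central1-a/instance-a"
    	nodeB := "my-project/us-central1-a/instance-b"

    	lockKeyA := fmt.Sprintf("%s/%s", nodeA, volumeID)
    	lockKeyB := fmt.Sprintf("%s/%s", nodeB, volumeID)
    	fmt.Println(lockKeyA != lockKeyB) // true: the two attaches do not contend
    }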

pkg/gce-pd-csi-driver/controller_test.go (+67)

@@ -1413,3 +1413,70 @@ func TestPickRandAndConsecutive(t *testing.T) {
 
 	}
 }
+
+func TestVolumeOperationConcurrency(t *testing.T) {
+	readyToExecute := make(chan chan struct{}, 1)
+	gceDriver := initBlockingGCEDriver(t, nil, readyToExecute)
+	cs := gceDriver.cs
+
+	vol1CreateSnapshotAReq := &csi.CreateSnapshotRequest{
+		Name:           name + "1A",
+		SourceVolumeId: testVolumeId + "1",
+	}
+	vol1CreateSnapshotBReq := &csi.CreateSnapshotRequest{
+		Name:           name + "1B",
+		SourceVolumeId: testVolumeId + "1",
+	}
+	vol2CreateSnapshotReq := &csi.CreateSnapshotRequest{
+		Name:           name + "2",
+		SourceVolumeId: testVolumeId + "2",
+	}
+
+	runRequest := func(req *csi.CreateSnapshotRequest) <-chan error {
+		response := make(chan error)
+		go func() {
+			_, err := cs.CreateSnapshot(context.Background(), req)
+			response <- err
+		}()
+		return response
+	}
+
+	// Start the first valid request, vol1CreateSnapshotA, and block until it reaches the
+	// blocking fake's CreateSnapshot
+	vol1CreateSnapshotAResp := runRequest(vol1CreateSnapshotAReq)
+	execVol1CreateSnapshotA := <-readyToExecute
+
+	// Start vol1CreateSnapshotB; it should fail fast with an Aborted error because
+	// vol1CreateSnapshotA still holds the lock for the same volume. If a non-Aborted error
+	// is received, or if the operation actually starts executing, volume locking is broken.
+	vol1CreateSnapshotBResp := runRequest(vol1CreateSnapshotBReq)
+	select {
+	case err := <-vol1CreateSnapshotBResp:
+		if err != nil {
+			serverError, ok := status.FromError(err)
+			if !ok {
+				t.Fatalf("Could not get error status code from err: %v", err)
+			}
+			if serverError.Code() != codes.Aborted {
+				t.Errorf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
+			}
+		} else {
+			t.Errorf("Expected error: %v, got no error", codes.Aborted)
+		}
+	case <-readyToExecute:
+		t.Errorf("The operation for vol1CreateSnapshotB should have been aborted, but was started")
+	}
+
+	// Start vol2CreateSnapshot (a different volume) and allow it to execute to completion.
+	// Then check for success.
+	vol2CreateSnapshotResp := runRequest(vol2CreateSnapshotReq)
+	execVol2CreateSnapshot := <-readyToExecute
+	execVol2CreateSnapshot <- struct{}{}
+	if err := <-vol2CreateSnapshotResp; err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	// To clean up, allow vol1CreateSnapshotA to complete
+	execVol1CreateSnapshotA <- struct{}{}
+	if err := <-vol1CreateSnapshotAResp; err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+}

pkg/gce-pd-csi-driver/gce-pd-driver.go (+3)

@@ -22,6 +22,7 @@ import (
 	"google.golang.org/grpc/status"
 	"k8s.io/klog"
 	"k8s.io/kubernetes/pkg/util/mount"
+	common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
 	metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
 	mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
@@ -136,6 +137,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
 		Mounter:         mounter,
 		DeviceUtils:     deviceUtils,
 		MetadataService: meta,
+		volumeLocks:     common.NewVolumeLocks(),
 	}
 }
 
@@ -144,6 +146,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, met
 		Driver:          gceDriver,
 		CloudProvider:   cloudProvider,
 		MetadataService: meta,
+		volumeLocks:     common.NewVolumeLocks(),
 	}
 }

pkg/gce-pd-csi-driver/gce-pd-driver_test.go (+20 −4)

@@ -22,13 +22,29 @@ import (
 )
 
 func initGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk) *GCEDriver {
-	vendorVersion := "test-vendor"
-	gceDriver := GetGCEDriver()
-	fakeCloudProvider, err := gce.FakeCreateCloudProvider(project, zone, cloudDisks)
+	fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
+	if err != nil {
+		t.Fatalf("Failed to create fake cloud provider: %v", err)
+	}
+	return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
+}
+
+func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExecute chan chan struct{}) *GCEDriver {
+	fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
 	if err != nil {
 		t.Fatalf("Failed to create fake cloud provider: %v", err)
 	}
-	err = gceDriver.SetupGCEDriver(fakeCloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
+	fakeBlockingBlockProvider := &gce.FakeBlockingCloudProvider{
+		FakeCloudProvider: fakeCloudProvider,
+		ReadyToExecute:    readyToExecute,
+	}
+	return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider)
+}
+
+func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
+	vendorVersion := "test-vendor"
+	gceDriver := GetGCEDriver()
+	err := gceDriver.SetupGCEDriver(cloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
 	if err != nil {
 		t.Fatalf("Failed to setup GCE Driver: %v", err)
 	}
