Skip to content

Commit 896ea45

Browse files
author
Hantao (Will) Wang
committed
add volume level seralization for controller operations
1 parent 16f5e99 commit 896ea45

File tree

4 files changed

+160
-4
lines changed

4 files changed

+160
-4
lines changed

pkg/gce-cloud-provider/compute/fake-gce.go

+36-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
6161
fcp.disks[d.GetName()] = d
6262
}
6363
return fcp, nil
64-
6564
}
6665

6766
func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
@@ -422,6 +421,42 @@ func (cloud *FakeCloudProvider) getGlobalSnapshotURI(snapshotName string) string
422421
snapshotName)
423422
}
424423

424+
type FakeBlockingCloudProvider struct {
425+
*FakeCloudProvider
426+
snapshotToExec chan SnapshotSourceAndTarget
427+
readyToExecute chan struct{}
428+
}
429+
430+
type SnapshotSourceAndTarget struct {
431+
Source string
432+
Target string
433+
}
434+
435+
// FakeBlockingCloudProvider's method adds two channels to the CreateSnapshot process in order to provide functionality to finely
436+
// control the order of execution of volume operations. readyToExecute signals that an CreateSnapshot operation has been called.
437+
// Then it cycles through the snapshotToExec channel, waiting for permission to actually create the snapshot.
438+
func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error) {
439+
cloud.readyToExecute <- struct{}{}
440+
for snapshotToExec := range cloud.snapshotToExec {
441+
if snapshotToExec.Source == volKey.String() && snapshotToExec.Target == snapshotName {
442+
break
443+
} else {
444+
cloud.snapshotToExec <- snapshotToExec
445+
}
446+
}
447+
return cloud.FakeCloudProvider.CreateSnapshot(ctx, volKey, snapshotName)
448+
}
449+
450+
func FakeCreateBlockingCloudProvider(project, zone string, cloudDisks []*CloudDisk, snapshotToExec chan SnapshotSourceAndTarget, readyToExecute chan struct{}) (*FakeBlockingCloudProvider, error) {
451+
fcp, err := FakeCreateCloudProvider(project, zone, cloudDisks)
452+
fbcp := &FakeBlockingCloudProvider{
453+
FakeCloudProvider: fcp,
454+
snapshotToExec: snapshotToExec,
455+
readyToExecute: readyToExecute,
456+
}
457+
return fbcp, err
458+
}
459+
425460
func notFoundError() *googleapi.Error {
426461
return &googleapi.Error{
427462
Errors: []googleapi.ErrorItem{

pkg/gce-pd-csi-driver/controller.go

+36
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"math/rand"
2020
"sort"
2121
"strings"
22+
"sync"
2223
"time"
2324

2425
"github.com/golang/protobuf/ptypes"
@@ -41,6 +42,10 @@ type GCEControllerServer struct {
4142
Driver *GCEDriver
4243
CloudProvider gce.GCECompute
4344
MetadataService metadataservice.MetadataService
45+
46+
// A map storing all volumes with ongoing operations so that additional operations
47+
// for that same volume (as defined by Volume Key) return an Aborted error
48+
volumes sync.Map
4449
}
4550

4651
var _ csi.ControllerServer = &GCEControllerServer{}
@@ -138,6 +143,11 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
138143
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", replicationType))
139144
}
140145

146+
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
147+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
148+
}
149+
defer gceCS.volumes.Delete(volKey.String())
150+
141151
// Validate if disk already exists
142152
existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, volKey)
143153
if err != nil {
@@ -213,6 +223,11 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
213223
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
214224
}
215225

226+
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
227+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
228+
}
229+
defer gceCS.volumes.Delete(volKey.String())
230+
216231
err = gceCS.CloudProvider.DeleteDisk(ctx, volKey)
217232
if err != nil {
218233
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown Delete disk error: %v", err))
@@ -249,6 +264,11 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
249264
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
250265
}
251266

267+
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
268+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
269+
}
270+
defer gceCS.volumes.Delete(volKey.String())
271+
252272
// TODO(#253): Check volume capability matches for ALREADY_EXISTS
253273
if err = validateVolumeCapability(volumeCapability); err != nil {
254274
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err))
@@ -334,6 +354,11 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
334354
return nil, err
335355
}
336356

357+
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
358+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
359+
}
360+
defer gceCS.volumes.Delete(volKey.String())
361+
337362
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
338363
if err != nil {
339364
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))
@@ -380,6 +405,12 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
380405
if err != nil {
381406
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume ID is of improper format, got %v", volumeID))
382407
}
408+
409+
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
410+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
411+
}
412+
defer gceCS.volumes.Delete(volKey.String())
413+
383414
_, err = gceCS.CloudProvider.GetDisk(ctx, volKey)
384415
if err != nil {
385416
if gce.IsGCEError(err, "notFound") {
@@ -487,6 +518,11 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C
487518
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
488519
}
489520

521+
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
522+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
523+
}
524+
defer gceCS.volumes.Delete(volKey.String())
525+
490526
// Check if snapshot already exists
491527
var snapshot *compute.Snapshot
492528
snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, req.Name)

pkg/gce-pd-csi-driver/controller_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -1366,3 +1366,76 @@ func TestPickRandAndConsecutive(t *testing.T) {
13661366

13671367
}
13681368
}
1369+
1370+
func TestVolumeOperationConcurrency(t *testing.T) {
1371+
snapshotToExecute := make(chan gce.SnapshotSourceAndTarget, 3)
1372+
readyToExecute := make(chan struct{}, 2)
1373+
reqFinished := make(chan error, 2)
1374+
1375+
gceDriver := initBlockingGCEDriver(t, nil, snapshotToExecute, readyToExecute)
1376+
cs := gceDriver.cs
1377+
1378+
vol1CreateSnapshotReqA := &csi.CreateSnapshotRequest{
1379+
Name: name + "1A",
1380+
SourceVolumeId: testVolumeId + "1",
1381+
}
1382+
vol1CreateSnapshotReqB := &csi.CreateSnapshotRequest{
1383+
Name: name + "1B",
1384+
SourceVolumeId: testVolumeId + "1",
1385+
}
1386+
vol2CreateSnapshotReq := &csi.CreateSnapshotRequest{
1387+
Name: name + "2",
1388+
SourceVolumeId: testVolumeId + "2",
1389+
}
1390+
1391+
runRequestInBackground := func(req *csi.CreateSnapshotRequest) {
1392+
_, err := cs.CreateSnapshot(context.Background(), req)
1393+
reqFinished <- err
1394+
}
1395+
1396+
sourceAndTargetForRequest := func(req *csi.CreateSnapshotRequest) gce.SnapshotSourceAndTarget {
1397+
volKey, err := common.VolumeIDToKey(req.SourceVolumeId)
1398+
if err != nil {
1399+
t.Fatalf("Could not find volume with ID %v: %v", req.SourceVolumeId, err)
1400+
}
1401+
return gce.SnapshotSourceAndTarget{
1402+
Source: volKey.String(),
1403+
Target: req.Name,
1404+
}
1405+
}
1406+
1407+
// Start first valid request vol1CreateSnapshotReqA and block until it reaches the CreateSnapshot
1408+
go runRequestInBackground(vol1CreateSnapshotReqA)
1409+
<-readyToExecute
1410+
1411+
// Check that vol1CreateSnapshotReqB is rejected, due to same volume ID
1412+
// Also allow vol1CreateSnapshotReqB to complete, in case it is allowed to Mount
1413+
snapshotToExecute <- sourceAndTargetForRequest(vol1CreateSnapshotReqB)
1414+
_, err := cs.CreateSnapshot(context.Background(), vol1CreateSnapshotReqB)
1415+
if err != nil {
1416+
serverError, ok := status.FromError(err)
1417+
if !ok {
1418+
t.Fatalf("Could not get error status code from err: %v", err)
1419+
}
1420+
if serverError.Code() != codes.Aborted {
1421+
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
1422+
}
1423+
} else {
1424+
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
1425+
}
1426+
1427+
// Start second valid request vol2CreateSnapshotReq
1428+
go runRequestInBackground(vol2CreateSnapshotReq)
1429+
1430+
// Allow the vol2CreateSnapshotReq to complete, which it can concurrently with vol1CreateSnapshotReqA
1431+
snapshotToExecute <- sourceAndTargetForRequest(vol2CreateSnapshotReq)
1432+
if err = <-reqFinished; err != nil {
1433+
t.Fatalf("Unexpected error: %v", err)
1434+
}
1435+
1436+
// To clean up, allow the vol1CreateSnapshotReqA to complete
1437+
snapshotToExecute <- sourceAndTargetForRequest(vol1CreateSnapshotReqA)
1438+
if err = <-reqFinished; err != nil {
1439+
t.Fatalf("Unexpected error: %v", err)
1440+
}
1441+
}

pkg/gce-pd-csi-driver/gce-pd-driver_test.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,25 @@ import (
2222
)
2323

2424
func initGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk) *GCEDriver {
25-
vendorVersion := "test-vendor"
26-
gceDriver := GetGCEDriver()
2725
fakeCloudProvider, err := gce.FakeCreateCloudProvider(project, zone, cloudDisks)
2826
if err != nil {
2927
t.Fatalf("Failed to create fake cloud provider: %v", err)
3028
}
31-
err = gceDriver.SetupGCEDriver(fakeCloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
29+
return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
30+
}
31+
32+
func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, snapshotToExec chan gce.SnapshotSourceAndTarget, readyToExecute chan struct{}) *GCEDriver {
33+
fakeCloudProvider, err := gce.FakeCreateBlockingCloudProvider(project, zone, cloudDisks, snapshotToExec, readyToExecute)
34+
if err != nil {
35+
t.Fatalf("Failed to create fake cloud provider: %v", err)
36+
}
37+
return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
38+
}
39+
40+
func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
41+
vendorVersion := "test-vendor"
42+
gceDriver := GetGCEDriver()
43+
err := gceDriver.SetupGCEDriver(cloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
3244
if err != nil {
3345
t.Fatalf("Failed to setup GCE Driver: %v", err)
3446
}

0 commit comments

Comments
 (0)