
Commit 28d3405

Author: Hantao (Will) Wang (committed)

add volume level serialization for controller operations

1 parent: 64fecda

File tree: 4 files changed, +160 -4 lines changed


pkg/gce-cloud-provider/compute/fake-gce.go (+36 -1)
@@ -61,7 +61,6 @@ func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
         fcp.disks[d.GetName()] = d
     }
     return fcp, nil
-
 }
 
 func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
@@ -422,6 +421,42 @@ func (cloud *FakeCloudProvider) getGlobalSnapshotURI(snapshotName string) string
         snapshotName)
 }
 
+type FakeBlockingCloudProvider struct {
+    *FakeCloudProvider
+    snapshotToExec chan SnapshotSourceAndTarget
+    readyToExecute chan struct{}
+}
+
+type SnapshotSourceAndTarget struct {
+    Source string
+    Target string
+}
+
+// FakeBlockingCloudProvider's CreateSnapshot adds two channels to the snapshot process so that tests can finely
+// control the order of execution of volume operations. readyToExecute signals that a CreateSnapshot operation has been called.
+// The method then cycles through the snapshotToExec channel, waiting for permission to actually create the snapshot.
+func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error) {
+    cloud.readyToExecute <- struct{}{}
+    for snapshotToExec := range cloud.snapshotToExec {
+        if snapshotToExec.Source == volKey.String() && snapshotToExec.Target == snapshotName {
+            break
+        } else {
+            cloud.snapshotToExec <- snapshotToExec
+        }
+    }
+    return cloud.FakeCloudProvider.CreateSnapshot(ctx, volKey, snapshotName)
+}
+
+func FakeCreateBlockingCloudProvider(project, zone string, cloudDisks []*CloudDisk, snapshotToExec chan SnapshotSourceAndTarget, readyToExecute chan struct{}) (*FakeBlockingCloudProvider, error) {
+    fcp, err := FakeCreateCloudProvider(project, zone, cloudDisks)
+    fbcp := &FakeBlockingCloudProvider{
+        FakeCloudProvider: fcp,
+        snapshotToExec:    snapshotToExec,
+        readyToExecute:    readyToExecute,
+    }
+    return fbcp, err
+}
+
 func notFoundError() *googleapi.Error {
     return &googleapi.Error{
         Errors: []googleapi.ErrorItem{
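The blocking fake above uses a simple two-channel handshake: the fake announces that an operation is in flight on readyToExecute, then waits on snapshotToExec for a token naming the exact operation it may run, requeueing tokens meant for other operations. A minimal, self-contained sketch of that pattern (the names token and blockingFake are illustrative only, not part of this driver):

package main

import "fmt"

// token identifies which blocked operation the test wants to release.
type token struct{ source, target string }

// blockingFake mimics the handshake used by FakeBlockingCloudProvider.
type blockingFake struct {
    toExec chan token    // test -> fake: permission tokens
    ready  chan struct{} // fake -> test: an operation has started
}

// do blocks until a token naming this exact operation arrives,
// putting back any token that was meant for a different operation.
func (f *blockingFake) do(source, target string) {
    f.ready <- struct{}{}
    for t := range f.toExec {
        if t.source == source && t.target == target {
            break
        }
        f.toExec <- t // not ours; requeue for another waiter
    }
    fmt.Println("executing snapshot", source, "->", target)
}

func main() {
    f := &blockingFake{toExec: make(chan token, 2), ready: make(chan struct{}, 1)}
    done := make(chan struct{})
    go func() { f.do("vol-1", "snap-A"); close(done) }()

    <-f.ready                            // the operation is now blocked inside do
    f.toExec <- token{"vol-1", "snap-A"} // release it
    <-done
}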

pkg/gce-pd-csi-driver/controller.go (+36)
@@ -19,6 +19,7 @@ import (
     "math/rand"
     "sort"
     "strings"
+    "sync"
     "time"
 
     "github.com/golang/protobuf/ptypes"

@@ -42,6 +43,10 @@ type GCEControllerServer struct {
     Driver          *GCEDriver
     CloudProvider   gce.GCECompute
     MetadataService metadataservice.MetadataService
+
+    // A map storing all volumes with ongoing operations so that additional operations
+    // for that same volume (as defined by Volume Key) return an Aborted error
+    volumes sync.Map
 }
 
 var _ csi.ControllerServer = &GCEControllerServer{}

@@ -139,6 +144,11 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
         return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", replicationType))
     }
 
+    if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
+        return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
+    }
+    defer gceCS.volumes.Delete(volKey.String())
+
     // Validate if disk already exists
     existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, volKey)
     if err != nil {

@@ -222,6 +232,11 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
     }
 
+    if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
+        return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
+    }
+    defer gceCS.volumes.Delete(volKey.String())
+
     err = gceCS.CloudProvider.DeleteDisk(ctx, volKey)
     if err != nil {
         return nil, status.Error(codes.Internal, fmt.Sprintf("unknown Delete disk error: %v", err))

@@ -258,6 +273,11 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
     }
 
+    if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
+        return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
+    }
+    defer gceCS.volumes.Delete(volKey.String())
+
     // TODO(#253): Check volume capability matches for ALREADY_EXISTS
     if err = validateVolumeCapability(volumeCapability); err != nil {
         return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err))

@@ -343,6 +363,11 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
         return nil, err
     }
 
+    if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
+        return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
+    }
+    defer gceCS.volumes.Delete(volKey.String())
+
     instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
     if err != nil {
         return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))

@@ -389,6 +414,12 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
     if err != nil {
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume ID is of improper format, got %v", volumeID))
     }
+
+    if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
+        return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
+    }
+    defer gceCS.volumes.Delete(volKey.String())
+
     _, err = gceCS.CloudProvider.GetDisk(ctx, volKey)
     if err != nil {
         if gce.IsGCEError(err, "notFound") {

@@ -496,6 +527,11 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
     }
 
+    if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
+        return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
+    }
+    defer gceCS.volumes.Delete(volKey.String())
+
     // Check if snapshot already exists
     var snapshot *compute.Snapshot
     snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, req.Name)
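Each controller RPC above uses the same guard: sync.Map's LoadOrStore marks the volume key as busy (and reports whether it already was), and the deferred Delete releases it when the RPC returns, so a second call for the same key fails fast with Aborted instead of racing. A minimal sketch of that guard in isolation, assuming a hypothetical volumeSerializer helper (the name and error text are illustrative only):

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errAborted = errors.New("an operation with the given Volume Key already exists")

// volumeSerializer rejects concurrent operations on the same volume key.
type volumeSerializer struct {
    volumes sync.Map
}

// run executes op while the key is marked busy; a concurrent caller with the
// same key gets errAborted immediately instead of blocking.
func (s *volumeSerializer) run(volKey string, op func() error) error {
    if _, alreadyExists := s.volumes.LoadOrStore(volKey, true); alreadyExists {
        return errAborted
    }
    defer s.volumes.Delete(volKey)
    return op()
}

func main() {
    var s volumeSerializer
    err := s.run("zones/us-central1-a/disks/disk-1", func() error {
        // A nested call with the same key is rejected while the outer one runs.
        return s.run("zones/us-central1-a/disks/disk-1", func() error { return nil })
    })
    fmt.Println(err) // an operation with the given Volume Key already exists
}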

pkg/gce-pd-csi-driver/controller_test.go (+73)
@@ -1413,3 +1413,76 @@ func TestPickRandAndConsecutive(t *testing.T) {
 
     }
 }
+
+func TestVolumeOperationConcurrency(t *testing.T) {
+    snapshotToExecute := make(chan gce.SnapshotSourceAndTarget, 3)
+    readyToExecute := make(chan struct{}, 2)
+    reqFinished := make(chan error, 2)
+
+    gceDriver := initBlockingGCEDriver(t, nil, snapshotToExecute, readyToExecute)
+    cs := gceDriver.cs
+
+    vol1CreateSnapshotReqA := &csi.CreateSnapshotRequest{
+        Name:           name + "1A",
+        SourceVolumeId: testVolumeId + "1",
+    }
+    vol1CreateSnapshotReqB := &csi.CreateSnapshotRequest{
+        Name:           name + "1B",
+        SourceVolumeId: testVolumeId + "1",
+    }
+    vol2CreateSnapshotReq := &csi.CreateSnapshotRequest{
+        Name:           name + "2",
+        SourceVolumeId: testVolumeId + "2",
+    }
+
+    runRequestInBackground := func(req *csi.CreateSnapshotRequest) {
+        _, err := cs.CreateSnapshot(context.Background(), req)
+        reqFinished <- err
+    }
+
+    sourceAndTargetForRequest := func(req *csi.CreateSnapshotRequest) gce.SnapshotSourceAndTarget {
+        volKey, err := common.VolumeIDToKey(req.SourceVolumeId)
+        if err != nil {
+            t.Fatalf("Could not find volume with ID %v: %v", req.SourceVolumeId, err)
+        }
+        return gce.SnapshotSourceAndTarget{
+            Source: volKey.String(),
+            Target: req.Name,
+        }
+    }
+
+    // Start the first valid request, vol1CreateSnapshotReqA, and block until it reaches CreateSnapshot
+    go runRequestInBackground(vol1CreateSnapshotReqA)
+    <-readyToExecute
+
+    // Check that vol1CreateSnapshotReqB is rejected because it targets the same volume ID.
+    // Also allow vol1CreateSnapshotReqB to complete, in case it is erroneously allowed to execute.
+    snapshotToExecute <- sourceAndTargetForRequest(vol1CreateSnapshotReqB)
+    _, err := cs.CreateSnapshot(context.Background(), vol1CreateSnapshotReqB)
+    if err != nil {
+        serverError, ok := status.FromError(err)
+        if !ok {
+            t.Fatalf("Could not get error status code from err: %v", err)
+        }
+        if serverError.Code() != codes.Aborted {
+            t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
+        }
+    } else {
+        t.Fatalf("Expected error: %v, got no error", codes.Aborted)
+    }
+
+    // Start the second valid request, vol2CreateSnapshotReq
+    go runRequestInBackground(vol2CreateSnapshotReq)
+
+    // Allow vol2CreateSnapshotReq to complete, which it can do concurrently with vol1CreateSnapshotReqA
+    snapshotToExecute <- sourceAndTargetForRequest(vol2CreateSnapshotReq)
+    if err = <-reqFinished; err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+
+    // To clean up, allow vol1CreateSnapshotReqA to complete
+    snapshotToExecute <- sourceAndTargetForRequest(vol1CreateSnapshotReqA)
+    if err = <-reqFinished; err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+}
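Assuming the repository's standard Go package layout shown in the file tree, the new concurrency test can be run on its own with something like:

go test ./pkg/gce-pd-csi-driver/ -run TestVolumeOperationConcurrency -v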

pkg/gce-pd-csi-driver/gce-pd-driver_test.go (+15 -3)
@@ -22,13 +22,25 @@ import (
 )
 
 func initGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk) *GCEDriver {
-    vendorVersion := "test-vendor"
-    gceDriver := GetGCEDriver()
     fakeCloudProvider, err := gce.FakeCreateCloudProvider(project, zone, cloudDisks)
     if err != nil {
         t.Fatalf("Failed to create fake cloud provider: %v", err)
     }
-    err = gceDriver.SetupGCEDriver(fakeCloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
+    return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
+}
+
+func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, snapshotToExec chan gce.SnapshotSourceAndTarget, readyToExecute chan struct{}) *GCEDriver {
+    fakeCloudProvider, err := gce.FakeCreateBlockingCloudProvider(project, zone, cloudDisks, snapshotToExec, readyToExecute)
+    if err != nil {
+        t.Fatalf("Failed to create fake cloud provider: %v", err)
+    }
+    return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
+}
+
+func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
+    vendorVersion := "test-vendor"
+    gceDriver := GetGCEDriver()
+    err := gceDriver.SetupGCEDriver(cloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
     if err != nil {
         t.Fatalf("Failed to setup GCE Driver: %v", err)
     }
