Skip to content

Commit d61ff34

Browse files
author
Hantao (Will) Wang
committed
use chan of chan for testing instead of SourceAndTarget
1 parent a8fb9fb commit d61ff34

File tree

5 files changed

+110
-146
lines changed

5 files changed

+110
-146
lines changed

pkg/gce-cloud-provider/compute/fake-gce.go

+9-29
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type FakeCloudProvider struct {
4949

5050
var _ GCECompute = &FakeCloudProvider{}
5151

52-
func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
52+
func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
5353
fcp := &FakeCloudProvider{
5454
project: project,
5555
zone: zone,
@@ -427,40 +427,20 @@ func (cloud *FakeCloudProvider) getGlobalSnapshotURI(snapshotName string) string
427427

428428
type FakeBlockingCloudProvider struct {
429429
*FakeCloudProvider
430-
snapshotToExec chan SnapshotSourceAndTarget
431-
readyToExecute chan struct{}
430+
ReadyToExecute chan chan struct{}
432431
}
433432

434-
type SnapshotSourceAndTarget struct {
435-
Source string
436-
Target string
437-
}
438-
439-
// FakeBlockingCloudProvider's method adds two channels to the CreateSnapshot process in order to provide functionality to finely
440-
// control the order of execution of volume operations. readyToExecute signals that an CreateSnapshot operation has been called.
441-
// Then it cycles through the snapshotToExec channel, waiting for permission to actually create the snapshot.
433+
// FakeBlockingCloudProvider's method adds functionality to finely control the order of execution of CreateSnapshot calls.
434+
// Upon starting a CreateSnapshot, it passes a chan 'executeCreateSnapshot' into readyToExecute, then blocks on executeCreateSnapshot.
435+
// The test calling this function can block on readyToExecute to ensure that the operation has started and
436+
// allowed the CreateSnapshot to continue by passing a struct into executeCreateSnapshot.
442437
func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error) {
443-
cloud.readyToExecute <- struct{}{}
444-
for snapshotToExec := range cloud.snapshotToExec {
445-
if snapshotToExec.Source == volKey.String() && snapshotToExec.Target == snapshotName {
446-
break
447-
} else {
448-
cloud.snapshotToExec <- snapshotToExec
449-
}
450-
}
438+
executeCreateSnapshot := make(chan struct{})
439+
cloud.ReadyToExecute <- executeCreateSnapshot
440+
<-executeCreateSnapshot
451441
return cloud.FakeCloudProvider.CreateSnapshot(ctx, volKey, snapshotName)
452442
}
453443

454-
func FakeCreateBlockingCloudProvider(project, zone string, cloudDisks []*CloudDisk, snapshotToExec chan SnapshotSourceAndTarget, readyToExecute chan struct{}) (*FakeBlockingCloudProvider, error) {
455-
fcp, err := FakeCreateCloudProvider(project, zone, cloudDisks)
456-
fbcp := &FakeBlockingCloudProvider{
457-
FakeCloudProvider: fcp,
458-
snapshotToExec: snapshotToExec,
459-
readyToExecute: readyToExecute,
460-
}
461-
return fbcp, err
462-
}
463-
464444
func notFoundError() *googleapi.Error {
465445
return &googleapi.Error{
466446
Errors: []googleapi.ErrorItem{

pkg/gce-pd-csi-driver/controller_test.go

+40-46
Original file line numberDiff line numberDiff line change
@@ -1415,18 +1415,15 @@ func TestPickRandAndConsecutive(t *testing.T) {
14151415
}
14161416

14171417
func TestVolumeOperationConcurrency(t *testing.T) {
1418-
snapshotToExecute := make(chan gce.SnapshotSourceAndTarget, 3)
1419-
readyToExecute := make(chan struct{}, 2)
1420-
reqFinished := make(chan error, 2)
1421-
1422-
gceDriver := initBlockingGCEDriver(t, nil, snapshotToExecute, readyToExecute)
1418+
readyToExecute := make(chan chan struct{}, 1)
1419+
gceDriver := initBlockingGCEDriver(t, nil, readyToExecute)
14231420
cs := gceDriver.cs
14241421

1425-
vol1CreateSnapshotReqA := &csi.CreateSnapshotRequest{
1422+
vol1CreateSnapshotAReq := &csi.CreateSnapshotRequest{
14261423
Name: name + "1A",
14271424
SourceVolumeId: testVolumeId + "1",
14281425
}
1429-
vol1CreateSnapshotReqB := &csi.CreateSnapshotRequest{
1426+
vol1CreateSnapshotBReq := &csi.CreateSnapshotRequest{
14301427
Name: name + "1B",
14311428
SourceVolumeId: testVolumeId + "1",
14321429
}
@@ -1435,54 +1432,51 @@ func TestVolumeOperationConcurrency(t *testing.T) {
14351432
SourceVolumeId: testVolumeId + "2",
14361433
}
14371434

1438-
runRequestInBackground := func(req *csi.CreateSnapshotRequest) {
1439-
_, err := cs.CreateSnapshot(context.Background(), req)
1440-
reqFinished <- err
1441-
}
1442-
1443-
sourceAndTargetForRequest := func(req *csi.CreateSnapshotRequest) gce.SnapshotSourceAndTarget {
1444-
volKey, err := common.VolumeIDToKey(req.SourceVolumeId)
1445-
if err != nil {
1446-
t.Fatalf("Could not find volume with ID %v: %v", req.SourceVolumeId, err)
1447-
}
1448-
return gce.SnapshotSourceAndTarget{
1449-
Source: volKey.String(),
1450-
Target: req.Name,
1451-
}
1435+
runRequest := func(req *csi.CreateSnapshotRequest) chan error {
1436+
response := make(chan error)
1437+
go func() {
1438+
_, err := cs.CreateSnapshot(context.Background(), req)
1439+
response <- err
1440+
}()
1441+
return response
14521442
}
14531443

1454-
// Start first valid request vol1CreateSnapshotReqA and block until it reaches the CreateSnapshot
1455-
go runRequestInBackground(vol1CreateSnapshotReqA)
1456-
<-readyToExecute
1444+
// Start first valid request vol1CreateSnapshotA and block until it reaches the CreateSnapshot
1445+
vol1CreateSnapshotAResp := runRequest(vol1CreateSnapshotAReq)
1446+
execVol1CreateSnapshotA := <-readyToExecute
14571447

1458-
// Check that vol1CreateSnapshotReqB is rejected, due to same volume ID
1459-
// Also allow vol1CreateSnapshotReqB to complete, in case it is allowed to Mount
1460-
snapshotToExecute <- sourceAndTargetForRequest(vol1CreateSnapshotReqB)
1461-
_, err := cs.CreateSnapshot(context.Background(), vol1CreateSnapshotReqB)
1462-
if err != nil {
1463-
serverError, ok := status.FromError(err)
1464-
if !ok {
1465-
t.Fatalf("Could not get error status code from err: %v", err)
1466-
}
1467-
if serverError.Code() != codes.Aborted {
1468-
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
1448+
// Start vol1CreateSnapshotB and allow it to execute to completion. Then check for Aborted error.
1449+
// If a non Abort error is received or if the operation was started, then there is a problem
1450+
// with volume locking
1451+
vol1CreateSnapshotBResp := runRequest(vol1CreateSnapshotBReq)
1452+
select {
1453+
case err := <-vol1CreateSnapshotBResp:
1454+
if err != nil {
1455+
serverError, ok := status.FromError(err)
1456+
if !ok {
1457+
t.Fatalf("Could not get error status code from err: %v", err)
1458+
}
1459+
if serverError.Code() != codes.Aborted {
1460+
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
1461+
}
1462+
} else {
1463+
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
14691464
}
1470-
} else {
1471-
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
1465+
case <-readyToExecute:
1466+
t.Fatal("The operation for vol1PublishTargetB should have been aborted, but was started")
14721467
}
14731468

1474-
// Start second valid request vol2CreateSnapshotReq
1475-
go runRequestInBackground(vol2CreateSnapshotReq)
1476-
1477-
// Allow the vol2CreateSnapshotReq to complete, which it can concurrently with vol1CreateSnapshotReqA
1478-
snapshotToExecute <- sourceAndTargetForRequest(vol2CreateSnapshotReq)
1479-
if err = <-reqFinished; err != nil {
1469+
// Start vol2CreateSnapshot and allow it to execute to completion. Then check for success.
1470+
vol2CreateSnapshotResp := runRequest(vol2CreateSnapshotReq)
1471+
execVol2CreateSnapshot := <-readyToExecute
1472+
execVol2CreateSnapshot <- struct{}{}
1473+
if err := <-vol2CreateSnapshotResp; err != nil {
14801474
t.Fatalf("Unexpected error: %v", err)
14811475
}
14821476

1483-
// To clean up, allow the vol1CreateSnapshotReqA to complete
1484-
snapshotToExecute <- sourceAndTargetForRequest(vol1CreateSnapshotReqA)
1485-
if err = <-reqFinished; err != nil {
1477+
// To clean up, allow the vol1PublishTargetA to complete
1478+
execVol1CreateSnapshotA <- struct{}{}
1479+
if err := <-vol1CreateSnapshotAResp; err != nil {
14861480
t.Fatalf("Unexpected error: %v", err)
14871481
}
14881482
}

pkg/gce-pd-csi-driver/gce-pd-driver_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,23 @@ import (
2222
)
2323

2424
func initGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk) *GCEDriver {
25-
fakeCloudProvider, err := gce.FakeCreateCloudProvider(project, zone, cloudDisks)
25+
fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
2626
if err != nil {
2727
t.Fatalf("Failed to create fake cloud provider: %v", err)
2828
}
2929
return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
3030
}
3131

32-
func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, snapshotToExec chan gce.SnapshotSourceAndTarget, readyToExecute chan struct{}) *GCEDriver {
33-
fakeCloudProvider, err := gce.FakeCreateBlockingCloudProvider(project, zone, cloudDisks, snapshotToExec, readyToExecute)
32+
func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExecute chan chan struct{}) *GCEDriver {
33+
fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
3434
if err != nil {
3535
t.Fatalf("Failed to create fake cloud provider: %v", err)
3636
}
37-
return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
37+
fakeBlockingBlockProvider := &gce.FakeBlockingCloudProvider{
38+
FakeCloudProvider: fakeCloudProvider,
39+
ReadyToExecute: readyToExecute,
40+
}
41+
return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider)
3842
}
3943

4044
func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {

pkg/gce-pd-csi-driver/node_test.go

+42-45
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ limitations under the License.
1515
package gceGCEDriver
1616

1717
import (
18+
"context"
1819
"testing"
1920

20-
"context"
2121
csi "github.com/container-storage-interface/spec/lib/go/csi"
2222
"google.golang.org/grpc/codes"
2323
"google.golang.org/grpc/status"
@@ -38,9 +38,9 @@ func getTestGCEDriver(t *testing.T) *GCEDriver {
3838
return gceDriver
3939
}
4040

41-
func getTestBlockingGCEDriver(t *testing.T, mountToRun chan mountmanager.MountSourceAndTarget, readyToMount chan struct{}) *GCEDriver {
41+
func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
4242
gceDriver := GetGCEDriver()
43-
err := gceDriver.SetupGCEDriver(nil, mountmanager.NewFakeSafeBlockingMounter(mountToRun, readyToMount), mountmanager.NewFakeDeviceUtils(), metadataservice.NewFakeService(), driver, "test-vendor")
43+
err := gceDriver.SetupGCEDriver(nil, mountmanager.NewFakeSafeBlockingMounter(readyToExecute), mountmanager.NewFakeDeviceUtils(), metadataservice.NewFakeService(), driver, "test-vendor")
4444
if err != nil {
4545
t.Fatalf("Failed to setup GCE Driver: %v", err)
4646
}
@@ -389,12 +389,10 @@ func TestNodeGetCapabilities(t *testing.T) {
389389
}
390390

391391
func TestConcurrentNodeOperations(t *testing.T) {
392-
mountToRun := make(chan mountmanager.MountSourceAndTarget, 3)
393-
readyToMount := make(chan struct{}, 2)
394-
reqFinished := make(chan error, 2)
395-
396-
gceDriver := getTestBlockingGCEDriver(t, mountToRun, readyToMount)
392+
readyToExecute := make(chan chan struct{}, 1)
393+
gceDriver := getTestBlockingGCEDriver(t, readyToExecute)
397394
ns := gceDriver.ns
395+
398396
vol1PublishTargetAReq := &csi.NodePublishVolumeRequest{
399397
VolumeId: defaultVolumeID + "1",
400398
TargetPath: defaultTargetPath + "a",
@@ -417,52 +415,51 @@ func TestConcurrentNodeOperations(t *testing.T) {
417415
VolumeCapability: stdVolCap,
418416
}
419417

420-
runRequestInBackground := func(req *csi.NodePublishVolumeRequest) {
421-
_, err := ns.NodePublishVolume(context.Background(), req)
422-
reqFinished <- err
418+
runRequest := func(req *csi.NodePublishVolumeRequest) chan error {
419+
response := make(chan error)
420+
go func() {
421+
_, err := ns.NodePublishVolume(context.Background(), req)
422+
response <- err
423+
}()
424+
return response
423425
}
424426

425-
// Start first valid request vol1PublishTargetAReq and block until it reaches the Mount
426-
go runRequestInBackground(vol1PublishTargetAReq)
427-
<-readyToMount
427+
// Start first valid request vol1PublishTargetA and block until it reaches the Mount
428+
vol1PublishTargetAResp := runRequest(vol1PublishTargetAReq)
429+
execVol1PublishTargetA := <-readyToExecute
428430

429-
// Check that vol1PublishTargetBReq is rejected, due to same volume ID
430-
// Also allow vol1PublishTargetBReq to complete, in case it is allowed to Mount
431-
mountToRun <- mountmanager.MountSourceAndTarget{
432-
Source: vol1PublishTargetBReq.StagingTargetPath,
433-
Target: vol1PublishTargetBReq.TargetPath,
434-
}
435-
_, err := ns.NodePublishVolume(context.Background(), vol1PublishTargetBReq)
436-
if err != nil {
437-
serverError, ok := status.FromError(err)
438-
if !ok {
439-
t.Fatalf("Could not get error status code from err: %v", err)
440-
}
441-
if serverError.Code() != codes.Aborted {
442-
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
431+
// Start vol1PublishTargetB and allow it to execute to completion. Then check for Aborted error.
432+
// If a non Abort error is received or if the operation was started, then there is a problem
433+
// with volume locking.
434+
vol1PublishTargetBResp := runRequest(vol1PublishTargetBReq)
435+
select {
436+
case err := <-vol1PublishTargetBResp:
437+
if err != nil {
438+
serverError, ok := status.FromError(err)
439+
if !ok {
440+
t.Fatalf("Could not get error status code from err: %v", err)
441+
}
442+
if serverError.Code() != codes.Aborted {
443+
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
444+
}
445+
} else {
446+
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
443447
}
444-
} else {
445-
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
448+
case <-readyToExecute:
449+
t.Fatal("The operation for vol1PublishTargetB should have been aborted, but was started")
446450
}
447451

448-
// Start second valid request vol2PublishTargetCReq
449-
go runRequestInBackground(vol2PublishTargetCReq)
450-
451-
// Allow the vol2PublishTargetCReq to complete, which it can concurrently with vol1PublishTargetAReq
452-
mountToRun <- mountmanager.MountSourceAndTarget{
453-
Source: vol2PublishTargetCReq.StagingTargetPath,
454-
Target: vol2PublishTargetCReq.TargetPath,
455-
}
456-
if err = <-reqFinished; err != nil {
452+
// Start vol2PublishTargetC and allow it to execute to completion. Then check for success.
453+
vol2PublishTargetCResp := runRequest(vol2PublishTargetCReq)
454+
execVol2PublishTargetC := <-readyToExecute
455+
execVol2PublishTargetC <- struct{}{}
456+
if err := <-vol2PublishTargetCResp; err != nil {
457457
t.Fatalf("Unexpected error: %v", err)
458458
}
459459

460-
// To clean up, allow the vol1PublishTargetAReq to complete
461-
mountToRun <- mountmanager.MountSourceAndTarget{
462-
Source: vol1PublishTargetAReq.StagingTargetPath,
463-
Target: vol1PublishTargetAReq.TargetPath,
464-
}
465-
if err = <-reqFinished; err != nil {
460+
// To clean up, allow the vol1PublishTargetA to complete
461+
execVol1PublishTargetA <- struct{}{}
462+
if err := <-vol1PublishTargetAResp; err != nil {
466463
t.Fatalf("Unexpected error: %v", err)
467464
}
468465
}

pkg/mount-manager/fake-safe-mounter.go

+11-22
Original file line numberDiff line numberDiff line change
@@ -52,35 +52,24 @@ func NewFakeSafeMounter() *mount.SafeFormatAndMount {
5252

5353
type FakeBlockingMounter struct {
5454
*mount.FakeMounter
55-
mountToRun chan MountSourceAndTarget
56-
readyToMount chan struct{}
55+
ReadyToExecute chan chan struct{}
5756
}
5857

59-
type MountSourceAndTarget struct {
60-
Source string
61-
Target string
62-
}
63-
64-
// FakeBlockingMounter's method adds two channels to the Mount process in order to provide functionality to finely
65-
// control the order of execution of Mount calls. readToMount signals that a Mount operation has been called.
66-
// Then it cycles through the mountToRun channel, waiting for permission to actually make the mount operation.
58+
// FakeBlockingMounter's method adds functionality to finely control the order of execution of Mount calls.
59+
// Upon starting a Mount, it passes a chan 'executeMount' into readyToExecute, then blocks on executeMount.
60+
// The test calling this function can block on readyToExecute to ensure that the operation has started and
61+
// allowed the Mount to continue by passing a struct into executeMount.
6762
func (mounter *FakeBlockingMounter) Mount(source string, target string, fstype string, options []string) error {
68-
mounter.readyToMount <- struct{}{}
69-
for mountToRun := range mounter.mountToRun {
70-
if mountToRun.Source == source && mountToRun.Target == target {
71-
break
72-
} else {
73-
mounter.mountToRun <- mountToRun
74-
}
75-
}
63+
executeMount := make(chan struct{})
64+
mounter.ReadyToExecute <- executeMount
65+
<-executeMount
7666
return mounter.FakeMounter.Mount(source, target, fstype, options)
7767
}
7868

79-
func NewFakeSafeBlockingMounter(mountToRun chan MountSourceAndTarget, readyToMount chan struct{}) *mount.SafeFormatAndMount {
69+
func NewFakeSafeBlockingMounter(readyToExecute chan chan struct{}) *mount.SafeFormatAndMount {
8070
fakeBlockingMounter := &FakeBlockingMounter{
81-
FakeMounter: fakeMounter,
82-
mountToRun: mountToRun,
83-
readyToMount: readyToMount,
71+
FakeMounter: fakeMounter,
72+
ReadyToExecute: readyToExecute,
8473
}
8574
return &mount.SafeFormatAndMount{
8675
Interface: fakeBlockingMounter,

0 commit comments

Comments
 (0)