
Commit f53a6ab

Author: Hantao (Will) Wang (committed)

Revert "Merge pull request kubernetes-sigs#316 from hantaowang/seralize-attach-ops"

This reverts commit 41f103f, reversing changes made to 965ba6a.

1 parent e59f47e · commit f53a6ab

File tree

11 files changed: +91 −294 lines changed

pkg/common/utils.go

-11
@@ -73,17 +73,6 @@ func VolumeIDToKey(id string) (*meta.Key, error) {
     }
 }
 
-func KeyToVolumeID(volKey *meta.Key, project string) (string, error) {
-    switch volKey.Type() {
-    case meta.Zonal:
-        return fmt.Sprintf(volIDZonalFmt, project, volKey.Zone, volKey.Zone), nil
-    case meta.Regional:
-        return fmt.Sprintf(volIDZonalFmt, project, volKey.Region, volKey.Zone), nil
-    default:
-        return "", fmt.Errorf("volume key %v neither zonal nor regional", volKey.Name)
-    }
-}
-
 func GenerateUnderspecifiedVolumeID(diskName string, isZonal bool) string {
     if isZonal {
         return fmt.Sprintf(volIDZonalFmt, UnspecifiedValue, UnspecifiedValue, diskName)

pkg/common/volume_lock.go

-58
This file was deleted.
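The contents of the deleted volume_lock.go are not shown in this view. As a rough sketch only, reconstructed from the call sites removed elsewhere in this commit (common.NewVolumeLocks, TryAcquire, Release, and the common.VolumeOperationAlreadyExistsFmt message), the removed type plausibly looked something like the following; the internal field names and the exact error wording below are assumptions, not code from the deleted file.

// Hypothetical reconstruction of the deleted pkg/common/volume_lock.go,
// inferred from its call sites in controller.go and gce-pd-driver.go.
package common

import "sync"

// Exact wording assumed; only the format name appears in the diff.
const VolumeOperationAlreadyExistsFmt = "An operation with the given volume %s already exists"

// VolumeLocks tracks volume IDs with an operation in flight so that a second
// concurrent operation on the same volume can be rejected with an Aborted error.
type VolumeLocks struct {
    mux   sync.Mutex
    locks map[string]struct{}
}

func NewVolumeLocks() *VolumeLocks {
    return &VolumeLocks{locks: make(map[string]struct{})}
}

// TryAcquire reports whether the lock for volumeID was free; if so, it is now held.
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
    vl.mux.Lock()
    defer vl.mux.Unlock()
    if _, held := vl.locks[volumeID]; held {
        return false
    }
    vl.locks[volumeID] = struct{}{}
    return true
}

// Release frees the lock for volumeID.
func (vl *VolumeLocks) Release(volumeID string) {
    vl.mux.Lock()
    defer vl.mux.Unlock()
    delete(vl.locks, volumeID)
}

In the removed controller code this lock is keyed by volume ID (or by a node/volume pair for publish and unpublish) and released with defer, so a second concurrent call on the same key returns codes.Aborted.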

pkg/gce-cloud-provider/compute/fake-gce.go

+2 −17

@@ -49,7 +49,7 @@ type FakeCloudProvider struct {
 
 var _ GCECompute = &FakeCloudProvider{}
 
-func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
+func FakeCreateCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*FakeCloudProvider, error) {
     fcp := &FakeCloudProvider{
         project: project,
         zone:    zone,
@@ -61,6 +61,7 @@ func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
         fcp.disks[d.GetName()] = d
     }
     return fcp, nil
+
 }
 
 func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
@@ -433,22 +434,6 @@ func (cloud *FakeCloudProvider) getGlobalSnapshotURI(snapshotName string) string
         snapshotName)
 }
 
-type FakeBlockingCloudProvider struct {
-    *FakeCloudProvider
-    ReadyToExecute chan chan struct{}
-}
-
-// FakeBlockingCloudProvider's method adds functionality to finely control the order of execution of CreateSnapshot calls.
-// Upon starting a CreateSnapshot, it passes a chan 'executeCreateSnapshot' into readyToExecute, then blocks on executeCreateSnapshot.
-// The test calling this function can block on readyToExecute to ensure that the operation has started and
-// allowed the CreateSnapshot to continue by passing a struct into executeCreateSnapshot.
-func (cloud *FakeBlockingCloudProvider) CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error) {
-    executeCreateSnapshot := make(chan struct{})
-    cloud.ReadyToExecute <- executeCreateSnapshot
-    <-executeCreateSnapshot
-    return cloud.FakeCloudProvider.CreateSnapshot(ctx, volKey, snapshotName)
-}
-
 func notFoundError() *googleapi.Error {
     return &googleapi.Error{
         Errors: []googleapi.ErrorItem{
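The removed comments above describe a channel-of-channels handshake for controlling the order of operations in tests. A minimal, self-contained sketch of that pattern (illustrative names only, not code from this repository) looks like this:

// Standalone sketch of the handshake used by the removed FakeBlockingCloudProvider.
package main

import "fmt"

func main() {
    readyToExecute := make(chan chan struct{}, 1)
    done := make(chan struct{})

    // The "blocked" operation announces it has started, then waits for permission.
    go func() {
        execute := make(chan struct{})
        readyToExecute <- execute // signal: operation has started
        <-execute                 // block until the caller allows it to proceed
        fmt.Println("operation ran")
        close(done)
    }()

    // The "test" side observes the start, then releases the operation.
    execute := <-readyToExecute
    execute <- struct{}{}
    <-done
}

The buffered readyToExecute channel lets a test detect that an operation has started without immediately releasing it, which is how the removed TestVolumeOperationConcurrency drives the blocking fake.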

pkg/gce-pd-csi-driver/controller.go

-45
@@ -42,10 +42,6 @@ type GCEControllerServer struct {
     Driver *GCEDriver
     CloudProvider gce.GCECompute
     MetadataService metadataservice.MetadataService
-
-    // A map storing all volumes with ongoing operations so that additional operations
-    // for that same volume (as defined by Volume Key) return an Aborted error
-    volumeLocks *common.VolumeLocks
 }
 
 var _ csi.ControllerServer = &GCEControllerServer{}
@@ -143,15 +139,6 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
         return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", replicationType))
     }
 
-    volumeID, err := common.KeyToVolumeID(volKey, gceCS.MetadataService.GetProject())
-    if err != nil {
-        return nil, status.Errorf(codes.Internal, "Failed to convert volume key to volume ID: %v", err)
-    }
-    if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
-    }
-    defer gceCS.volumeLocks.Release(volumeID)
-
     // Validate if disk already exists
     existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, volKey)
     if err != nil {
@@ -235,11 +222,6 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
     }
 
-    if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
-    }
-    defer gceCS.volumeLocks.Release(volumeID)
-
     err = gceCS.CloudProvider.DeleteDisk(ctx, volKey)
     if err != nil {
         return nil, status.Error(codes.Internal, fmt.Sprintf("unknown Delete disk error: %v", err))
@@ -276,14 +258,6 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
     }
 
-    // Acquires the lock for the volume on that node only, because we need to support the ability
-    // to publish the same volume onto different nodes concurrently
-    lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
-    if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
-    }
-    defer gceCS.volumeLocks.Release(lockingVolumeID)
-
     // TODO(#253): Check volume capability matches for ALREADY_EXISTS
     if err = validateVolumeCapability(volumeCapability); err != nil {
         return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapabilities is invalid: %v", err))
@@ -369,14 +343,6 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
         return nil, err
     }
 
-    // Acquires the lock for the volume on that node only, because we need to support the ability
-    // to unpublish the same volume from different nodes concurrently
-    lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
-    if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
-    }
-    defer gceCS.volumeLocks.Release(lockingVolumeID)
-
     instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
     if err != nil {
         return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("could not split nodeID: %v", err))
@@ -423,12 +389,6 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
     if err != nil {
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume ID is of improper format, got %v", volumeID))
     }
-
-    if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
-    }
-    defer gceCS.volumeLocks.Release(volumeID)
-
     _, err = gceCS.CloudProvider.GetDisk(ctx, volKey)
     if err != nil {
         if gce.IsGCEError(err, "notFound") {
@@ -536,11 +496,6 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C
         return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
     }
 
-    if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired {
-        return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
-    }
-    defer gceCS.volumeLocks.Release(volumeID)
-
     // Check if snapshot already exists
     var snapshot *compute.Snapshot
     snapshot, err = gceCS.CloudProvider.GetSnapshot(ctx, req.Name)

pkg/gce-pd-csi-driver/controller_test.go

-67
@@ -1413,70 +1413,3 @@ func TestPickRandAndConsecutive(t *testing.T) {
 
     }
 }
-
-func TestVolumeOperationConcurrency(t *testing.T) {
-    readyToExecute := make(chan chan struct{}, 1)
-    gceDriver := initBlockingGCEDriver(t, nil, readyToExecute)
-    cs := gceDriver.cs
-
-    vol1CreateSnapshotAReq := &csi.CreateSnapshotRequest{
-        Name:           name + "1A",
-        SourceVolumeId: testVolumeId + "1",
-    }
-    vol1CreateSnapshotBReq := &csi.CreateSnapshotRequest{
-        Name:           name + "1B",
-        SourceVolumeId: testVolumeId + "1",
-    }
-    vol2CreateSnapshotReq := &csi.CreateSnapshotRequest{
-        Name:           name + "2",
-        SourceVolumeId: testVolumeId + "2",
-    }
-
-    runRequest := func(req *csi.CreateSnapshotRequest) <-chan error {
-        response := make(chan error)
-        go func() {
-            _, err := cs.CreateSnapshot(context.Background(), req)
-            response <- err
-        }()
-        return response
-    }
-
-    // Start first valid request vol1CreateSnapshotA and block until it reaches the CreateSnapshot
-    vol1CreateSnapshotAResp := runRequest(vol1CreateSnapshotAReq)
-    execVol1CreateSnapshotA := <-readyToExecute
-
-    // Start vol1CreateSnapshotB and allow it to execute to completion. Then check for Aborted error.
-    // If a non Abort error is received or if the operation was started, then there is a problem
-    // with volume locking
-    vol1CreateSnapshotBResp := runRequest(vol1CreateSnapshotBReq)
-    select {
-    case err := <-vol1CreateSnapshotBResp:
-        if err != nil {
-            serverError, ok := status.FromError(err)
-            if !ok {
-                t.Fatalf("Could not get error status code from err: %v", err)
-            }
-            if serverError.Code() != codes.Aborted {
-                t.Errorf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
-            }
-        } else {
-            t.Errorf("Expected error: %v, got no error", codes.Aborted)
-        }
-    case <-readyToExecute:
-        t.Errorf("The operation for vol1CreateSnapshotB should have been aborted, but was started")
-    }
-
-    // Start vol2CreateSnapshot and allow it to execute to completion. Then check for success.
-    vol2CreateSnapshotResp := runRequest(vol2CreateSnapshotReq)
-    execVol2CreateSnapshot := <-readyToExecute
-    execVol2CreateSnapshot <- struct{}{}
-    if err := <-vol2CreateSnapshotResp; err != nil {
-        t.Errorf("Unexpected error: %v", err)
-    }
-
-    // To clean up, allow the vol1CreateSnapshotA to complete
-    execVol1CreateSnapshotA <- struct{}{}
-    if err := <-vol1CreateSnapshotAResp; err != nil {
-        t.Errorf("Unexpected error: %v", err)
-    }
-}

pkg/gce-pd-csi-driver/gce-pd-driver.go

-3
@@ -22,7 +22,6 @@ import (
     "google.golang.org/grpc/status"
     "k8s.io/klog"
     "k8s.io/kubernetes/pkg/util/mount"
-    common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
     gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
     metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
     mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
@@ -139,7 +138,6 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
         Mounter:         mounter,
         DeviceUtils:     deviceUtils,
         MetadataService: meta,
-        volumeLocks:     common.NewVolumeLocks(),
     }
 }
 
@@ -148,7 +146,6 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, met
         Driver:          gceDriver,
         CloudProvider:   cloudProvider,
         MetadataService: meta,
-        volumeLocks:     common.NewVolumeLocks(),
     }
 }

pkg/gce-pd-csi-driver/gce-pd-driver_test.go

+4 −20

@@ -22,29 +22,13 @@ import (
 )
 
 func initGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk) *GCEDriver {
-    fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
-    if err != nil {
-        t.Fatalf("Failed to create fake cloud provider: %v", err)
-    }
-    return initGCEDriverWithCloudProvider(t, fakeCloudProvider)
-}
-
-func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExecute chan chan struct{}) *GCEDriver {
-    fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, cloudDisks)
+    vendorVersion := "test-vendor"
+    gceDriver := GetGCEDriver()
+    fakeCloudProvider, err := gce.FakeCreateCloudProvider(project, zone, cloudDisks)
     if err != nil {
         t.Fatalf("Failed to create fake cloud provider: %v", err)
     }
-    fakeBlockingBlockProvider := &gce.FakeBlockingCloudProvider{
-        FakeCloudProvider: fakeCloudProvider,
-        ReadyToExecute:    readyToExecute,
-    }
-    return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider)
-}
-
-func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
-    vendorVersion := "test-vendor"
-    gceDriver := GetGCEDriver()
-    err := gceDriver.SetupGCEDriver(cloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
+    err = gceDriver.SetupGCEDriver(fakeCloudProvider, nil, nil, metadataservice.NewFakeService(), driver, vendorVersion)
     if err != nil {
         t.Fatalf("Failed to setup GCE Driver: %v", err)
     }
