Skip to content

Commit c8759d2

Browse files
author
Hantao (Will) Wang
committed
responding to comments
1 parent 1d668ad commit c8759d2

File tree

5 files changed

+15
-19
lines changed

5 files changed

+15
-19
lines changed

pkg/common/volume_lock.go

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ func NewVolumeLocks() *VolumeLocks {
3939
}
4040
}
4141

42+
// Tries to acquire the lock for operating on volumeID and returns true if successful.
43+
// If another operation is already using volumeID, returns false.
4244
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
4345
vl.mux.Lock()
4446
defer vl.mux.Unlock()

pkg/gce-cloud-provider/compute/fake-gce.go

-4
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,6 @@ func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
6363
return fcp, nil
6464
}
6565

66-
func (cloud *FakeCloudProvider) GetProject() string {
67-
return cloud.project
68-
}
69-
7066
func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, volumeKey *meta.Key) (*meta.Key, error) {
7167
switch volumeKey.Type() {
7268
case meta.Zonal:

pkg/gce-cloud-provider/compute/gce-compute.go

-6
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@ type GCECompute interface {
6060
GetSnapshot(ctx context.Context, snapshotName string) (*compute.Snapshot, error)
6161
CreateSnapshot(ctx context.Context, volKey *meta.Key, snapshotName string) (*compute.Snapshot, error)
6262
DeleteSnapshot(ctx context.Context, snapshotName string) error
63-
// Metadata
64-
GetProject() string
65-
}
66-
67-
func (cloud *CloudProvider) GetProject() string {
68-
return cloud.project
6963
}
7064

7165
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified

pkg/gce-pd-csi-driver/controller.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
143143
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", replicationType))
144144
}
145145

146-
volumeID, err := common.KeyToVolumeID(volKey, gceCS.CloudProvider.GetProject())
146+
volumeID, err := common.KeyToVolumeID(volKey, gceCS.MetadataService.GetProject())
147147
if err != nil {
148148
return nil, status.Errorf(codes.Internal, "Failed to convert volume key to volume ID: %v", err)
149149
}
@@ -276,6 +276,8 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
276276
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
277277
}
278278

279+
// Acquires the lock for the volume on that node only, because we need to support the ability
280+
// to publish the same volume onto different nodes concurrently
279281
lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
280282
if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
281283
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)
@@ -367,6 +369,8 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
367369
return nil, err
368370
}
369371

372+
// Acquires the lock for the volume on that node only, because we need to support the ability
373+
// to unpublish the same volume from different nodes concurrently
370374
lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
371375
if acquired := gceCS.volumeLocks.TryAcquire(lockingVolumeID); !acquired {
372376
return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, lockingVolumeID)

pkg/gce-pd-csi-driver/controller_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -1432,7 +1432,7 @@ func TestVolumeOperationConcurrency(t *testing.T) {
14321432
SourceVolumeId: testVolumeId + "2",
14331433
}
14341434

1435-
runRequest := func(req *csi.CreateSnapshotRequest) chan error {
1435+
runRequest := func(req *csi.CreateSnapshotRequest) <-chan error {
14361436
response := make(chan error)
14371437
go func() {
14381438
_, err := cs.CreateSnapshot(context.Background(), req)
@@ -1454,29 +1454,29 @@ func TestVolumeOperationConcurrency(t *testing.T) {
14541454
if err != nil {
14551455
serverError, ok := status.FromError(err)
14561456
if !ok {
1457-
t.Fatalf("Could not get error status code from err: %v", err)
1457+
t.Errorf("Could not get error status code from err: %v", err)
14581458
}
14591459
if serverError.Code() != codes.Aborted {
1460-
t.Fatalf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
1460+
t.Errorf("Expected error code: %v, got: %v. err : %v", codes.Aborted, serverError.Code(), err)
14611461
}
14621462
} else {
1463-
t.Fatalf("Expected error: %v, got no error", codes.Aborted)
1463+
t.Errorf("Expected error: %v, got no error", codes.Aborted)
14641464
}
14651465
case <-readyToExecute:
1466-
t.Fatal("The operation for vol1PublishTargetB should have been aborted, but was started")
1466+
t.Errorf("The operation for vol1PublishTargetB should have been aborted, but was started")
14671467
}
14681468

14691469
// Start vol2CreateSnapshot and allow it to execute to completion. Then check for success.
14701470
vol2CreateSnapshotResp := runRequest(vol2CreateSnapshotReq)
14711471
execVol2CreateSnapshot := <-readyToExecute
14721472
execVol2CreateSnapshot <- struct{}{}
14731473
if err := <-vol2CreateSnapshotResp; err != nil {
1474-
t.Fatalf("Unexpected error: %v", err)
1474+
t.Errorf("Unexpected error: %v", err)
14751475
}
14761476

1477-
// To clean up, allow the vol1PublishTargetA to complete
1477+
// To clean up, allow the vol1CreateSnapshotA to complete
14781478
execVol1CreateSnapshotA <- struct{}{}
14791479
if err := <-vol1CreateSnapshotAResp; err != nil {
1480-
t.Fatalf("Unexpected error: %v", err)
1480+
t.Errorf("Unexpected error: %v", err)
14811481
}
14821482
}

0 commit comments

Comments
 (0)