Skip to content

Commit 8f91f4e

Browse files
author
Hantao (Will) Wang
committed
use the volume lock for controller parallelization
1 parent 28d3405 commit 8f91f4e

File tree

3 files changed

+66
-16
lines changed

3 files changed

+66
-16
lines changed

pkg/common/volume_lock.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package common
18+
19+
import "sync"
20+
21+
type VolumeLocks struct {
22+
locks map[string]interface{}
23+
mux sync.Mutex
24+
}
25+
26+
func NewVolumeLocks() *VolumeLocks {
27+
return &VolumeLocks{
28+
locks: make(map[string]interface{}),
29+
}
30+
}
31+
32+
func (vl *VolumeLocks) TryAcquire(lock string) bool {
33+
vl.mux.Lock()
34+
defer vl.mux.Unlock()
35+
if _, exists := vl.locks[lock]; exists {
36+
return false
37+
}
38+
vl.locks[lock] = nil
39+
return true
40+
}
41+
42+
func (vl *VolumeLocks) Release(lock string) {
43+
vl.mux.Lock()
44+
defer vl.mux.Unlock()
45+
delete(vl.locks, lock)
46+
}
47+

pkg/gce-pd-csi-driver/controller.go

+17-16
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"math/rand"
2020
"sort"
2121
"strings"
22-
"sync"
2322
"time"
2423

2524
"github.com/golang/protobuf/ptypes"
@@ -46,7 +45,7 @@ type GCEControllerServer struct {
4645

4746
// A map storing all volumes with ongoing operations so that additional operations
4847
// for that same volume (as defined by Volume Key) return an Aborted error
49-
volumes sync.Map
48+
volumes *common.VolumeLocks
5049
}
5150

5251
var _ csi.ControllerServer = &GCEControllerServer{}
@@ -144,10 +143,10 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
144143
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", replicationType))
145144
}
146145

147-
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
146+
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
148147
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
149148
}
150-
defer gceCS.volumes.Delete(volKey.String())
149+
defer gceCS.volumes.Release(volKey.String())
151150

152151
// Validate if disk already exists
153152
existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, volKey)
@@ -232,10 +231,10 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
232231
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
233232
}
234233

235-
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
234+
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
236235
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
237236
}
238-
defer gceCS.volumes.Delete(volKey.String())
237+
defer gceCS.volumes.Release(volKey.String())
239238

240239
err = gceCS.CloudProvider.DeleteDisk(ctx, volKey)
241240
if err != nil {
@@ -273,10 +272,11 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
273272
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
274273
}
275274

276-
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
277-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
275+
volKeyStr := volKey.String()
276+
if acquired := gceCS.volumes.TryAcquire(nodeID + "/" + volKeyStr); !acquired {
277+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s or Node ID %s already exists", volKeyStr, nodeID))
278278
}
279-
defer gceCS.volumes.Delete(volKey.String())
279+
defer gceCS.volumes.Release(nodeID + "/" + volKeyStr)
280280

281281
// TODO(#253): Check volume capability matches for ALREADY_EXISTS
282282
if err = validateVolumeCapability(volumeCapability); err != nil {
@@ -363,10 +363,11 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
363363
return nil, err
364364
}
365365

366-
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
367-
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
366+
volKeyStr := volKey.String()
367+
if acquired := gceCS.volumes.TryAcquire(nodeID + "/" + volKeyStr); !acquired {
368+
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s or Node ID %s already exists", volKeyStr, nodeID))
368369
}
369-
defer gceCS.volumes.Delete(volKey.String())
370+
defer gceCS.volumes.Release(nodeID + "/" + volKeyStr)
370371

371372
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
372373
if err != nil {
@@ -415,10 +416,10 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
415416
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume ID is of improper format, got %v", volumeID))
416417
}
417418

418-
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
419+
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
419420
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
420421
}
421-
defer gceCS.volumes.Delete(volKey.String())
422+
defer gceCS.volumes.Release(volKey.String())
422423

423424
_, err = gceCS.CloudProvider.GetDisk(ctx, volKey)
424425
if err != nil {
@@ -527,10 +528,10 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C
527528
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not find volume with ID %v: %v", volumeID, err))
528529
}
529530

530-
if _, alreadyExists := gceCS.volumes.LoadOrStore(volKey.String(), true); alreadyExists {
531+
if acquired := gceCS.volumes.TryAcquire(volKey.String()); !acquired {
531532
return nil, status.Error(codes.Aborted, fmt.Sprintf("An operation with the given Volume Key %s already exists", volKey.String()))
532533
}
533-
defer gceCS.volumes.Delete(volKey.String())
534+
defer gceCS.volumes.Release(volKey.String())
534535

535536
// Check if snapshot already exists
536537
var snapshot *compute.Snapshot

pkg/gce-pd-csi-driver/gce-pd-driver.go

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"google.golang.org/grpc/status"
2323
"k8s.io/klog"
2424
"k8s.io/kubernetes/pkg/util/mount"
25+
common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
2526
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
2627
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
2728
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
@@ -144,6 +145,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, met
144145
Driver: gceDriver,
145146
CloudProvider: cloudProvider,
146147
MetadataService: meta,
148+
volumes: common.NewVolumeLocks(),
147149
}
148150
}
149151

0 commit comments

Comments
 (0)