Skip to content

Commit 834b272

Browse files
committed
Use LRU for deviceInUse map. Disable device in use checks entirely if specified to do so
1 parent 3312cef commit 834b272

File tree

20 files changed

+1507
-46
lines changed

20 files changed

+1507
-46
lines changed

cmd/gce-pd-csi-driver/main.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ var (
6464
waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 100, "Steps for wait for operation backoff")
6565
waitForOpBackoffCap = flag.Duration("wait-op-backoff-cap", 0, "Cap for wait for operation backoff")
6666

67-
enableDeviceInUseTimeout = flag.Bool("enable-device-in-use-timeout", true, "If set to true, ignores device in use errors when attempting to unstage a device if it has been stuck for longer than 'device-in-use-timeout'")
68-
deviceInUseTimeout = flag.Duration("device-in-use-timeout", 30*time.Second, "Max time to wait for a device to be unused when attempting to unstage")
67+
enableDeviceInUseCheck = flag.Bool("enable-device-in-use-check-on-node-unstage", true, "If set to true, block NodeUnstageVolume requests until the specified device is not in use")
68+
deviceInUseTimeout = flag.Duration("device-in-use-timeout", 30*time.Second, "Max time to wait for a device to be unused when attempting to unstage. Exceeding the timeout will cause an unstage request to return success and ignore the device in use check.")
6969

7070
maxProcs = flag.Int("maxprocs", 1, "GOMAXPROCS override")
7171
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
@@ -244,8 +244,8 @@ func handle() {
244244
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
245245
}
246246
nsArgs := driver.NodeServerArgs{
247-
EnableDeviceInUseTimeout: *enableDeviceInUseTimeout,
248-
DeviceInUseTimeout: *deviceInUseTimeout,
247+
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
248+
DeviceInUseTimeout: *deviceInUseTimeout,
249249
}
250250
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
251251
if *maxConcurrentFormatAndMount > 0 {

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ require (
7676
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
7777
github.com/hashicorp/errwrap v1.0.0 // indirect
7878
github.com/hashicorp/go-multierror v1.1.0 // indirect
79+
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
7980
github.com/imdario/mergo v0.3.12 // indirect
8081
github.com/josharian/intern v1.0.0 // indirect
8182
github.com/json-iterator/go v1.1.12 // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -1550,6 +1550,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
15501550
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
15511551
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
15521552
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
1553+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
1554+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
15531555
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
15541556
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
15551557
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=

pkg/gce-pd-csi-driver/device_error_map.go

+26-27
Original file line numberDiff line numberDiff line change
@@ -15,69 +15,68 @@ limitations under the License.
1515
package gceGCEDriver
1616

1717
import (
18+
"fmt"
1819
"sync"
1920
"time"
2021

22+
lru "github.com/hashicorp/golang-lru/v2"
2123
"k8s.io/klog/v2"
2224
)
2325

26+
// maxDeviceCacheSize specifies the maximum number of in-use devices to cache.
27+
const maxDeviceCacheSize = 256
28+
29+
// currentTime is used to stub time.Now in unit tests
30+
var currentTime = time.Now
31+
2432
// deviceErrMap is an atomic data structure for recording deviceInUseError times
2533
// for specified devices
2634
type deviceErrMap struct {
27-
enabled bool
28-
timeout time.Duration
29-
mux sync.Mutex
30-
deviceInUseErrors map[string]time.Time
35+
timeout time.Duration
36+
mux sync.Mutex
37+
cache *lru.Cache[string, time.Time]
3138
}
3239

33-
func newDeviceErrMap(shouldEnable bool, timeout time.Duration) *deviceErrMap {
40+
func newDeviceErrMap(timeout time.Duration) *deviceErrMap {
41+
c, err := lru.New[string, time.Time](maxDeviceCacheSize)
42+
if err != nil {
43+
panic(fmt.Sprintf("Could not initialize deviceInUse LRU cache: %s", err))
44+
}
45+
3446
return &deviceErrMap{
35-
deviceInUseErrors: make(map[string]time.Time),
36-
enabled: shouldEnable,
37-
timeout: timeout,
47+
cache: c,
48+
timeout: timeout,
3849
}
3950
}
4051

4152
// checkDeviceErrorTimeout returns true if an error was encountered for the specified deviceName,
4253
// where the error happened at least `deviceInUseTimeout` seconds ago.
4354
func (devErrMap *deviceErrMap) checkDeviceErrorTimeout(deviceName string) bool {
44-
if !devErrMap.enabled {
45-
return false
46-
}
47-
4855
devErrMap.mux.Lock()
4956
defer devErrMap.mux.Unlock()
5057

51-
lastErrTime, exists := devErrMap.deviceInUseErrors[deviceName]
52-
return exists && time.Now().Sub(lastErrTime).Seconds() >= devErrMap.timeout.Seconds()
58+
lastErrTime, exists := devErrMap.cache.Get(deviceName)
59+
return exists && currentTime().Sub(lastErrTime).Seconds() >= devErrMap.timeout.Seconds()
5360
}
5461

55-
// markDeviceError updates the internal `deviceInUseErrors` map to denote an error was encounted
62+
// markDeviceError updates the internal `cache` map to denote an error was encountered
5663
// for the specified deviceName at the current time. If an error had previously been recorded, the
5764
// time will not be updated.
5865
func (devErrMap *deviceErrMap) markDeviceError(deviceName string) {
59-
if !devErrMap.enabled {
60-
return
61-
}
62-
6366
devErrMap.mux.Lock()
6467
defer devErrMap.mux.Unlock()
6568

6669
// If an earlier error has already been recorded, do not overwrite it
67-
if _, exists := devErrMap.deviceInUseErrors[deviceName]; !exists {
68-
now := time.Now()
70+
if _, exists := devErrMap.cache.Get(deviceName); !exists {
71+
now := currentTime()
6972
klog.V(4).Infof("Recording in-use error for device %s at time %s", deviceName, now)
70-
devErrMap.deviceInUseErrors[deviceName] = now
73+
devErrMap.cache.Add(deviceName, now)
7174
}
7275
}
7376

7477
// deleteDevice removes a specified device name from the map
7578
func (devErrMap *deviceErrMap) deleteDevice(deviceName string) {
76-
if !devErrMap.enabled {
77-
return
78-
}
79-
8079
devErrMap.mux.Lock()
8180
defer devErrMap.mux.Unlock()
82-
delete(devErrMap.deviceInUseErrors, deviceName)
81+
devErrMap.cache.Remove(deviceName)
8382
}

pkg/gce-pd-csi-driver/gce-pd-driver.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,14 @@ func NewIdentityServer(gceDriver *GCEDriver) *GCEIdentityServer {
147147

148148
func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter, args NodeServerArgs) *GCENodeServer {
149149
return &GCENodeServer{
150-
Driver: gceDriver,
151-
Mounter: mounter,
152-
DeviceUtils: deviceUtils,
153-
MetadataService: meta,
154-
volumeLocks: common.NewVolumeLocks(),
155-
VolumeStatter: statter,
156-
deviceInUseErrors: newDeviceErrMap(args.EnableDeviceInUseTimeout, args.DeviceInUseTimeout),
150+
Driver: gceDriver,
151+
Mounter: mounter,
152+
DeviceUtils: deviceUtils,
153+
MetadataService: meta,
154+
volumeLocks: common.NewVolumeLocks(),
155+
VolumeStatter: statter,
156+
enableDeviceInUseCheck: args.EnableDeviceInUseCheck,
157+
deviceInUseErrors: newDeviceErrMap(args.DeviceInUseTimeout),
157158
}
158159
}
159160

pkg/gce-pd-csi-driver/node.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ type GCENodeServer struct {
5151
// for that same volume (as defined by VolumeID) return an Aborted error
5252
volumeLocks *common.VolumeLocks
5353

54+
// enableDeviceInUseCheck, if true, will block NodeUnstageVolume requests if the specified
55+
// device is still in use (or until --device-in-use-timeout is reached, if specified)
56+
enableDeviceInUseCheck bool
5457
// deviceInUseErrors keeps track of device names and a timestamp for when an error is
5558
// encountered for that device
5659
deviceInUseErrors *deviceErrMap
@@ -70,10 +73,9 @@ type GCENodeServer struct {
7073
}
7174

7275
type NodeServerArgs struct {
73-
// EnableDeviceInUseTimeout enables functionality which will cause unstage requests
74-
// that are stucking waiting for a device to be unused to return succesfully
75-
// after `DeviceInUseTimeout` has elapsed
76-
EnableDeviceInUseTimeout bool
76+
// EnableDeviceInUseCheck enables functionality which will block NodeUnstageVolume requests
77+
// until the device is not in use
78+
EnableDeviceInUseCheck bool
7779

7880
DeviceInUseTimeout time.Duration
7981
}
@@ -490,6 +492,10 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
490492
type ignoreableError struct{ error }
491493

492494
func (ns *GCENodeServer) confirmDeviceUnused(volumeID string) error {
495+
if !ns.enableDeviceInUseCheck {
496+
return nil
497+
}
498+
493499
devicePath, err := getDevicePath(ns, volumeID, "" /* partition, which is unused */)
494500
if err != nil {
495501
return &ignoreableError{fmt.Errorf("failed to find device path for volume %s: %v", volumeID, err.Error())}

pkg/gce-pd-csi-driver/node_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAn
5151

5252
func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService) *GCEDriver {
5353
gceDriver := GetGCEDriver()
54-
nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{false, 0})
54+
nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0})
5555
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
5656
if err != nil {
5757
t.Fatalf("Failed to setup GCE Driver: %v", err)
@@ -62,7 +62,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
6262
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
6363
gceDriver := GetGCEDriver()
6464
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
65-
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{false, 0})
65+
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0})
6666
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
6767
if err != nil {
6868
t.Fatalf("Failed to setup GCE Driver: %v", err)
@@ -73,7 +73,7 @@ func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct
7373
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
7474
gceDriver := GetGCEDriver()
7575
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
76-
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{false, 0}).WithSerializedFormatAndMount(5*time.Second, 1)
76+
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0}).WithSerializedFormatAndMount(5*time.Second, 1)
7777

7878
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
7979
if err != nil {

test/sanity/sanity_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestSanity(t *testing.T) {
7777
identityServer := driver.NewIdentityServer(gceDriver)
7878
controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig)
7979
fakeStatter := mountmanager.NewFakeStatterWithOptions(mounter, mountmanager.FakeStatterOptions{IsBlock: false})
80-
nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, driver.NodeServerArgs{EnableDeviceInUseTimeout: false, DeviceInUseTimeout: 0})
80+
nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, driver.NodeServerArgs{EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0})
8181
err = gceDriver.SetupGCEDriver(driverName, vendorVersion, extraLabels, nil, identityServer, controllerServer, nodeServer)
8282
if err != nil {
8383
t.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())

vendor/github.com/hashicorp/golang-lru/v2/.gitignore

+23
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/hashicorp/golang-lru/v2/.golangci.yml

+46
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)