Skip to content

Commit fa19cb0

Browse files
committed
Add flag to enable deviceInUseTimeout. Pass args through NodeServer constructor
1 parent bd634b8 commit fa19cb0

File tree

7 files changed

+54
-27
lines changed

7 files changed

+54
-27
lines changed

cmd/gce-pd-csi-driver/main.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ var (
6363
waitForOpBackoffJitter = flag.Float64("wait-op-backoff-jitter", 0.0, "Jitter for wait for operation backoff")
6464
waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 100, "Steps for wait for operation backoff")
6565
waitForOpBackoffCap = flag.Duration("wait-op-backoff-cap", 0, "Cap for wait for operation backoff")
66-
deviceInUseTimeout = flag.Duration("device-in-use-timeout", 30*time.Second, "Max time to wait for a device to be unsed when attempting to unstage")
66+
67+
enableDeviceInUseTimeout = flag.Bool("enable-device-in-use-timeout", true, "If set to true, ignores device in use errors when attempting to unstage a device if it has been stuck for longer than 'device-in-use-timeout'")
68+
deviceInUseTimeout = flag.Duration("device-in-use-timeout", 30*time.Second, "Max time to wait for a device to be unused when attempting to unstage")
6769

6870
maxProcs = flag.Int("maxprocs", 1, "GOMAXPROCS override")
6971
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
@@ -177,7 +179,6 @@ func handle() {
177179
defer cancel()
178180

179181
// Initialize driver
180-
driver.DeviceInUseTimeout = *deviceInUseTimeout
181182
gceDriver := driver.GetGCEDriver()
182183

183184
// Initialize identity server
@@ -243,7 +244,11 @@ func handle() {
243244
if err != nil {
244245
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
245246
}
246-
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
247+
nsArgs := driver.NodeServerArgs{
248+
EnableDeviceInUseTimeout: *enableDeviceInUseTimeout,
249+
DeviceInUseTimeout: *deviceInUseTimeout,
250+
}
251+
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
247252
if *maxConcurrentFormatAndMount > 0 {
248253
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
249254
}

pkg/gce-pd-csi-driver/device_error_map.go

+22-10
Original file line numberDiff line numberDiff line change
@@ -24,37 +24,45 @@ import (
2424
// deviceErrMap is an atomic data datastructure for recording deviceInUseError times
2525
// for specified devices
2626
type deviceErrMap struct {
27+
enabled bool
28+
timeout time.Duration
2729
mux sync.Mutex
2830
deviceInUseErrors map[string]time.Time
2931
}
3032

31-
var DeviceInUseTimeout = time.Second * 30
33+
func newDeviceErrMap(shouldEnable bool, timeout time.Duration) *deviceErrMap {
34+
return &deviceErrMap{
35+
deviceInUseErrors: make(map[string]time.Time),
36+
enabled: shouldEnable,
37+
timeout: timeout,
38+
}
39+
}
3240

3341
// checkDeviceErrorTimeout returns true an error was encountered for the specified deviceName,
3442
// where the error happened at least `deviceInUseTimeout` seconds ago.
3543
func (devErrMap *deviceErrMap) checkDeviceErrorTimeout(deviceName string) bool {
44+
if !devErrMap.enabled {
45+
return false
46+
}
47+
3648
devErrMap.mux.Lock()
3749
defer devErrMap.mux.Unlock()
3850

39-
if devErrMap.deviceInUseErrors == nil {
40-
devErrMap.deviceInUseErrors = make(map[string]time.Time)
41-
}
42-
4351
lastErrTime, exists := devErrMap.deviceInUseErrors[deviceName]
44-
return exists && time.Now().Sub(lastErrTime).Seconds() >= DeviceInUseTimeout.Seconds()
52+
return exists && time.Now().Sub(lastErrTime).Seconds() >= devErrMap.timeout.Seconds()
4553
}
4654

4755
// markDeviceError updates the internal `deviceInUseErrors` map to denote an error was encounted
4856
// for the specified deviceName at the current time. If an error had previously been recorded, the
4957
// time will not be updated.
5058
func (devErrMap *deviceErrMap) markDeviceError(deviceName string) {
59+
if !devErrMap.enabled {
60+
return
61+
}
62+
5163
devErrMap.mux.Lock()
5264
defer devErrMap.mux.Unlock()
5365

54-
if devErrMap.deviceInUseErrors == nil {
55-
devErrMap.deviceInUseErrors = make(map[string]time.Time)
56-
}
57-
5866
// If an earlier error has already been recorded, do not overwrite it
5967
if _, exists := devErrMap.deviceInUseErrors[deviceName]; !exists {
6068
now := time.Now()
@@ -65,6 +73,10 @@ func (devErrMap *deviceErrMap) markDeviceError(deviceName string) {
6573

6674
// deleteDevice removes a specified device name from the map
6775
func (devErrMap *deviceErrMap) deleteDevice(deviceName string) {
76+
if !devErrMap.enabled {
77+
return
78+
}
79+
6880
devErrMap.mux.Lock()
6981
defer devErrMap.mux.Unlock()
7082
delete(devErrMap.deviceInUseErrors, deviceName)

pkg/gce-pd-csi-driver/gce-pd-driver.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,15 @@ func NewIdentityServer(gceDriver *GCEDriver) *GCEIdentityServer {
145145
}
146146
}
147147

148-
func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter) *GCENodeServer {
148+
func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter, args NodeServerArgs) *GCENodeServer {
149149
return &GCENodeServer{
150-
Driver: gceDriver,
151-
Mounter: mounter,
152-
DeviceUtils: deviceUtils,
153-
MetadataService: meta,
154-
volumeLocks: common.NewVolumeLocks(),
155-
VolumeStatter: statter,
150+
Driver: gceDriver,
151+
Mounter: mounter,
152+
DeviceUtils: deviceUtils,
153+
MetadataService: meta,
154+
volumeLocks: common.NewVolumeLocks(),
155+
VolumeStatter: statter,
156+
deviceInUseErrors: newDeviceErrMap(args.EnableDeviceInUseTimeout, args.DeviceInUseTimeout),
156157
}
157158
}
158159

pkg/gce-pd-csi-driver/node.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type GCENodeServer struct {
5353

5454
// deviceInUseErrors keeps tracks of device names and a timestamp for when an error is
5555
// encounted for that device
56-
deviceInUseErrors deviceErrMap
56+
deviceInUseErrors *deviceErrMap
5757

5858
// If set, this semaphore will be used to serialize formatAndMount. It will be raised
5959
// when the operation starts, and lowered either when finished, or when
@@ -69,6 +69,15 @@ type GCENodeServer struct {
6969
csi.UnimplementedNodeServer
7070
}
7171

72+
type NodeServerArgs struct {
73+
// EnableDeviceInUseTimeout enables functionality which will cause unstage requests
74+
// that are stucking waiting for a device to be unused to return succesfully
75+
// after `DeviceInUseTimeout` has elapsed
76+
EnableDeviceInUseTimeout bool
77+
78+
DeviceInUseTimeout time.Duration
79+
}
80+
7281
var _ csi.NodeServer = &GCENodeServer{}
7382

7483
// The constants are used to map from the machine type to the limit of
@@ -465,7 +474,7 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
465474
if errors.As(err, &ignoreableErr) {
466475
klog.Warningf("Unabled to check if device for %s is unused. Device has been unmounted successfully. Ignoring and continuing with unstaging. (%v)", volumeID, err)
467476
} else if ns.deviceInUseErrors.checkDeviceErrorTimeout(volumeID) {
468-
klog.Warningf("Device %s could not be released after timeout of %d seconds. NodeUnstageVolume will return success.", volumeID, DeviceInUseTimeout)
477+
klog.Warningf("Device %s could not be released after timeout of %f seconds. NodeUnstageVolume will return success.", volumeID, ns.deviceInUseErrors.timeout.Seconds())
469478
} else {
470479
ns.deviceInUseErrors.markDeviceError(volumeID)
471480
return nil, status.Errorf(codes.Internal, "NodeUnstageVolume for volume %s failed: %v", volumeID, err)

pkg/gce-pd-csi-driver/node_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAn
5151

5252
func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService) *GCEDriver {
5353
gceDriver := GetGCEDriver()
54-
nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter))
54+
nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{false, 0})
5555
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
5656
if err != nil {
5757
t.Fatalf("Failed to setup GCE Driver: %v", err)
@@ -62,7 +62,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
6262
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
6363
gceDriver := GetGCEDriver()
6464
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
65-
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
65+
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{false, 0})
6666
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
6767
if err != nil {
6868
t.Fatalf("Failed to setup GCE Driver: %v", err)
@@ -73,7 +73,7 @@ func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct
7373
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
7474
gceDriver := GetGCEDriver()
7575
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
76-
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)).WithSerializedFormatAndMount(5*time.Second, 1)
76+
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{false, 0}).WithSerializedFormatAndMount(5*time.Second, 1)
7777

7878
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
7979
if err != nil {

test/e2e/utils/utils.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, driverConfig DriverC
6969
"--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme",
7070
"--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml",
7171
"--allow-hdha-provisioning",
72-
"--device-in-use-timeout=10s", // Set lower than the usual value to expidite tests
72+
"--device-in-use-timeout=10s", // Set lower than the usual value to expedite tests
7373
fmt.Sprintf("--fallback-requisite-zones=%s", strings.Join(driverConfig.Zones, ",")),
7474
}
7575
extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint=%s", driverConfig.ComputeEndpoint))

test/sanity/sanity_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestSanity(t *testing.T) {
7777
identityServer := driver.NewIdentityServer(gceDriver)
7878
controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true)
7979
fakeStatter := mountmanager.NewFakeStatterWithOptions(mounter, mountmanager.FakeStatterOptions{IsBlock: false})
80-
nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter)
80+
nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, driver.NodeServerArgs{EnableDeviceInUseTimeout: false, DeviceInUseTimeout: 0})
8181
err = gceDriver.SetupGCEDriver(driverName, vendorVersion, extraLabels, nil, identityServer, controllerServer, nodeServer)
8282
if err != nil {
8383
t.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())

0 commit comments

Comments
 (0)