Skip to content

Commit c7d97eb

Browse files
committed
Make deviceInUseErrors atomic. Make deviceInUseTimeout a CLI flag
1 parent 10719ad commit c7d97eb

File tree

4 files changed

+80
-34
lines changed

4 files changed

+80
-34
lines changed

cmd/gce-pd-csi-driver/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ var (
6363
waitForOpBackoffJitter = flag.Float64("wait-op-backoff-jitter", 0.0, "Jitter for wait for operation backoff")
6464
waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 100, "Steps for wait for operation backoff")
6565
waitForOpBackoffCap = flag.Duration("wait-op-backoff-cap", 0, "Cap for wait for operation backoff")
66+
deviceInUseTimeout = flag.Duration("device-in-use-timeout", 30, "Max time to wait for a device to be unsed when attempting to unstage")
6667

6768
maxProcs = flag.Int("maxprocs", 1, "GOMAXPROCS override")
6869
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
@@ -175,6 +176,7 @@ func handle() {
175176
defer cancel()
176177

177178
// Initialize driver
179+
driver.DeviceInUseTimeout = *deviceInUseTimeout
178180
gceDriver := driver.GetGCEDriver()
179181

180182
// Initialize identity server
+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the License for the specific language governing permissions and
12+
limitations under the License.
13+
*/
14+
15+
package gceGCEDriver
16+
17+
import (
18+
"sync"
19+
"time"
20+
21+
"k8s.io/klog/v2"
22+
)
23+
24+
// deviceErrMap is an atomic data datastructure for recording deviceInUseError times
25+
// for specified devices
26+
type deviceErrMap struct {
27+
mux sync.Mutex
28+
deviceInUseErrors map[string]time.Time
29+
}
30+
31+
var DeviceInUseTimeout = time.Second * 30
32+
33+
// checkDeviceErrorTimeout returns true an error was encountered for the specified deviceName,
34+
// where the error happened at least `deviceInUseTimeout` seconds ago.
35+
func (devErrMap *deviceErrMap) checkDeviceErrorTimeout(deviceName string) bool {
36+
devErrMap.mux.Lock()
37+
defer devErrMap.mux.Unlock()
38+
39+
if devErrMap.deviceInUseErrors == nil {
40+
devErrMap.deviceInUseErrors = make(map[string]time.Time)
41+
}
42+
43+
lastErrTime, exists := devErrMap.deviceInUseErrors[deviceName]
44+
return exists && time.Now().Sub(lastErrTime).Seconds() >= deviceInUseTimeout.Seconds()
45+
}
46+
47+
// markDeviceError updates the internal `deviceInUseErrors` map to denote an error was encounted
48+
// for the specified deviceName at the current time. If an error had previously been recorded, the
49+
// time will not be updated.
50+
func (devErrMap *deviceErrMap) markDeviceError(deviceName string) {
51+
devErrMap.mux.Lock()
52+
defer devErrMap.mux.Unlock()
53+
54+
if devErrMap.deviceInUseErrors == nil {
55+
devErrMap.deviceInUseErrors = make(map[string]time.Time)
56+
}
57+
58+
// If an earlier error has already been recorded, do not overwrite it
59+
if _, exists := devErrMap.deviceInUseErrors[deviceName]; !exists {
60+
now := time.Now()
61+
klog.V(4).Infof("Recording in-use error for device %s at time %s", deviceName, now)
62+
devErrMap.deviceInUseErrors[deviceName] = now
63+
}
64+
}
65+
66+
// deleteDevice removes a specified device name from the map
67+
func (devErrMap *deviceErrMap) deleteDevice(deviceName string) {
68+
devErrMap.mux.Lock()
69+
defer devErrMap.mux.Unlock()
70+
delete(devErrMap.deviceInUseErrors, deviceName)
71+
}

pkg/gce-pd-csi-driver/node.go

+6-34
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type GCENodeServer struct {
5353

5454
// deviceInUseErrors keeps tracks of device names and a timestamp for when an error is
5555
// encounted for that device
56-
deviceInUseErrors map[string]time.Time
56+
deviceInUseErrors deviceErrMap
5757

5858
// If set, this semaphore will be used to serialize formatAndMount. It will be raised
5959
// when the operation starts, and lowered either when finished, or when
@@ -85,8 +85,6 @@ const (
8585
fsTypeExt3 = "ext3"
8686

8787
readAheadKBMountFlagRegexPattern = "^read_ahead_kb=(.+)$"
88-
89-
deviceInUseTimeout = 30
9088
)
9189

9290
var (
@@ -101,32 +99,6 @@ func getDefaultFsType() string {
10199
}
102100
}
103101

104-
// checkDeviceErrorTimeout returns true an error was encounted for the specified deviceName,
105-
// where the error happened at least `deviceInUseTimeout` seconds ago.
106-
func (ns *GCENodeServer) checkDeviceErrorTimeout(deviceName string) bool {
107-
if ns.deviceInUseErrors == nil {
108-
ns.deviceInUseErrors = make(map[string]time.Time)
109-
}
110-
111-
lastErrTime, exists := ns.deviceInUseErrors[deviceName]
112-
return exists && time.Now().Sub(lastErrTime).Seconds() >= deviceInUseTimeout
113-
}
114-
115-
// markDeviceError updates the internal `deviceInUseErrors` map to denote an error was encounted
116-
// for the specified deviceName at the current time
117-
func (ns *GCENodeServer) markDeviceError(deviceName string) {
118-
if ns.deviceInUseErrors == nil {
119-
ns.deviceInUseErrors = make(map[string]time.Time)
120-
}
121-
122-
// If an earlier error has already been recorded, do not overwrite it
123-
if _, exists := ns.deviceInUseErrors[deviceName]; !exists {
124-
now := time.Now()
125-
klog.V(4).Infof("Recording in-use error for device %s at time %s", deviceName, now)
126-
ns.deviceInUseErrors[deviceName] = now
127-
}
128-
}
129-
130102
func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
131103
notMnt, err := ns.Mounter.Interface.IsLikelyNotMountPoint(path)
132104
klog.V(4).Infof("Checking volume path %s is mounted %t: error %v", path, !notMnt, err)
@@ -489,18 +461,18 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
489461
}
490462

491463
if err := ns.confirmDeviceUnused(volumeID); err != nil {
492-
var targetErr *ignoreableError
493-
if errors.As(err, &targetErr) {
464+
var ignoreableErr *ignoreableError
465+
if errors.As(err, &ignoreableErr) {
494466
klog.Warningf("Unabled to check if device for %s is unused. Device has been unmounted successfully. Ignoring and continuing with unstaging. (%v)", volumeID, err)
495-
} else if ns.checkDeviceErrorTimeout(volumeID) {
467+
} else if ns.deviceInUseErrors.checkDeviceErrorTimeout(volumeID) {
496468
klog.Warningf("Device %s could not be released after timeout of %d seconds. NodeUnstageVolume will return success.", volumeID, deviceInUseTimeout)
497469
} else {
498-
ns.markDeviceError(volumeID)
470+
ns.deviceInUseErrors.markDeviceError(volumeID)
499471
return nil, status.Errorf(codes.Internal, "NodeUnstageVolume for volume %s failed: %v", volumeID, err)
500472
}
501473
}
502474

503-
delete(ns.deviceInUseErrors, volumeID)
475+
ns.deviceInUseErrors.deleteDevice(volumeID)
504476
klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
505477
return &csi.NodeUnstageVolumeResponse{}, nil
506478
}

test/e2e/utils/utils.go

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, driverConfig DriverC
6868
"--use-instance-api-to-list-volumes-published-nodes",
6969
"--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme",
7070
"--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml",
71+
"--device-in-use-timeout=10", // Set lower than the usual value to expidite tests
7172
fmt.Sprintf("--fallback-requisite-zones=%s", strings.Join(driverConfig.Zones, ",")),
7273
}
7374
extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint=%s", driverConfig.ComputeEndpoint))

0 commit comments

Comments
 (0)