Skip to content

Force timeout nodeunstagevolume #1918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var (
waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 100, "Steps for wait for operation backoff")
waitForOpBackoffCap = flag.Duration("wait-op-backoff-cap", 0, "Cap for wait for operation backoff")

enableDeviceInUseCheck = flag.Bool("enable-device-in-use-check-on-node-unstage", true, "If set to true, block NodeUnstageVolume requests until the specified device is not in use")
deviceInUseTimeout = flag.Duration("device-in-use-timeout", 30*time.Second, "Max time to wait for a device to be unused when attempting to unstage. Exceeding the timeout will cause an unstage request to return success and ignore the device in use check.")

maxProcs = flag.Int("maxprocs", 1, "GOMAXPROCS override")
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")
Expand Down Expand Up @@ -241,7 +244,11 @@ func handle() {
if err != nil {
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
nsArgs := driver.NodeServerArgs{
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
DeviceInUseTimeout: *deviceInUseTimeout,
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
if *maxConcurrentFormatAndMount > 0 {
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.4
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/kubernetes-csi/csi-proxy/client v1.1.3
github.com/kubernetes-csi/csi-test/v5 v5.3.0
github.com/onsi/ginkgo/v2 v2.20.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
Expand Down
86 changes: 86 additions & 0 deletions pkg/gce-pd-csi-driver/device_error_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gceGCEDriver

import (
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/expirable"
"k8s.io/klog/v2"
)

// maxDeviceCacheSize specifies the maximum number of in-use devices to cache.
// 256 was selected since it is twice the number of max PDs per VM (128)
const maxDeviceCacheSize = 256

// currentTime returns the current time. It is a package-level variable
// (defaulting to time.Now) so that unit tests can stub the clock.
var currentTime = time.Now

// deviceErrMap is a mutex-guarded data structure for recording the time at which
// a device-in-use error was first encountered for specified devices.
type deviceErrMap struct {
// timeout is how long after the first recorded error a device's error is
// considered expired.
timeout time.Duration
// mux serializes every access to cache so that check-then-add sequences
// are performed as a single step.
mux sync.Mutex
// cache maps a device name to the time its first error was recorded.
cache *expirable.LRU[string, time.Time]
}

// newDeviceErrMap builds a deviceErrMap whose recorded errors are considered
// expired once they are older than timeout. The backing LRU holds at most
// maxDeviceCacheSize entries and is created with a TTL of twice the timeout
// (presumably so a record is still present when it is checked after the
// timeout has elapsed).
func newDeviceErrMap(timeout time.Duration) *deviceErrMap {
	entryTTL := timeout * 2

	return &deviceErrMap{
		timeout: timeout,
		cache:   expirable.NewLRU[string, time.Time](maxDeviceCacheSize, nil, entryTTL),
	}
}

// deviceErrorExpired reports whether the error recorded for deviceName is older
// than the configured timeout (set via `--device-in-use-timeout`). A device with
// no recorded error is never reported as expired.
func (devErrMap *deviceErrMap) deviceErrorExpired(deviceName string) bool {
	devErrMap.mux.Lock()
	defer devErrMap.mux.Unlock()

	if firstErrTime, found := devErrMap.cache.Get(deviceName); found {
		deadline := firstErrTime.Add(devErrMap.timeout)
		return currentTime().After(deadline)
	}

	// Nothing is recorded for this device, so this is the first time an error
	// has been seen for it and it cannot have expired yet.
	return false
}

// markDeviceError records in the internal `cache` that an error was encountered
// for deviceName at the current time. Only the first error is kept: if an entry
// already exists for the device, its timestamp is left untouched.
func (devErrMap *deviceErrMap) markDeviceError(deviceName string) {
	devErrMap.mux.Lock()
	defer devErrMap.mux.Unlock()

	// An earlier error is already on record; keep its original timestamp.
	if _, recorded := devErrMap.cache.Get(deviceName); recorded {
		return
	}

	timestamp := currentTime()
	klog.V(4).Infof("Recording in-use error for device %s at time %s", deviceName, timestamp)
	devErrMap.cache.Add(deviceName, timestamp)
}

// deleteDevice drops any error recorded for deviceName from the cache while
// holding the map's mutex.
func (devErrMap *deviceErrMap) deleteDevice(deviceName string) {
	devErrMap.mux.Lock()
	defer devErrMap.mux.Unlock()

	devErrMap.cache.Remove(deviceName)
}
55 changes: 55 additions & 0 deletions pkg/gce-pd-csi-driver/device_error_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gceGCEDriver

import (
"testing"
"time"
)

// TestDeviceErrorMap walks one device through the deviceErrMap lifecycle using a
// stubbed clock: an error is recorded, it is reported as expired once the clock
// passes the timeout, and deleting the device clears the record.
func TestDeviceErrorMap(t *testing.T) {
	timeout := time.Second * 10
	dName := "fake-device"
	eMap := newDeviceErrMap(timeout)
	// Restore the real clock when the test finishes so the stub does not leak.
	defer func() { currentTime = time.Now }()

	// Register an error. Checking the timeout right after should return false
	stubCurrentTime(0)
	eMap.markDeviceError(dName)
	isTimedOut := eMap.deviceErrorExpired(dName)
	if isTimedOut {
		t.Errorf("deviceErrorExpired expected to be false if called immediately after marking an error")
	}

	// Advance time. Checking the timeout should now return true
	stubCurrentTime(int64(timeout.Seconds()) + 1)
	isTimedOut = eMap.deviceErrorExpired(dName)
	if !isTimedOut {
		t.Errorf("deviceErrorExpired expected to be true after waiting for timeout")
	}

	// Deleting the device and checking the timeout should return false
	eMap.deleteDevice(dName)
	isTimedOut = eMap.deviceErrorExpired(dName)
	if isTimedOut {
		t.Errorf("deviceErrorExpired expected to be false after deleting device from map")
	}
}

// stubCurrentTime replaces the package-level currentTime clock with one that
// always reports the given Unix timestamp (in seconds).
func stubCurrentTime(unixTime int64) {
	frozen := time.Unix(unixTime, 0)
	currentTime = func() time.Time { return frozen }
}
16 changes: 9 additions & 7 deletions pkg/gce-pd-csi-driver/gce-pd-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,16 @@ func NewIdentityServer(gceDriver *GCEDriver) *GCEIdentityServer {
}
}

func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter) *GCENodeServer {
func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter, args NodeServerArgs) *GCENodeServer {
return &GCENodeServer{
Driver: gceDriver,
Mounter: mounter,
DeviceUtils: deviceUtils,
MetadataService: meta,
volumeLocks: common.NewVolumeLocks(),
VolumeStatter: statter,
Driver: gceDriver,
Mounter: mounter,
DeviceUtils: deviceUtils,
MetadataService: meta,
volumeLocks: common.NewVolumeLocks(),
VolumeStatter: statter,
enableDeviceInUseCheck: args.EnableDeviceInUseCheck,
deviceInUseErrors: newDeviceErrMap(args.DeviceInUseTimeout),
}
}

Expand Down
34 changes: 28 additions & 6 deletions pkg/gce-pd-csi-driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type GCENodeServer struct {
// for that same volume (as defined by VolumeID) return an Aborted error
volumeLocks *common.VolumeLocks

// enableDeviceInUseCheck, if true, will block NodeUnstageVolume requests if the specified
// device is still in use (or until --device-in-use-timeout is reached, if specified)
enableDeviceInUseCheck bool
// deviceInUseErrors keeps track of device names and a timestamp for when an error is
// encountered for that device
deviceInUseErrors *deviceErrMap

// If set, this semaphore will be used to serialize formatAndMount. It will be raised
// when the operation starts, and lowered either when finished, or when
// formatAndMountTimeout has expired.
Expand All @@ -65,6 +72,14 @@ type GCENodeServer struct {
csi.UnimplementedNodeServer
}

// NodeServerArgs carries the device-in-use settings passed to NewNodeServer.
type NodeServerArgs struct {
// EnableDeviceInUseCheck enables functionality which will block NodeUnstageVolume requests
// until the device is not in use
EnableDeviceInUseCheck bool

// DeviceInUseTimeout is the max time to wait for a device to become unused when
// unstaging. NewNodeServer passes it to newDeviceErrMap; once an in-use error for a
// device is older than this, NodeUnstageVolume returns success instead of failing.
DeviceInUseTimeout time.Duration
}

var _ csi.NodeServer = &GCENodeServer{}

// The constants are used to map from the machine type to the limit of
Expand Down Expand Up @@ -94,6 +109,7 @@ func getDefaultFsType() string {
return defaultLinuxFsType
}
}

func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
notMnt, err := ns.Mounter.Interface.IsLikelyNotMountPoint(path)
klog.V(4).Infof("Checking volume path %s is mounted %t: error %v", path, !notMnt, err)
Expand Down Expand Up @@ -455,13 +471,19 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
return nil, status.Error(codes.Internal, fmt.Sprintf("NodeUnstageVolume failed: %v\nUnmounting arguments: %s\n", err.Error(), stagingTargetPath))
}

if err := ns.confirmDeviceUnused(volumeID); err != nil {
var targetErr *ignoreableError
if errors.As(err, &targetErr) {
klog.Warningf("Unabled to check if device for %s is unused. Device has been unmounted successfully. Ignoring and continuing with unstaging. (%v)", volumeID, err)
} else {
return nil, status.Errorf(codes.Internal, "NodeUnstageVolume for volume %s failed: %v", volumeID, err)
if ns.enableDeviceInUseCheck {
if err := ns.confirmDeviceUnused(volumeID); err != nil {
var ignoreableErr *ignoreableError
if errors.As(err, &ignoreableErr) {
klog.Warningf("Unable to check if device for %s is unused. Device has been unmounted successfully. Ignoring and continuing with unstaging. (%v)", volumeID, err)
} else if ns.deviceInUseErrors.deviceErrorExpired(volumeID) {
klog.Warningf("Device %s could not be released after timeout of %f seconds. NodeUnstageVolume will return success.", volumeID, ns.deviceInUseErrors.timeout.Seconds())
} else {
ns.deviceInUseErrors.markDeviceError(volumeID)
return nil, status.Errorf(codes.Internal, "NodeUnstageVolume for volume %s failed: %v", volumeID, err)
}
}
ns.deviceInUseErrors.deleteDevice(volumeID)
}

klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
Expand Down
6 changes: 3 additions & 3 deletions pkg/gce-pd-csi-driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAn

func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService) *GCEDriver {
gceDriver := GetGCEDriver()
nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter))
nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0})
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
if err != nil {
t.Fatalf("Failed to setup GCE Driver: %v", err)
Expand All @@ -62,7 +62,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
gceDriver := GetGCEDriver()
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0})
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
if err != nil {
t.Fatalf("Failed to setup GCE Driver: %v", err)
Expand All @@ -73,7 +73,7 @@ func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
gceDriver := GetGCEDriver()
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)).WithSerializedFormatAndMount(5*time.Second, 1)
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0}).WithSerializedFormatAndMount(5*time.Second, 1)

err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions test/e2e/tests/single_zone_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,82 @@ var _ = Describe("GCE PD CSI Driver", func() {
}()
})

// Verifies the device-in-use timeout: the first NodeUnstageVolume fails because a
// private bind mount keeps the device busy, and a retry issued after the timeout
// configured on the driver (10s in the e2e setup) succeeds even though the device
// is still in use.
It("Should unpublish if there is an error unpublishing and device has been in use longer than timeout", func() {
	testContext := getRandomTestContext()

	p, z, _ := testContext.Instance.GetIdentity()
	client := testContext.Client
	instance := testContext.Instance

	// Create Disk
	volName, volID := createAndValidateUniqueZonalDisk(client, p, z, standardDiskType)

	defer func() {
		// Delete Disk
		err := client.DeleteVolume(volID)
		Expect(err).To(BeNil(), "DeleteVolume failed")

		// Validate Disk Deleted
		_, err = computeService.Disks.Get(p, z, volName).Do()
		Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found")
	}()

	// Attach Disk
	err := client.ControllerPublishVolumeReadWrite(volID, instance.GetNodeID(), false /* forceAttach */)
	Expect(err).To(BeNil(), "ControllerPublishVolume failed with error for disk %v on node %v: %v", volID, instance.GetNodeID(), err)

	defer func() {
		// Detach Disk
		err = client.ControllerUnpublishVolume(volID, instance.GetNodeID())
		if err != nil {
			klog.Errorf("Failed to detach disk: %v", err)
		}
	}()

	// Stage Disk
	stageDir := filepath.Join("/tmp/", volName, "stage")
	err = client.NodeStageExt4Volume(volID, stageDir)
	Expect(err).To(BeNil(), "failed to stage volume: %v", err)

	// Create private bind mount to keep the device's state as "in use"
	boundMountStageDir := filepath.Join("/tmp/bindmount", volName, "bindmount")
	boundMountStageMkdirOutput, err := instance.SSH("mkdir", "-p", boundMountStageDir)
	Expect(err).To(BeNil(), "mkdir failed on instance %v: output: %v: %v", instance.GetNodeID(), boundMountStageMkdirOutput, err)
	bindMountOutput, err := instance.SSH("mount", "--rbind", "--make-private", stageDir, boundMountStageDir)
	Expect(err).To(BeNil(), "Bind mount failed on instance %v: output: %v: %v", instance.GetNodeID(), bindMountOutput, err)

	// The flag makes the cleanup idempotent, so it is safe to run it from the
	// deferred call below even if it has already been invoked.
	privateBindMountRemoved := false
	unmountAndRmPrivateBindMount := func() {
		if !privateBindMountRemoved {
			// Umount and delete private mount staging directory
			bindUmountOutput, err := instance.SSH("umount", boundMountStageDir)
			Expect(err).To(BeNil(), "Bind umount failed on instance %v: output: %v: %v", instance.GetNodeID(), bindUmountOutput, err)
			err = testutils.RmAll(instance, boundMountStageDir)
			Expect(err).To(BeNil(), "Failed to rm mount stage dir %s: %v", boundMountStageDir, err)
		}
		privateBindMountRemoved = true
	}

	defer unmountAndRmPrivateBindMount()

	// Unstage Disk. This should record a "deviceInUse" error
	err = client.NodeUnstageVolume(volID, stageDir)
	Expect(err).ToNot(BeNil(), "Expected failure during unstage")
	Expect(err).To(MatchError(ContainSubstring("is still in use")))

	// Wait 12s (10s timeout specified in CLI + 2s buffer) and try again
	time.Sleep(12 * time.Second)
	err = client.NodeUnstageVolume(volID, stageDir)
	Expect(err).To(BeNil(), "Failed to unpublish after 10s in-use timeout for volume: %s, stageDir: %s, unexpected err: %s", volID, stageDir, err)

	// Remove the staging directory tree for this volume
	fp := filepath.Join("/tmp/", volName)
	err = testutils.RmAll(instance, fp)
	Expect(err).To(BeNil(), "Failed to rm file path %s: %v", fp, err)
})

It("Should block unstage if filesystem mounted", func() {
testContext := getRandomTestContext()

Expand Down
1 change: 1 addition & 0 deletions test/e2e/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, driverConfig DriverC
"--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme",
"--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml",
"--allow-hdha-provisioning",
"--device-in-use-timeout=10s", // Set lower than the usual value to expedite tests
fmt.Sprintf("--fallback-requisite-zones=%s", strings.Join(driverConfig.Zones, ",")),
}
extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint=%s", driverConfig.ComputeEndpoint))
Expand Down
2 changes: 1 addition & 1 deletion test/sanity/sanity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestSanity(t *testing.T) {
identityServer := driver.NewIdentityServer(gceDriver)
controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true)
fakeStatter := mountmanager.NewFakeStatterWithOptions(mounter, mountmanager.FakeStatterOptions{IsBlock: false})
nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter)
nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, driver.NodeServerArgs{EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0})
err = gceDriver.SetupGCEDriver(driverName, vendorVersion, extraLabels, nil, identityServer, controllerServer, nodeServer)
if err != nil {
t.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())
Expand Down
Loading