Commit d0479f6

Add logic to force release volume in NodeUnstageVolume if it has not been released due to an error for more than 30 seconds
1 parent 27bb056 commit d0479f6
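
In outline, the change records the first time an "in use" error is seen for a device during NodeUnstageVolume and, once that recorded error is more than 30 seconds old, lets a later NodeUnstageVolume call return success instead of failing again. The following self-contained Go sketch illustrates the pattern only; the fakeNodeServer type and unstage function are hypothetical stand-ins, not the driver's actual API:

package main

import (
	"errors"
	"fmt"
	"time"
)

const deviceInUseTimeout = 30 * time.Second

type fakeNodeServer struct {
	// first-seen timestamps of "device in use" errors, keyed by volume ID
	deviceInUseErrors map[string]time.Time
}

// markDeviceError records the first error time for a device and never overwrites it.
func (ns *fakeNodeServer) markDeviceError(deviceName string) {
	if ns.deviceInUseErrors == nil {
		ns.deviceInUseErrors = make(map[string]time.Time)
	}
	if _, exists := ns.deviceInUseErrors[deviceName]; !exists {
		ns.deviceInUseErrors[deviceName] = time.Now()
	}
}

// checkDeviceErrorTimeout reports whether a recorded error is older than the timeout.
func (ns *fakeNodeServer) checkDeviceErrorTimeout(deviceName string) bool {
	lastErrTime, exists := ns.deviceInUseErrors[deviceName]
	return exists && time.Since(lastErrTime) >= deviceInUseTimeout
}

// unstage mimics the control flow added to NodeUnstageVolume in this commit.
func (ns *fakeNodeServer) unstage(volumeID string, deviceBusy bool) error {
	if deviceBusy {
		if ns.checkDeviceErrorTimeout(volumeID) {
			// The error has persisted past the timeout: fall through and force success.
		} else {
			ns.markDeviceError(volumeID)
			return errors.New("device still in use")
		}
	}
	// Success path: clear any recorded error for this volume.
	delete(ns.deviceInUseErrors, volumeID)
	return nil
}

func main() {
	ns := &fakeNodeServer{}
	fmt.Println(ns.unstage("vol-1", true)) // first failure is recorded and returned
	fmt.Println(ns.unstage("vol-1", true)) // still within the timeout: fails again
}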

2 files changed, +113 -0 lines changed

pkg/gce-pd-csi-driver/node.go (+37)
@@ -51,6 +51,10 @@ type GCENodeServer struct {
 	// for that same volume (as defined by VolumeID) return an Aborted error
 	volumeLocks *common.VolumeLocks
 
+	// deviceInUseErrors keeps track of device names and a timestamp for when an error is
+	// encountered for that device
+	deviceInUseErrors map[string]time.Time
+
 	// If set, this semaphore will be used to serialize formatAndMount. It will be raised
 	// when the operation starts, and lowered either when finished, or when
 	// formatAndMountTimeout has expired.
@@ -81,6 +85,8 @@ const (
 	fsTypeExt3 = "ext3"
 
 	readAheadKBMountFlagRegexPattern = "^read_ahead_kb=(.+)$"
+
+	deviceInUseTimeout = 30
 )
 
 var (
@@ -94,6 +100,33 @@ func getDefaultFsType() string {
 		return defaultLinuxFsType
 	}
 }
+
+// checkDeviceErrorTimeout returns true if an error was encountered for the specified deviceName,
+// where the error happened at least `deviceInUseTimeout` seconds ago.
+func (ns *GCENodeServer) checkDeviceErrorTimeout(deviceName string) bool {
+	if ns.deviceInUseErrors == nil {
+		ns.deviceInUseErrors = make(map[string]time.Time)
+	}
+
+	lastErrTime, exists := ns.deviceInUseErrors[deviceName]
+	return exists && time.Now().Sub(lastErrTime).Seconds() >= deviceInUseTimeout
+}
+
+// markDeviceError updates the internal `deviceInUseErrors` map to denote that an error was
+// encountered for the specified deviceName at the current time
+func (ns *GCENodeServer) markDeviceError(deviceName string) {
+	if ns.deviceInUseErrors == nil {
+		ns.deviceInUseErrors = make(map[string]time.Time)
+	}
+
+	// If an earlier error has already been recorded, do not overwrite it
+	if _, exists := ns.deviceInUseErrors[deviceName]; !exists {
+		now := time.Now()
+		klog.V(4).Infof("Recording in-use error for device %s at time %s", deviceName, now)
+		ns.deviceInUseErrors[deviceName] = now
+	}
+}
+
 func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
 	notMnt, err := ns.Mounter.Interface.IsLikelyNotMountPoint(path)
 	klog.V(4).Infof("Checking volume path %s is mounted %t: error %v", path, !notMnt, err)
@@ -459,11 +492,15 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
 		var targetErr *ignoreableError
 		if errors.As(err, &targetErr) {
 			klog.Warningf("Unable to check if device for %s is unused. Device has been unmounted successfully. Ignoring and continuing with unstaging. (%v)", volumeID, err)
+		} else if ns.checkDeviceErrorTimeout(volumeID) {
+			klog.Warningf("Device %s could not be released after timeout of %d seconds. NodeUnstageVolume will return success.", volumeID, deviceInUseTimeout)
 		} else {
+			ns.markDeviceError(volumeID)
 			return nil, status.Errorf(codes.Internal, "NodeUnstageVolume for volume %s failed: %v", volumeID, err)
 		}
 	}
 
+	delete(ns.deviceInUseErrors, volumeID)
 	klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
 	return &csi.NodeUnstageVolumeResponse{}, nil
 }
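
As a usage illustration (not part of this commit), the two helpers above could be exercised directly in a unit test that backdates the recorded timestamp rather than sleeping for the full timeout; this sketch assumes it sits in the same Go package as node.go so the unexported methods and deviceInUseTimeout are visible:

package gceGCEDriver // assumed: same package as node.go

import (
	"testing"
	"time"
)

func TestDeviceInUseErrorTracking(t *testing.T) {
	ns := &GCENodeServer{}

	// No error has been recorded yet, so the timeout check must report false.
	if ns.checkDeviceErrorTimeout("dev-1") {
		t.Fatal("expected no timeout before any error is recorded")
	}

	// A freshly recorded error must not trip the timeout.
	ns.markDeviceError("dev-1")
	if ns.checkDeviceErrorTimeout("dev-1") {
		t.Fatal("expected no timeout immediately after the first error")
	}

	// Backdate the recorded error so it is older than deviceInUseTimeout seconds.
	ns.deviceInUseErrors["dev-1"] = time.Now().Add(-(deviceInUseTimeout + 1) * time.Second)
	if !ns.checkDeviceErrorTimeout("dev-1") {
		t.Fatal("expected timeout once the recorded error is old enough")
	}

	// markDeviceError must not overwrite the earlier (backdated) timestamp.
	ns.markDeviceError("dev-1")
	if !ns.checkDeviceErrorTimeout("dev-1") {
		t.Fatal("expected the original timestamp to be preserved")
	}
}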

test/e2e/tests/single_zone_e2e_test.go (+76)
@@ -1438,6 +1438,82 @@ var _ = Describe("GCE PD CSI Driver", func() {
 		}()
 	})
 
+	It("Should unpublish if there is an error unpublishing and device has been in use longer than timeout", func() {
+		testContext := getRandomTestContext()
+
+		p, z, _ := testContext.Instance.GetIdentity()
+		client := testContext.Client
+		instance := testContext.Instance
+
+		// Create Disk
+		volName, volID := createAndValidateUniqueZonalDisk(client, p, z, standardDiskType)
+
+		defer func() {
+			// Delete Disk
+			err := client.DeleteVolume(volID)
+			Expect(err).To(BeNil(), "DeleteVolume failed")
+
+			// Validate Disk Deleted
+			_, err = computeService.Disks.Get(p, z, volName).Do()
+			Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found")
+		}()
+
+		// Attach Disk
+		err := client.ControllerPublishVolumeReadWrite(volID, instance.GetNodeID(), false /* forceAttach */)
+		Expect(err).To(BeNil(), "ControllerPublishVolume failed with error for disk %v on node %v: %v", volID, instance.GetNodeID(), err)
+
+		defer func() {
+			// Detach Disk
+			err = client.ControllerUnpublishVolume(volID, instance.GetNodeID())
+			if err != nil {
+				klog.Errorf("Failed to detach disk: %v", err)
+			}
+		}()
+
+		// Stage Disk
+		stageDir := filepath.Join("/tmp/", volName, "stage")
+		err = client.NodeStageExt4Volume(volID, stageDir)
+		Expect(err).To(BeNil(), "failed to stage volume: %v", err)
+
+		// Create private bind mount to keep the device's state as "in use"
+		boundMountStageDir := filepath.Join("/tmp/bindmount", volName, "bindmount")
+		boundMountStageMkdirOutput, err := instance.SSH("mkdir", "-p", boundMountStageDir)
+		Expect(err).To(BeNil(), "mkdir failed on instance %v: output: %v: %v", instance.GetNodeID(), boundMountStageMkdirOutput, err)
+		bindMountOutput, err := instance.SSH("mount", "--rbind", "--make-private", stageDir, boundMountStageDir)
+		Expect(err).To(BeNil(), "Bind mount failed on instance %v: output: %v: %v", instance.GetNodeID(), bindMountOutput, err)
+
+		privateBindMountRemoved := false
+		unmountAndRmPrivateBindMount := func() {
+			if !privateBindMountRemoved {
+				// Unmount and delete private mount staging directory
+				bindUmountOutput, err := instance.SSH("umount", boundMountStageDir)
+				Expect(err).To(BeNil(), "Bind unmount failed on instance %v: output: %v: %v", instance.GetNodeID(), bindUmountOutput, err)
+				err = testutils.RmAll(instance, boundMountStageDir)
+				Expect(err).To(BeNil(), "Failed to rm mount stage dir %s: %v", boundMountStageDir, err)
+			}
+			privateBindMountRemoved = true
+		}
+
+		defer func() {
+			unmountAndRmPrivateBindMount()
+		}()
+
+		// Unstage Disk. This should record a "deviceInUse" error
+		err = client.NodeUnstageVolume(volID, stageDir)
+		Expect(err).ToNot(BeNil(), "Expected failure during unstage")
+		Expect(err).To(MatchError(ContainSubstring("is still in use")))
+
+		// Wait 35s (30s timeout + 5s buffer) and try again
+		time.Sleep(35 * time.Second)
+		err = client.NodeUnstageVolume(volID, stageDir)
+		Expect(err).To(BeNil(), "Failed to unpublish after 30s in-use timeout for volume: %s, stageDir: %s", volID, stageDir)
+
+		// Clean up the local staging directory
+		fp := filepath.Join("/tmp/", volName)
+		err = testutils.RmAll(instance, fp)
+		Expect(err).To(BeNil(), "Failed to rm file path %s: %v", fp, err)
+	})
+
 	It("Should block unstage if filesystem mounted", func() {
 		testContext := getRandomTestContext()
 
0 commit comments
