From e38ee24f50b1f7f6e6a1f6f4cfa1fd2e8ca30664 Mon Sep 17 00:00:00 2001 From: Matthew Cary Date: Mon, 17 Jul 2023 17:52:53 -0700 Subject: [PATCH] Add option to serialize formatAndMount Change-Id: Ieff9b45a413c6df67d7199de6845132ff3b1cd2f --- cmd/gce-pd-csi-driver/main.go | 6 +++++ pkg/gce-pd-csi-driver/node.go | 22 ++++++++++++++++-- pkg/gce-pd-csi-driver/node_test.go | 31 ++++++++++++++++++++++---- pkg/gce-pd-csi-driver/utils_linux.go | 29 +++++++++++++++++++++++- pkg/gce-pd-csi-driver/utils_windows.go | 2 +- test/e2e/utils/utils.go | 12 +++++----- 6 files changed, 89 insertions(+), 13 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 0402ac646..479559afe 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -63,6 +63,9 @@ var ( maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls") concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released") + maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "If set then format and mount operations are serialized on each node. This is stronger than max-concurrent-format as it includes fsck and other mount operations") + formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount") + version string ) @@ -153,6 +156,9 @@ func handle() { klog.Fatalf("Failed to set up metadata service: %v", err.Error()) } nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter) + if *maxConcurrentFormatAndMount > 0 { + nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount) + } } err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, identityServer, controllerServer, nodeServer) diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 8349721c5..d9fe45fc6 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "runtime" + "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -46,6 +47,15 @@ type GCENodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error volumeLocks *common.VolumeLocks + + // If set, this semaphore will be used to serialize formatAndMount. It will be raised + // when the operation starts, and lowered either when finished, or when + // formatAndMountTimeout has expired. + // + // This is used only on linux (where memory problems for concurrent fsck and mkfs have + // been observed). + formatAndMountSemaphore chan any + formatAndMountTimeout time.Duration } var _ csi.NodeServer = &GCENodeServer{} @@ -86,6 +96,14 @@ func (ns *GCENodeServer) isVolumePathMounted(path string) bool { return false } +func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration, maxConcurrent int) *GCENodeServer { + if maxConcurrent > 0 { + ns.formatAndMountSemaphore = make(chan any, maxConcurrent) + ns.formatAndMountTimeout = timeout + } + return ns +} + func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { // Validate Arguments targetPath := req.GetTargetPath() @@ -318,7 +336,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage klog.V(4).Infof("CSI volume is read-only, mounting with extra option ro") } - err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter) + err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter) if err != nil { // If a volume is created from a content source like snapshot or cloning, the filesystem might get marked // as "dirty" even if it is otherwise consistent and ext3/4 will try to restore to a consistent state by replaying @@ -329,7 +347,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage klog.V(4).Infof("Failed to mount CSI volume read-only, retry mounting with extra option noload") options = append(options, "noload") - err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter) + err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter) if err == nil { klog.V(4).Infof("NodeStageVolume succeeded with \"noload\" option on %v to %s", volumeID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index 3125c0309..cba9f2407 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "testing" + "time" "k8s.io/utils/exec" testingexec "k8s.io/utils/exec/testing" @@ -57,7 +58,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev return gceDriver } -func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver { +func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver { gceDriver := GetGCEDriver() mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute) nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)) @@ -68,6 +69,18 @@ func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) * return gceDriver } +func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver { + gceDriver := GetGCEDriver() + mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute) + nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)).WithSerializedFormatAndMount(5*time.Second, 1) + + err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nodeServer) + if err != nil { + t.Fatalf("Failed to setup GCE Driver: %v", err) + } + return gceDriver +} + func makeFakeCmd(fakeCmd *testingexec.FakeCmd, cmd string, args ...string) testingexec.FakeCommandAction { c := cmd a := args @@ -762,9 +775,7 @@ func TestNodeGetCapabilities(t *testing.T) { } } -func TestConcurrentNodeOperations(t *testing.T) { - readyToExecute := make(chan chan struct{}, 1) - gceDriver := getTestBlockingGCEDriver(t, readyToExecute) +func runBlockingFormatAndMount(t *testing.T, gceDriver *GCEDriver, readyToExecute chan chan struct{}) { ns := gceDriver.ns tempDir, err := ioutil.TempDir("", "cno") if err != nil { @@ -844,3 +855,15 @@ func TestConcurrentNodeOperations(t *testing.T) { t.Errorf("Unexpected error: %v", err) } } + +func TestBlockingMount(t *testing.T) { + readyToExecute := make(chan chan struct{}, 1) + gceDriver := getTestBlockingMountGCEDriver(t, readyToExecute) + runBlockingFormatAndMount(t, gceDriver, readyToExecute) +} + +func TestBlockingFormatAndMount(t *testing.T) { + readyToExecute := make(chan chan struct{}, 1) + gceDriver := getTestBlockingFormatAndMountGCEDriver(t, readyToExecute) + runBlockingFormatAndMount(t, gceDriver, readyToExecute) +} diff --git a/pkg/gce-pd-csi-driver/utils_linux.go b/pkg/gce-pd-csi-driver/utils_linux.go index 8f1f03673..805fb6e8a 100644 --- a/pkg/gce-pd-csi-driver/utils_linux.go +++ b/pkg/gce-pd-csi-driver/utils_linux.go @@ -20,6 +20,7 @@ import ( "os" "strconv" "strings" + "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -47,7 +48,33 @@ func getDevicePath(ns *GCENodeServer, volumeID, partition string) (string, error return devicePath, nil } -func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error { +func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error { + if ns.formatAndMountSemaphore != nil { + done := make(chan any) + defer close(done) + + // Aquire the semaphore. This will block if another formatAndMount has put an item + // into the semaphore channel. + ns.formatAndMountSemaphore <- struct{}{} + + go func() { + defer func() { <-ns.formatAndMountSemaphore }() + + // Add a timeout where so the semaphore will be released even if + // formatAndMount is still working. This allows the node to make progress on + // volumes if some error causes one formatAndMount to get stuck. The + // motivation for this serialization is to reduce memory usage; if stuck + // processes cause OOMs then the containers will be killed and restarted, + // including the stuck threads and with any luck making progress. + timeout := time.NewTimer(ns.formatAndMountTimeout) + defer timeout.Stop() + + select { + case <-done: + case <-timeout.C: + } + }() + } return m.FormatAndMount(source, target, fstype, options) } diff --git a/pkg/gce-pd-csi-driver/utils_windows.go b/pkg/gce-pd-csi-driver/utils_windows.go index 4661cd89f..b12f4f826 100644 --- a/pkg/gce-pd-csi-driver/utils_windows.go +++ b/pkg/gce-pd-csi-driver/utils_windows.go @@ -24,7 +24,7 @@ import ( mounter "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) -func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error { +func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error { if !strings.EqualFold(fstype, defaultWindowsFsType) { return fmt.Errorf("GCE PD CSI driver can only supports %s file system, it does not support %s", defaultWindowsFsType, fstype) } diff --git a/test/e2e/utils/utils.go b/test/e2e/utils/utils.go index f57c09d9d..582fb88df 100644 --- a/test/e2e/utils/utils.go +++ b/test/e2e/utils/utils.go @@ -53,16 +53,18 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, computeEndpoint stri binPath := path.Join(pkgPath, "bin/gce-pd-csi-driver") endpoint := fmt.Sprintf("tcp://localhost:%s", port) - computeFlag := "" + extra_flags := []string{ + fmt.Sprintf("--extra-labels=%s=%s", DiskLabelKey, DiskLabelValue), + "--max-concurrent-format-and-mount=10", // otherwise the serialization times out. + } if computeEndpoint != "" { - computeFlag = fmt.Sprintf("--compute-endpoint %s", computeEndpoint) + extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint %s", computeEndpoint)) } - workspace := remote.NewWorkspaceDir("gce-pd-e2e-") // Log at V(6) as the compute API calls are emitted at that level and it's // useful to see what's happening when debugging tests. - driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s --extra-labels=%s=%s 2> %s/prog.out < /dev/null > /dev/null &'", - workspace, endpoint, computeFlag, DiskLabelKey, DiskLabelValue, workspace) + driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s 2> %s/prog.out < /dev/null > /dev/null &'", + workspace, endpoint, strings.Join(extra_flags, " "), workspace) config := &remote.ClientConfig{ PkgPath: pkgPath,