Skip to content

Commit 55ee1e2

Browse files
authored
Merge pull request #1294 from mattcary/fsck
Add option to serialize formatAndMount
2 parents d14b457 + e90a744 commit 55ee1e2

File tree

6 files changed

+89
-13
lines changed

6 files changed

+89
-13
lines changed

cmd/gce-pd-csi-driver/main.go

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ var (
6363
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
6464
concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")
6565

66+
maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "If set then format and mount operations are serialized on each node. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
67+
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
68+
6669
version string
6770
)
6871

@@ -153,6 +156,9 @@ func handle() {
153156
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
154157
}
155158
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
159+
if *maxConcurrentFormatAndMount > 0 {
160+
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
161+
}
156162
}
157163

158164
err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, identityServer, controllerServer, nodeServer)

pkg/gce-pd-csi-driver/node.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"os"
2121
"path/filepath"
2222
"runtime"
23+
"time"
2324

2425
"google.golang.org/grpc/codes"
2526
"google.golang.org/grpc/status"
@@ -46,6 +47,15 @@ type GCENodeServer struct {
4647
// A map storing all volumes with ongoing operations so that additional operations
4748
// for that same volume (as defined by VolumeID) return an Aborted error
4849
volumeLocks *common.VolumeLocks
50+
51+
// If set, this semaphore will be used to serialize formatAndMount. It will be raised
52+
// when the operation starts, and lowered either when finished, or when
53+
// formatAndMountTimeout has expired.
54+
//
55+
// This is used only on linux (where memory problems for concurrent fsck and mkfs have
56+
// been observed).
57+
formatAndMountSemaphore chan any
58+
formatAndMountTimeout time.Duration
4959
}
5060

5161
var _ csi.NodeServer = &GCENodeServer{}
@@ -86,6 +96,14 @@ func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
8696
return false
8797
}
8898

99+
func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration, maxConcurrent int) *GCENodeServer {
100+
if maxConcurrent > 0 {
101+
ns.formatAndMountSemaphore = make(chan any, maxConcurrent)
102+
ns.formatAndMountTimeout = timeout
103+
}
104+
return ns
105+
}
106+
89107
func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
90108
// Validate Arguments
91109
targetPath := req.GetTargetPath()
@@ -318,7 +336,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
318336
klog.V(4).Infof("CSI volume is read-only, mounting with extra option ro")
319337
}
320338

321-
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
339+
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
322340
if err != nil {
323341
// If a volume is created from a content source like snapshot or cloning, the filesystem might get marked
324342
// as "dirty" even if it is otherwise consistent and ext3/4 will try to restore to a consistent state by replaying
@@ -329,7 +347,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
329347
klog.V(4).Infof("Failed to mount CSI volume read-only, retry mounting with extra option noload")
330348

331349
options = append(options, "noload")
332-
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
350+
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
333351
if err == nil {
334352
klog.V(4).Infof("NodeStageVolume succeeded with \"noload\" option on %v to %s", volumeID, stagingTargetPath)
335353
return &csi.NodeStageVolumeResponse{}, nil

pkg/gce-pd-csi-driver/node_test.go

+27-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"path/filepath"
2424
"strings"
2525
"testing"
26+
"time"
2627

2728
"k8s.io/utils/exec"
2829
testingexec "k8s.io/utils/exec/testing"
@@ -58,7 +59,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
5859
return gceDriver
5960
}
6061

61-
func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
62+
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
6263
gceDriver := GetGCEDriver()
6364
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
6465
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
@@ -69,6 +70,18 @@ func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *
6970
return gceDriver
7071
}
7172

73+
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
74+
gceDriver := GetGCEDriver()
75+
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
76+
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)).WithSerializedFormatAndMount(5*time.Second, 1)
77+
78+
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nodeServer)
79+
if err != nil {
80+
t.Fatalf("Failed to setup GCE Driver: %v", err)
81+
}
82+
return gceDriver
83+
}
84+
7285
func makeFakeCmd(fakeCmd *testingexec.FakeCmd, cmd string, args ...string) testingexec.FakeCommandAction {
7386
c := cmd
7487
a := args
@@ -849,9 +862,7 @@ func TestNodeGetCapabilities(t *testing.T) {
849862
}
850863
}
851864

852-
func TestConcurrentNodeOperations(t *testing.T) {
853-
readyToExecute := make(chan chan struct{}, 1)
854-
gceDriver := getTestBlockingGCEDriver(t, readyToExecute)
865+
func runBlockingFormatAndMount(t *testing.T, gceDriver *GCEDriver, readyToExecute chan chan struct{}) {
855866
ns := gceDriver.ns
856867
tempDir, err := ioutil.TempDir("", "cno")
857868
if err != nil {
@@ -931,3 +942,15 @@ func TestConcurrentNodeOperations(t *testing.T) {
931942
t.Errorf("Unexpected error: %v", err)
932943
}
933944
}
945+
946+
func TestBlockingMount(t *testing.T) {
947+
readyToExecute := make(chan chan struct{}, 1)
948+
gceDriver := getTestBlockingMountGCEDriver(t, readyToExecute)
949+
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
950+
}
951+
952+
func TestBlockingFormatAndMount(t *testing.T) {
953+
readyToExecute := make(chan chan struct{}, 1)
954+
gceDriver := getTestBlockingFormatAndMountGCEDriver(t, readyToExecute)
955+
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
956+
}

pkg/gce-pd-csi-driver/utils_linux.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"os"
2121
"strconv"
2222
"strings"
23+
"time"
2324

2425
"google.golang.org/grpc/codes"
2526
"google.golang.org/grpc/status"
@@ -47,7 +48,33 @@ func getDevicePath(ns *GCENodeServer, volumeID, partition string) (string, error
4748
return devicePath, nil
4849
}
4950

50-
func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
51+
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
52+
if ns.formatAndMountSemaphore != nil {
53+
done := make(chan any)
54+
defer close(done)
55+
56+
// Acquire the semaphore. This will block if another formatAndMount has put an item
57+
// into the semaphore channel.
58+
ns.formatAndMountSemaphore <- struct{}{}
59+
60+
go func() {
61+
defer func() { <-ns.formatAndMountSemaphore }()
62+
63+
// Add a timeout so the semaphore will be released even if
64+
// formatAndMount is still working. This allows the node to make progress on
65+
// volumes if some error causes one formatAndMount to get stuck. The
66+
// motivation for this serialization is to reduce memory usage; if stuck
67+
// processes cause OOMs then the containers will be killed and restarted,
68+
// including the stuck threads and with any luck making progress.
69+
timeout := time.NewTimer(ns.formatAndMountTimeout)
70+
defer timeout.Stop()
71+
72+
select {
73+
case <-done:
74+
case <-timeout.C:
75+
}
76+
}()
77+
}
5178
return m.FormatAndMount(source, target, fstype, options)
5279
}
5380

pkg/gce-pd-csi-driver/utils_windows.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
mounter "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
2525
)
2626

27-
func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
27+
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
2828
if !strings.EqualFold(fstype, defaultWindowsFsType) {
2929
return fmt.Errorf("GCE PD CSI driver can only supports %s file system, it does not support %s", defaultWindowsFsType, fstype)
3030
}

test/e2e/utils/utils.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,18 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, computeEndpoint stri
5353
binPath := path.Join(pkgPath, "bin/gce-pd-csi-driver")
5454

5555
endpoint := fmt.Sprintf("tcp://localhost:%s", port)
56-
computeFlag := ""
56+
extra_flags := []string{
57+
fmt.Sprintf("--extra-labels=%s=%s", DiskLabelKey, DiskLabelValue),
58+
"--max-concurrent-format-and-mount=10", // otherwise the serialization times out.
59+
}
5760
if computeEndpoint != "" {
58-
computeFlag = fmt.Sprintf("--compute-endpoint %s", computeEndpoint)
61+
extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint %s", computeEndpoint))
5962
}
60-
6163
workspace := remote.NewWorkspaceDir("gce-pd-e2e-")
6264
// Log at V(6) as the compute API calls are emitted at that level and it's
6365
// useful to see what's happening when debugging tests.
64-
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s --extra-labels=%s=%s 2> %s/prog.out < /dev/null > /dev/null &'",
65-
workspace, endpoint, computeFlag, DiskLabelKey, DiskLabelValue, workspace)
66+
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s 2> %s/prog.out < /dev/null > /dev/null &'",
67+
workspace, endpoint, strings.Join(extra_flags, " "), workspace)
6668

6769
config := &remote.ClientConfig{
6870
PkgPath: pkgPath,

0 commit comments

Comments
 (0)