Skip to content

Commit 9455633

Browse files
authored
Merge pull request #1313 from mattcary/cp-1310
Cherrypick #1294
2 parents 6fe2c01 + e38ee24 commit 9455633

File tree

6 files changed

+89
-13
lines changed

6 files changed

+89
-13
lines changed

cmd/gce-pd-csi-driver/main.go

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ var (
6363
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
6464
concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")
6565

66+
maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "If set then format and mount operations are serialized on each node. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
67+
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
68+
6669
version string
6770
)
6871

@@ -153,6 +156,9 @@ func handle() {
153156
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
154157
}
155158
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
159+
if *maxConcurrentFormatAndMount > 0 {
160+
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
161+
}
156162
}
157163

158164
err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, identityServer, controllerServer, nodeServer)

pkg/gce-pd-csi-driver/node.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"os"
2121
"path/filepath"
2222
"runtime"
23+
"time"
2324

2425
"google.golang.org/grpc/codes"
2526
"google.golang.org/grpc/status"
@@ -46,6 +47,15 @@ type GCENodeServer struct {
4647
// A map storing all volumes with ongoing operations so that additional operations
4748
// for that same volume (as defined by VolumeID) return an Aborted error
4849
volumeLocks *common.VolumeLocks
50+
51+
// If set, this semaphore will be used to serialize formatAndMount. It will be raised
52+
// when the operation starts, and lowered either when finished, or when
53+
// formatAndMountTimeout has expired.
54+
//
55+
// This is used only on linux (where memory problems for concurrent fsck and mkfs have
56+
// been observed).
57+
formatAndMountSemaphore chan any
58+
formatAndMountTimeout time.Duration
4959
}
5060

5161
var _ csi.NodeServer = &GCENodeServer{}
@@ -86,6 +96,14 @@ func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
8696
return false
8797
}
8898

99+
func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration, maxConcurrent int) *GCENodeServer {
100+
if maxConcurrent > 0 {
101+
ns.formatAndMountSemaphore = make(chan any, maxConcurrent)
102+
ns.formatAndMountTimeout = timeout
103+
}
104+
return ns
105+
}
106+
89107
func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
90108
// Validate Arguments
91109
targetPath := req.GetTargetPath()
@@ -318,7 +336,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
318336
klog.V(4).Infof("CSI volume is read-only, mounting with extra option ro")
319337
}
320338

321-
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
339+
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
322340
if err != nil {
323341
// If a volume is created from a content source like snapshot or cloning, the filesystem might get marked
324342
// as "dirty" even if it is otherwise consistent and ext3/4 will try to restore to a consistent state by replaying
@@ -329,7 +347,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
329347
klog.V(4).Infof("Failed to mount CSI volume read-only, retry mounting with extra option noload")
330348

331349
options = append(options, "noload")
332-
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
350+
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
333351
if err == nil {
334352
klog.V(4).Infof("NodeStageVolume succeeded with \"noload\" option on %v to %s", volumeID, stagingTargetPath)
335353
return &csi.NodeStageVolumeResponse{}, nil

pkg/gce-pd-csi-driver/node_test.go

+27-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"os"
2323
"path/filepath"
2424
"testing"
25+
"time"
2526

2627
"k8s.io/utils/exec"
2728
testingexec "k8s.io/utils/exec/testing"
@@ -57,7 +58,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
5758
return gceDriver
5859
}
5960

60-
func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
61+
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
6162
gceDriver := GetGCEDriver()
6263
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
6364
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
@@ -68,6 +69,18 @@ func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *
6869
return gceDriver
6970
}
7071

72+
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
73+
gceDriver := GetGCEDriver()
74+
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
75+
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)).WithSerializedFormatAndMount(5*time.Second, 1)
76+
77+
err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nodeServer)
78+
if err != nil {
79+
t.Fatalf("Failed to setup GCE Driver: %v", err)
80+
}
81+
return gceDriver
82+
}
83+
7184
func makeFakeCmd(fakeCmd *testingexec.FakeCmd, cmd string, args ...string) testingexec.FakeCommandAction {
7285
c := cmd
7386
a := args
@@ -762,9 +775,7 @@ func TestNodeGetCapabilities(t *testing.T) {
762775
}
763776
}
764777

765-
func TestConcurrentNodeOperations(t *testing.T) {
766-
readyToExecute := make(chan chan struct{}, 1)
767-
gceDriver := getTestBlockingGCEDriver(t, readyToExecute)
778+
func runBlockingFormatAndMount(t *testing.T, gceDriver *GCEDriver, readyToExecute chan chan struct{}) {
768779
ns := gceDriver.ns
769780
tempDir, err := ioutil.TempDir("", "cno")
770781
if err != nil {
@@ -844,3 +855,15 @@ func TestConcurrentNodeOperations(t *testing.T) {
844855
t.Errorf("Unexpected error: %v", err)
845856
}
846857
}
858+
859+
func TestBlockingMount(t *testing.T) {
860+
readyToExecute := make(chan chan struct{}, 1)
861+
gceDriver := getTestBlockingMountGCEDriver(t, readyToExecute)
862+
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
863+
}
864+
865+
func TestBlockingFormatAndMount(t *testing.T) {
866+
readyToExecute := make(chan chan struct{}, 1)
867+
gceDriver := getTestBlockingFormatAndMountGCEDriver(t, readyToExecute)
868+
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
869+
}

pkg/gce-pd-csi-driver/utils_linux.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"os"
2121
"strconv"
2222
"strings"
23+
"time"
2324

2425
"google.golang.org/grpc/codes"
2526
"google.golang.org/grpc/status"
@@ -47,7 +48,33 @@ func getDevicePath(ns *GCENodeServer, volumeID, partition string) (string, error
4748
return devicePath, nil
4849
}
4950

50-
func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
51+
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
52+
if ns.formatAndMountSemaphore != nil {
53+
done := make(chan any)
54+
defer close(done)
55+
56+
// Aquire the semaphore. This will block if another formatAndMount has put an item
57+
// into the semaphore channel.
58+
ns.formatAndMountSemaphore <- struct{}{}
59+
60+
go func() {
61+
defer func() { <-ns.formatAndMountSemaphore }()
62+
63+
// Add a timeout where so the semaphore will be released even if
64+
// formatAndMount is still working. This allows the node to make progress on
65+
// volumes if some error causes one formatAndMount to get stuck. The
66+
// motivation for this serialization is to reduce memory usage; if stuck
67+
// processes cause OOMs then the containers will be killed and restarted,
68+
// including the stuck threads and with any luck making progress.
69+
timeout := time.NewTimer(ns.formatAndMountTimeout)
70+
defer timeout.Stop()
71+
72+
select {
73+
case <-done:
74+
case <-timeout.C:
75+
}
76+
}()
77+
}
5178
return m.FormatAndMount(source, target, fstype, options)
5279
}
5380

pkg/gce-pd-csi-driver/utils_windows.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
mounter "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
2525
)
2626

27-
func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
27+
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
2828
if !strings.EqualFold(fstype, defaultWindowsFsType) {
2929
return fmt.Errorf("GCE PD CSI driver can only supports %s file system, it does not support %s", defaultWindowsFsType, fstype)
3030
}

test/e2e/utils/utils.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,18 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, computeEndpoint stri
5353
binPath := path.Join(pkgPath, "bin/gce-pd-csi-driver")
5454

5555
endpoint := fmt.Sprintf("tcp://localhost:%s", port)
56-
computeFlag := ""
56+
extra_flags := []string{
57+
fmt.Sprintf("--extra-labels=%s=%s", DiskLabelKey, DiskLabelValue),
58+
"--max-concurrent-format-and-mount=10", // otherwise the serialization times out.
59+
}
5760
if computeEndpoint != "" {
58-
computeFlag = fmt.Sprintf("--compute-endpoint %s", computeEndpoint)
61+
extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint %s", computeEndpoint))
5962
}
60-
6163
workspace := remote.NewWorkspaceDir("gce-pd-e2e-")
6264
// Log at V(6) as the compute API calls are emitted at that level and it's
6365
// useful to see what's happening when debugging tests.
64-
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s --extra-labels=%s=%s 2> %s/prog.out < /dev/null > /dev/null &'",
65-
workspace, endpoint, computeFlag, DiskLabelKey, DiskLabelValue, workspace)
66+
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s 2> %s/prog.out < /dev/null > /dev/null &'",
67+
workspace, endpoint, strings.Join(extra_flags, " "), workspace)
6668

6769
config := &remote.ClientConfig{
6870
PkgPath: pkgPath,

0 commit comments

Comments
 (0)