From c76e7be9949dfebd68c85e1b20f32e3fc3a593c6 Mon Sep 17 00:00:00 2001 From: Peter Schuurman Date: Sun, 3 Mar 2024 09:47:50 -0800 Subject: [PATCH] Add support for read_ahead_kb mount flag --- pkg/gce-pd-csi-driver/node.go | 58 ++++++++ pkg/gce-pd-csi-driver/node_test.go | 185 +++++++++++++++++++++++-- pkg/gce-pd-csi-driver/utils.go | 5 + pkg/gce-pd-csi-driver/utils_linux.go | 21 +++ pkg/gce-pd-csi-driver/utils_windows.go | 5 + pkg/mount-manager/statter_linux.go | 24 +++- test/e2e/tests/single_zone_e2e_test.go | 84 +++++++++++ test/sanity/sanity_test.go | 3 +- 8 files changed, 365 insertions(+), 20 deletions(-) diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 21988b6e8..f601f9933 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -19,7 +19,9 @@ import ( "fmt" "os" "path/filepath" + "regexp" "runtime" + "strconv" "time" "google.golang.org/grpc/codes" @@ -72,6 +74,12 @@ const ( defaultLinuxFsType = "ext4" defaultWindowsFsType = "ntfs" fsTypeExt3 = "ext3" + + readAheadKBMountFlagRegexPattern = "^read_ahead_kb=(.+)$" +) + +var ( + readAheadKBMountFlagRegex = regexp.MustCompile(readAheadKBMountFlagRegexPattern) ) func getDefaultFsType() string { @@ -318,12 +326,19 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage // Part 3: Mount device to stagingTargetPath fstype := getDefaultFsType() + shouldUpdateReadAhead := false + var readAheadKB int64 options := []string{} if mnt := volumeCapability.GetMount(); mnt != nil { if mnt.FsType != "" { fstype = mnt.FsType } options = collectMountOptions(fstype, mnt.MountFlags) + + readAheadKB, shouldUpdateReadAhead, err = extractReadAheadKBMountFlag(mnt.MountFlags) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failure parsing mount flags: %v", err.Error()) + } } else if blk := volumeCapability.GetBlock(); blk != nil { // Noop for Block NodeStageVolume klog.V(4).Infof("NodeStageVolume succeeded on %v to %s, capability is block so this is a no-op", volumeID, stagingTargetPath) @@ -368,10 +383,53 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage } } + // Part 5: Update read_ahead + if shouldUpdateReadAhead { + if err := ns.updateReadAhead(devicePath, readAheadKB); err != nil { + return nil, status.Errorf(codes.Internal, "failure updating readahead for %s to %dKB: %v", devicePath, readAheadKB, err.Error()) + } + } + klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil } +func (ns *GCENodeServer) updateReadAhead(devicePath string, readAheadKB int64) error { + isBlock, err := ns.VolumeStatter.IsBlockDevice(devicePath) + if err != nil { + return fmt.Errorf("failed to determine whether %s is a block device: %v", devicePath, err) + } + if !isBlock { + return nil + } + + if err := setReadAheadKB(devicePath, readAheadKB, ns.Mounter); err != nil { + return fmt.Errorf("failed to set readahead: %v", err) + } + + return nil +} + +func extractReadAheadKBMountFlag(mountFlags []string) (int64, bool, error) { + for _, mountFlag := range mountFlags { + if readAheadKB := readAheadKBMountFlagRegex.FindStringSubmatch(mountFlag); len(readAheadKB) == 2 { + // There is only one matching pattern in readAheadKBMountFlagRegex + // If found, it will be at index 1 + readAheadKBInt, err := strconv.ParseInt(readAheadKB[1], 10, 0) + if err != nil { + return -1, false, fmt.Errorf("invalid read_ahead_kb mount flag %q: %v", mountFlag, err) + } + if readAheadKBInt < 0 { + // Negative values can result in unintuitive values when setting read ahead + // (due to blockdev intepreting negative integers as large positive integers). + return -1, false, fmt.Errorf("invalid negative value for read_ahead_kb mount flag: %q", mountFlag) + } + return readAheadKBInt, true, nil + } + } + return -1, false, nil +} + func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { // Validate arguments volumeID := req.GetVolumeId() diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index ce774533b..a50c74645 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -28,6 +28,7 @@ import ( testingexec "k8s.io/utils/exec/testing" csi "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/google/go-cmp/cmp" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/mount-utils" @@ -109,21 +110,33 @@ func TestNodeGetVolumeStats(t *testing.T) { Readonly: false, VolumeCapability: stdVolCap, } + _, err = ns.NodePublishVolume(context.Background(), req) if err != nil { t.Fatalf("Failed to set up test by publishing default vol: %v", err) } testCases := []struct { - name string - volumeID string - volumePath string - expectErr bool + name string + volumeID string + volumePath string + expectedResp *csi.NodeGetVolumeStatsResponse + deviceCapacity int + expectErr bool }{ { - name: "normal", - volumeID: defaultVolumeID, - volumePath: targetPath, + name: "normal", + volumeID: defaultVolumeID, + volumePath: targetPath, + deviceCapacity: 300 * 1024 * 1024 * 1024, // 300 GB + expectedResp: &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Unit: csi.VolumeUsage_BYTES, + Total: 300 * 1024 * 1024 * 1024, // 300 GB, + }, + }, + }, }, { name: "no vol id", @@ -145,18 +158,38 @@ func TestNodeGetVolumeStats(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + actionList := []testingexec.FakeCommandAction{ + makeFakeCmd( + &testingexec.FakeCmd{ + CombinedOutputScript: []testingexec.FakeAction{ + func() ([]byte, []byte, error) { + return []byte(fmt.Sprintf("%d", tc.deviceCapacity)), nil, nil + }, + }, + }, + "blockdev", + strings.Split("--getsize64 /dev/disk/fake-path", " ")..., + ), + } + + mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList}) + gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) + ns := gceDriver.ns req := &csi.NodeGetVolumeStatsRequest{ VolumeId: tc.volumeID, VolumePath: tc.volumePath, } - _, err := ns.NodeGetVolumeStats(context.Background(), req) + resp, err := ns.NodeGetVolumeStats(context.Background(), req) if err != nil && !tc.expectErr { t.Fatalf("Got unexpected err: %v", err) } if err == nil && tc.expectErr { t.Fatal("Did not get error but expected one") } + if diff := cmp.Diff(tc.expectedResp, resp); diff != "" { + t.Errorf("NodeGetVolumeStats(%s): -want, +got \n%s", req, diff) + } }) } } @@ -394,13 +427,17 @@ func TestNodeStageVolume(t *testing.T) { stagingPath := filepath.Join(tempDir, defaultStagingPath) testCases := []struct { - name string - req *csi.NodeStageVolumeRequest - deviceSize int - blockExtSize int - readonlyBit string - expResize bool - expErrCode codes.Code + name string + req *csi.NodeStageVolumeRequest + deviceSize int + blockExtSize int + readonlyBit string + expResize bool + expReadAheadUpdate bool + expReadAheadKB string + readAheadSectors string + sectorSizeInBytes int + expErrCode codes.Code }{ { name: "Valid request, no resize because block and filesystem sizes match", @@ -450,6 +487,54 @@ func TestNodeStageVolume(t *testing.T) { readonlyBit: "0", expResize: false, }, + { + name: "Valid request, update readahead", + req: &csi.NodeStageVolumeRequest{ + VolumeId: volumeID, + StagingTargetPath: stagingPath, + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + MountFlags: []string{"read_ahead_kb=4096"}, + }, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + deviceSize: 5, + blockExtSize: 1, + readonlyBit: "0", + expResize: true, + expReadAheadUpdate: true, + readAheadSectors: "8192", + sectorSizeInBytes: 512, + }, + { + name: "Valid request, update readahead (different sectorsize)", + req: &csi.NodeStageVolumeRequest{ + VolumeId: volumeID, + StagingTargetPath: stagingPath, + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + MountFlags: []string{"read_ahead_kb=4096"}, + }, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + deviceSize: 5, + blockExtSize: 1, + readonlyBit: "0", + expResize: true, + expReadAheadUpdate: true, + readAheadSectors: "4194304", + sectorSizeInBytes: 1, + }, { name: "Invalid request (Bad Access Mode)", req: &csi.NodeStageVolumeRequest{ @@ -493,10 +578,47 @@ func TestNodeStageVolume(t *testing.T) { }, expErrCode: codes.InvalidArgument, }, + { + name: "Invalid request (Invalid read_ahead_kb)", + req: &csi.NodeStageVolumeRequest{ + VolumeId: volumeID, + StagingTargetPath: stagingPath, + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + MountFlags: []string{"read_ahead_kb=not_a_number"}, + }, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + expErrCode: codes.InvalidArgument, + }, + { + name: "Invalid request (negative read_ahead_kb)", + req: &csi.NodeStageVolumeRequest{ + VolumeId: volumeID, + StagingTargetPath: stagingPath, + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + MountFlags: []string{"read_ahead_kb=-4096"}, + }, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + expErrCode: codes.InvalidArgument, + }, } for _, tc := range testCases { t.Logf("Test case: %s", tc.name) resizeCalled := false + readAheadUpdateCalled := false actionList := []testingexec.FakeCommandAction{ makeFakeCmd( &testingexec.FakeCmd{ @@ -593,6 +715,33 @@ func TestNodeStageVolume(t *testing.T) { ), }...) } + if tc.expReadAheadUpdate { + actionList = append(actionList, []testingexec.FakeCommandAction{ + makeFakeCmd( + &testingexec.FakeCmd{ + CombinedOutputScript: []testingexec.FakeAction{ + func() ([]byte, []byte, error) { + return []byte(fmt.Sprintf("%d", tc.sectorSizeInBytes)), nil, nil + }, + }, + }, + "blockdev", + []string{"--getss", "/dev/disk/fake-path"}..., + ), + makeFakeCmd( + &testingexec.FakeCmd{ + CombinedOutputScript: []testingexec.FakeAction{ + func() (_ []byte, args []byte, _ error) { + readAheadUpdateCalled = true + return []byte{}, nil, nil + }, + }, + }, + "blockdev", + []string{"--setra", tc.readAheadSectors, "/dev/disk/fake-path"}..., + ), + }...) + } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList}) gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) ns := gceDriver.ns @@ -616,6 +765,12 @@ func TestNodeStageVolume(t *testing.T) { if tc.expResize == false && resizeCalled == true { t.Fatalf("Test called resize, but it was not expected.") } + if tc.expReadAheadUpdate == true && readAheadUpdateCalled == false { + t.Fatalf("Test did not update read ahead, but it was expected.") + } + if tc.expReadAheadUpdate == false && readAheadUpdateCalled == true { + t.Fatalf("Test updated read ahead, but it was not expected.") + } } } diff --git a/pkg/gce-pd-csi-driver/utils.go b/pkg/gce-pd-csi-driver/utils.go index a33d66cc3..8e14b272a 100644 --- a/pkg/gce-pd-csi-driver/utils.go +++ b/pkg/gce-pd-csi-driver/utils.go @@ -257,6 +257,11 @@ func collectMountOptions(fsType string, mntFlags []string) []string { var options []string for _, opt := range mntFlags { + if readAheadKBMountFlagRegex.FindString(opt) != "" { + // The read_ahead_kb flag is a special flag that isn't + // passed directly as an option to the mount command. + continue + } options = append(options, opt) } diff --git a/pkg/gce-pd-csi-driver/utils_linux.go b/pkg/gce-pd-csi-driver/utils_linux.go index 805fb6e8a..a8ce16dad 100644 --- a/pkg/gce-pd-csi-driver/utils_linux.go +++ b/pkg/gce-pd-csi-driver/utils_linux.go @@ -106,3 +106,24 @@ func getBlockSizeBytes(devicePath string, m *mount.SafeFormatAndMount) (int64, e } return gotSizeBytes, nil } + +func setReadAheadKB(devicePath string, readAheadKB int64, m *mount.SafeFormatAndMount) error { + output, err := m.Exec.Command("blockdev", "--getss", devicePath).CombinedOutput() + if err != nil { + return fmt.Errorf("error when reading sector size at path %s: output: %s, err: %w", devicePath, string(output), err) + } + strOut := strings.TrimSpace(string(output)) + sectorSizeBytes, err := strconv.ParseInt(strOut, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse %q into an int size", strOut) + } + readAheadInSectors := readAheadKB * 1024 / sectorSizeBytes + readAheadInSectorsStr := strconv.FormatInt(readAheadInSectors, 10) + // Empirical testing indicates that the actual read_ahead_kb size that is set is rounded to the + // nearest 4KB. + output, err = m.Exec.Command("blockdev", "--setra", readAheadInSectorsStr, devicePath).CombinedOutput() + if err != nil { + return fmt.Errorf("error when setting readahead at path %s: output: %s, err: %w", devicePath, string(output), err) + } + return nil +} diff --git a/pkg/gce-pd-csi-driver/utils_windows.go b/pkg/gce-pd-csi-driver/utils_windows.go index b12f4f826..f40639d40 100644 --- a/pkg/gce-pd-csi-driver/utils_windows.go +++ b/pkg/gce-pd-csi-driver/utils_windows.go @@ -105,3 +105,8 @@ func getBlockSizeBytes(devicePath string, m *mount.SafeFormatAndMount) (int64, e } return proxy.GetDiskTotalBytes(devicePath) } + +func setReadAheadKB(devicePath string, readAheadKB int64, m *mount.SafeFormatAndMount) error { + // This is a no-op on windows. + return nil +} diff --git a/pkg/mount-manager/statter_linux.go b/pkg/mount-manager/statter_linux.go index d75c39b32..921ffdb98 100644 --- a/pkg/mount-manager/statter_linux.go +++ b/pkg/mount-manager/statter_linux.go @@ -61,10 +61,26 @@ func (*realStatter) StatFS(path string) (available, capacity, used, inodesFree, return } -type fakeStatter struct{} +type fakeStatter struct { + options FakeStatterOptions +} + +type FakeStatterOptions struct { + IsBlock bool +} func NewFakeStatter(mounter *mount.SafeFormatAndMount) *fakeStatter { - return &fakeStatter{} + return &fakeStatter{ + options: FakeStatterOptions{ + IsBlock: true, + }, + } +} + +func NewFakeStatterWithOptions(mounter *mount.SafeFormatAndMount, options FakeStatterOptions) *fakeStatter { + return &fakeStatter{ + options: options, + } } func (*fakeStatter) StatFS(path string) (available, capacity, used, inodesFree, inodes, inodesUsed int64, err error) { @@ -72,6 +88,6 @@ func (*fakeStatter) StatFS(path string) (available, capacity, used, inodesFree, return 1, 1, 1, 1, 1, 1, nil } -func (*fakeStatter) IsBlockDevice(fullPath string) (bool, error) { - return false, nil +func (fs *fakeStatter) IsBlockDevice(fullPath string) (bool, error) { + return fs.options.IsBlock, nil } diff --git a/test/e2e/tests/single_zone_e2e_test.go b/test/e2e/tests/single_zone_e2e_test.go index b1fa58272..e2184a7a7 100644 --- a/test/e2e/tests/single_zone_e2e_test.go +++ b/test/e2e/tests/single_zone_e2e_test.go @@ -1317,6 +1317,90 @@ var _ = Describe("GCE PD CSI Driver", func() { Expect(err).To(BeNil(), "no error expected when passed valid compute url") }) + It("Should update readahead if read_ahead_kb passed on mount", func() { + testContext := getRandomTestContext() + + p, z, _ := testContext.Instance.GetIdentity() + client := testContext.Client + instance := testContext.Instance + + // Create Disk + volName, volID := createAndValidateUniqueZonalDisk(client, p, z, standardDiskType) + + defer func() { + // Delete Disk + err := client.DeleteVolume(volID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, z, volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found") + }() + + // Attach Disk + err := client.ControllerPublishVolumeReadWrite(volID, instance.GetNodeID(), false /* forceAttach */) + Expect(err).To(BeNil(), "ControllerPublishVolume failed with error for disk %v on node %v: %v", volID, instance.GetNodeID(), err) + + defer func() { + // Detach Disk + err = client.ControllerUnpublishVolume(volID, instance.GetNodeID()) + if err != nil { + klog.Errorf("Failed to detach disk: %v", err) + } + + }() + + // Stage Disk + stageDir := filepath.Join("/tmp/", volName, "stage") + expectedReadAheadKB := "4096" + volCap := &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + MountFlags: []string{fmt.Sprintf("read_ahead_kb=%s", expectedReadAheadKB)}, + }, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + } + err = client.NodeStageVolume(volID, stageDir, volCap) + Expect(err).To(BeNil(), "failed to stage volume: %v", err) + + // Validate that the link is correct + var validated bool + var devName string + devicePaths := deviceutils.NewDeviceUtils().GetDiskByIdPaths(volName, "") + for _, devicePath := range devicePaths { + validated, err = testutils.ValidateLogicalLinkIsDisk(instance, devicePath, volName) + Expect(err).To(BeNil(), "failed to validate link %s is disk %s: %v", stageDir, volName, err) + if validated { + devFsPath, err := instance.SSH("find", devicePath, "-printf", "'%l'") + Expect(err).To(BeNil(), "Failed to symlink devicePath") + devFsPathPieces := strings.Split(devFsPath, "/") + devName = devFsPathPieces[len(devFsPathPieces)-1] + + } + } + Expect(validated).To(BeTrue(), "could not find device in %v that links to volume %s", devicePaths, volName) + actualReadAheadKBStr, err := instance.SSH("cat", fmt.Sprintf("/sys/block/%s/queue/read_ahead_kb", devName)) + actualReadAheadKB := strings.TrimSpace(actualReadAheadKBStr) + Expect(err).To(BeNil(), "Failed to read read_ahead_kb: %v", err) + Expect(actualReadAheadKB).To(Equal(expectedReadAheadKB), "unexpected read_ahead_kb") + + defer func() { + // Unstage Disk + err = client.NodeUnstageVolume(volID, stageDir) + if err != nil { + klog.Errorf("Failed to unstage volume: %v", err) + } + fp := filepath.Join("/tmp/", volName) + err = testutils.RmAll(instance, fp) + if err != nil { + klog.Errorf("Failed to rm file path %s: %v", fp, err) + } + }() + }) + type multiZoneTestConfig struct { diskType string readOnly bool diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index c093125dc..380365357 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -71,7 +71,8 @@ func TestSanity(t *testing.T) { //Initialize GCE Driver identityServer := driver.NewIdentityServer(gceDriver) controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig) - nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)) + fakeStatter := mountmanager.NewFakeStatterWithOptions(mounter, mountmanager.FakeStatterOptions{IsBlock: false}) + nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter) err = gceDriver.SetupGCEDriver(driverName, vendorVersion, extraLabels, identityServer, controllerServer, nodeServer) if err != nil { t.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())