Skip to content

[release-1.13] Add support for read_ahead_kb mount flag #1631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pkg/gce-pd-csi-driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"time"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -72,6 +74,12 @@ const (
defaultLinuxFsType = "ext4"
defaultWindowsFsType = "ntfs"
fsTypeExt3 = "ext3"

readAheadKBMountFlagRegexPattern = "^read_ahead_kb=(.+)$"
)

var (
// readAheadKBMountFlagRegex is the compiled form of
// readAheadKBMountFlagRegexPattern. It matches a whole mount flag of the
// form "read_ahead_kb=<value>" and captures <value> as its only submatch.
// It is compiled once at package initialization; MustCompile panics if the
// pattern is invalid.
readAheadKBMountFlagRegex = regexp.MustCompile(readAheadKBMountFlagRegexPattern)
)

func getDefaultFsType() string {
Expand Down Expand Up @@ -318,12 +326,19 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
// Part 3: Mount device to stagingTargetPath
fstype := getDefaultFsType()

shouldUpdateReadAhead := false
var readAheadKB int64
options := []string{}
if mnt := volumeCapability.GetMount(); mnt != nil {
if mnt.FsType != "" {
fstype = mnt.FsType
}
options = collectMountOptions(fstype, mnt.MountFlags)

readAheadKB, shouldUpdateReadAhead, err = extractReadAheadKBMountFlag(mnt.MountFlags)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failure parsing mount flags: %v", err.Error())
}
} else if blk := volumeCapability.GetBlock(); blk != nil {
// Noop for Block NodeStageVolume
klog.V(4).Infof("NodeStageVolume succeeded on %v to %s, capability is block so this is a no-op", volumeID, stagingTargetPath)
Expand Down Expand Up @@ -368,10 +383,53 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
}
}

// Part 5: Update read_ahead
if shouldUpdateReadAhead {
if err := ns.updateReadAhead(devicePath, readAheadKB); err != nil {
return nil, status.Errorf(codes.Internal, "failure updating readahead for %s to %dKB: %v", devicePath, readAheadKB, err.Error())
}
}

klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}

// updateReadAhead applies a read-ahead setting of readAheadKB kilobytes to the
// device at devicePath by delegating to setReadAheadKB.
//
// It first asks ns.VolumeStatter whether devicePath is a block device and
// returns nil without doing anything when it is not. Errors from the
// block-device check and from setReadAheadKB are wrapped with %w so callers
// can inspect the underlying cause with errors.Is / errors.As.
func (ns *GCENodeServer) updateReadAhead(devicePath string, readAheadKB int64) error {
	isBlock, err := ns.VolumeStatter.IsBlockDevice(devicePath)
	if err != nil {
		return fmt.Errorf("failed to determine whether %s is a block device: %w", devicePath, err)
	}
	if !isBlock {
		// Read-ahead is only adjusted for block devices.
		return nil
	}

	if err := setReadAheadKB(devicePath, readAheadKB, ns.Mounter); err != nil {
		return fmt.Errorf("failed to set readahead: %w", err)
	}
	return nil
}

// extractReadAheadKBMountFlag looks for the first mount flag matched by
// readAheadKBMountFlagRegex (i.e. "read_ahead_kb=<value>") and parses its
// value as a base-10 integer.
//
// Returns:
//   - (value, true, nil) when a flag is found and its value is a
//     non-negative integer that fits in an int64;
//   - (-1, false, nil) when no flag in mountFlags matches;
//   - (-1, false, err) when a matching flag carries a value that is not an
//     integer or is negative. Parse failures wrap the strconv error with %w.
//
// Only the first matching flag is considered; later ones are ignored.
func extractReadAheadKBMountFlag(mountFlags []string) (int64, bool, error) {
	for _, mountFlag := range mountFlags {
		// The regex has exactly one capture group, so a successful match
		// yields two elements: the full match and the captured value.
		match := readAheadKBMountFlagRegex.FindStringSubmatch(mountFlag)
		if len(match) != 2 {
			continue
		}

		// Parse with an explicit 64-bit size so the accepted range matches the
		// int64 return type on every platform (bitSize 0 would mean the
		// platform-dependent int).
		readAheadKB, err := strconv.ParseInt(match[1], 10, 64)
		if err != nil {
			return -1, false, fmt.Errorf("invalid read_ahead_kb mount flag %q: %w", mountFlag, err)
		}
		if readAheadKB < 0 {
			// Negative values can result in unintuitive values when setting read ahead
			// (due to blockdev interpreting negative integers as large positive integers).
			return -1, false, fmt.Errorf("invalid negative value for read_ahead_kb mount flag: %q", mountFlag)
		}
		return readAheadKB, true, nil
	}
	return -1, false, nil
}

func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
// Validate arguments
volumeID := req.GetVolumeId()
Expand Down
185 changes: 170 additions & 15 deletions pkg/gce-pd-csi-driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
testingexec "k8s.io/utils/exec/testing"

csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/mount-utils"
Expand Down Expand Up @@ -109,21 +110,33 @@ func TestNodeGetVolumeStats(t *testing.T) {
Readonly: false,
VolumeCapability: stdVolCap,
}

_, err = ns.NodePublishVolume(context.Background(), req)
if err != nil {
t.Fatalf("Failed to set up test by publishing default vol: %v", err)
}

testCases := []struct {
name string
volumeID string
volumePath string
expectErr bool
name string
volumeID string
volumePath string
expectedResp *csi.NodeGetVolumeStatsResponse
deviceCapacity int
expectErr bool
}{
{
name: "normal",
volumeID: defaultVolumeID,
volumePath: targetPath,
name: "normal",
volumeID: defaultVolumeID,
volumePath: targetPath,
deviceCapacity: 300 * 1024 * 1024 * 1024, // 300 GB
expectedResp: &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
{
Unit: csi.VolumeUsage_BYTES,
Total: 300 * 1024 * 1024 * 1024, // 300 GB,
},
},
},
},
{
name: "no vol id",
Expand All @@ -145,18 +158,38 @@ func TestNodeGetVolumeStats(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actionList := []testingexec.FakeCommandAction{
makeFakeCmd(
&testingexec.FakeCmd{
CombinedOutputScript: []testingexec.FakeAction{
func() ([]byte, []byte, error) {
return []byte(fmt.Sprintf("%d", tc.deviceCapacity)), nil, nil
},
},
},
"blockdev",
strings.Split("--getsize64 /dev/disk/fake-path", " ")...,
),
}

mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
ns := gceDriver.ns

req := &csi.NodeGetVolumeStatsRequest{
VolumeId: tc.volumeID,
VolumePath: tc.volumePath,
}
_, err := ns.NodeGetVolumeStats(context.Background(), req)
resp, err := ns.NodeGetVolumeStats(context.Background(), req)
if err != nil && !tc.expectErr {
t.Fatalf("Got unexpected err: %v", err)
}
if err == nil && tc.expectErr {
t.Fatal("Did not get error but expected one")
}
if diff := cmp.Diff(tc.expectedResp, resp); diff != "" {
t.Errorf("NodeGetVolumeStats(%s): -want, +got \n%s", req, diff)
}
})
}
}
Expand Down Expand Up @@ -394,13 +427,17 @@ func TestNodeStageVolume(t *testing.T) {
stagingPath := filepath.Join(tempDir, defaultStagingPath)

testCases := []struct {
name string
req *csi.NodeStageVolumeRequest
deviceSize int
blockExtSize int
readonlyBit string
expResize bool
expErrCode codes.Code
name string
req *csi.NodeStageVolumeRequest
deviceSize int
blockExtSize int
readonlyBit string
expResize bool
expReadAheadUpdate bool
expReadAheadKB string
readAheadSectors string
sectorSizeInBytes int
expErrCode codes.Code
}{
{
name: "Valid request, no resize because block and filesystem sizes match",
Expand Down Expand Up @@ -450,6 +487,54 @@ func TestNodeStageVolume(t *testing.T) {
readonlyBit: "0",
expResize: false,
},
{
name: "Valid request, update readahead",
req: &csi.NodeStageVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: stagingPath,
VolumeCapability: &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
MountFlags: []string{"read_ahead_kb=4096"},
},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
},
deviceSize: 5,
blockExtSize: 1,
readonlyBit: "0",
expResize: true,
expReadAheadUpdate: true,
readAheadSectors: "8192",
sectorSizeInBytes: 512,
},
{
name: "Valid request, update readahead (different sectorsize)",
req: &csi.NodeStageVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: stagingPath,
VolumeCapability: &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
MountFlags: []string{"read_ahead_kb=4096"},
},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
},
deviceSize: 5,
blockExtSize: 1,
readonlyBit: "0",
expResize: true,
expReadAheadUpdate: true,
readAheadSectors: "4194304",
sectorSizeInBytes: 1,
},
{
name: "Invalid request (Bad Access Mode)",
req: &csi.NodeStageVolumeRequest{
Expand Down Expand Up @@ -493,10 +578,47 @@ func TestNodeStageVolume(t *testing.T) {
},
expErrCode: codes.InvalidArgument,
},
{
name: "Invalid request (Invalid read_ahead_kb)",
req: &csi.NodeStageVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: stagingPath,
VolumeCapability: &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
MountFlags: []string{"read_ahead_kb=not_a_number"},
},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
},
expErrCode: codes.InvalidArgument,
},
{
name: "Invalid request (negative read_ahead_kb)",
req: &csi.NodeStageVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: stagingPath,
VolumeCapability: &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
MountFlags: []string{"read_ahead_kb=-4096"},
},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
},
expErrCode: codes.InvalidArgument,
},
}
for _, tc := range testCases {
t.Logf("Test case: %s", tc.name)
resizeCalled := false
readAheadUpdateCalled := false
actionList := []testingexec.FakeCommandAction{
makeFakeCmd(
&testingexec.FakeCmd{
Expand Down Expand Up @@ -593,6 +715,33 @@ func TestNodeStageVolume(t *testing.T) {
),
}...)
}
if tc.expReadAheadUpdate {
actionList = append(actionList, []testingexec.FakeCommandAction{
makeFakeCmd(
&testingexec.FakeCmd{
CombinedOutputScript: []testingexec.FakeAction{
func() ([]byte, []byte, error) {
return []byte(fmt.Sprintf("%d", tc.sectorSizeInBytes)), nil, nil
},
},
},
"blockdev",
[]string{"--getss", "/dev/disk/fake-path"}...,
),
makeFakeCmd(
&testingexec.FakeCmd{
CombinedOutputScript: []testingexec.FakeAction{
func() (_ []byte, args []byte, _ error) {
readAheadUpdateCalled = true
return []byte{}, nil, nil
},
},
},
"blockdev",
[]string{"--setra", tc.readAheadSectors, "/dev/disk/fake-path"}...,
),
}...)
}
mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
ns := gceDriver.ns
Expand All @@ -616,6 +765,12 @@ func TestNodeStageVolume(t *testing.T) {
if tc.expResize == false && resizeCalled == true {
t.Fatalf("Test called resize, but it was not expected.")
}
if tc.expReadAheadUpdate == true && readAheadUpdateCalled == false {
t.Fatalf("Test did not update read ahead, but it was expected.")
}
if tc.expReadAheadUpdate == false && readAheadUpdateCalled == true {
t.Fatalf("Test updated read ahead, but it was not expected.")
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/gce-pd-csi-driver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ func collectMountOptions(fsType string, mntFlags []string) []string {
var options []string

for _, opt := range mntFlags {
if readAheadKBMountFlagRegex.FindString(opt) != "" {
// The read_ahead_kb flag is a special flag that isn't
// passed directly as an option to the mount command.
continue
}
options = append(options, opt)
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/gce-pd-csi-driver/utils_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,24 @@ func getBlockSizeBytes(devicePath string, m *mount.SafeFormatAndMount) (int64, e
}
return gotSizeBytes, nil
}

// setReadAheadKB sets the read-ahead of the device at devicePath to
// readAheadKB kilobytes by shelling out to blockdev through m.Exec.
//
// The sector size is read with "blockdev --getss", the requested size is
// converted to a sector count (readAheadKB * 1024 / sectorSize), and that
// count is passed to "blockdev --setra".
//
// An error is returned when:
//   - readAheadKB is negative or so large that converting it to bytes would
//     overflow an int64;
//   - either blockdev invocation fails (the command output is included);
//   - the reported sector size is not a positive integer.
//
// NOTE(review): blockdev(8) documents --setra as taking 512-byte sectors,
// while this conversion divides by the value reported by --getss. Confirm the
// intended behavior for devices whose logical sector size is not 512.
func setReadAheadKB(devicePath string, readAheadKB int64, m *mount.SafeFormatAndMount) error {
	// Largest value for which readAheadKB * 1024 still fits in an int64.
	const maxReadAheadKB int64 = (1<<63 - 1) / 1024
	if readAheadKB < 0 || readAheadKB > maxReadAheadKB {
		return fmt.Errorf("read ahead value %dKB for %s is out of range [0, %d]", readAheadKB, devicePath, maxReadAheadKB)
	}

	output, err := m.Exec.Command("blockdev", "--getss", devicePath).CombinedOutput()
	if err != nil {
		return fmt.Errorf("error when reading sector size at path %s: output: %s, err: %w", devicePath, string(output), err)
	}
	strOut := strings.TrimSpace(string(output))
	sectorSizeBytes, err := strconv.ParseInt(strOut, 10, 64)
	if err != nil {
		return fmt.Errorf("failed to parse %q into an int size: %w", strOut, err)
	}
	if sectorSizeBytes <= 0 {
		// Guard the division below: a zero sector size would panic and a
		// negative one would yield a negative sector count.
		return fmt.Errorf("invalid sector size %d reported for path %s", sectorSizeBytes, devicePath)
	}

	readAheadInSectors := readAheadKB * 1024 / sectorSizeBytes
	readAheadInSectorsStr := strconv.FormatInt(readAheadInSectors, 10)
	// Empirical testing indicates that the actual read_ahead_kb size that is set is rounded to the
	// nearest 4KB.
	output, err = m.Exec.Command("blockdev", "--setra", readAheadInSectorsStr, devicePath).CombinedOutput()
	if err != nil {
		return fmt.Errorf("error when setting readahead at path %s: output: %s, err: %w", devicePath, string(output), err)
	}
	return nil
}
Loading