Skip to content

Commit ff63c7a

Browse files
committed
Add support for read_ahead_kb mount flag
1 parent f40f750 commit ff63c7a

File tree

8 files changed

+340
-20
lines changed

8 files changed

+340
-20
lines changed

pkg/gce-pd-csi-driver/node.go

+53
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ import (
1919
"fmt"
2020
"os"
2121
"path/filepath"
22+
"regexp"
2223
"runtime"
24+
"strconv"
2325
"time"
2426

2527
"google.golang.org/grpc/codes"
@@ -72,6 +74,12 @@ const (
7274
defaultLinuxFsType = "ext4"
7375
defaultWindowsFsType = "ntfs"
7476
fsTypeExt3 = "ext3"
77+
78+
readAheadKBMountFlagRegexPattern = "^read_ahead_kb=(.+)$"
79+
)
80+
81+
var (
82+
readAheadKBMountFlagRegex = regexp.MustCompile(readAheadKBMountFlagRegexPattern)
7583
)
7684

7785
func getDefaultFsType() string {
@@ -318,12 +326,19 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
318326
// Part 3: Mount device to stagingTargetPath
319327
fstype := getDefaultFsType()
320328

329+
shouldUpdateReadAhead := false
330+
var readAheadKB int64
321331
options := []string{}
322332
if mnt := volumeCapability.GetMount(); mnt != nil {
323333
if mnt.FsType != "" {
324334
fstype = mnt.FsType
325335
}
326336
options = collectMountOptions(fstype, mnt.MountFlags)
337+
338+
readAheadKB, shouldUpdateReadAhead, err = extractReadAheadKBMountFlag(mnt.MountFlags)
339+
if err != nil {
340+
return nil, status.Errorf(codes.InvalidArgument, "failure parsing mount flags: %v", err.Error())
341+
}
327342
} else if blk := volumeCapability.GetBlock(); blk != nil {
328343
// Noop for Block NodeStageVolume
329344
klog.V(4).Infof("NodeStageVolume succeeded on %v to %s, capability is block so this is a no-op", volumeID, stagingTargetPath)
@@ -368,10 +383,48 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
368383
}
369384
}
370385

386+
// Part 5: Update read_ahead
387+
if shouldUpdateReadAhead {
388+
if err := ns.updateReadAhead(devicePath, readAheadKB); err != nil {
389+
return nil, status.Errorf(codes.Internal, "failure updating readahead for %s to %dKB: %v", devicePath, readAheadKB, err.Error())
390+
}
391+
}
392+
371393
klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath)
372394
return &csi.NodeStageVolumeResponse{}, nil
373395
}
374396

397+
func (ns *GCENodeServer) updateReadAhead(devicePath string, readAheadKB int64) error {
398+
isBlock, err := ns.VolumeStatter.IsBlockDevice(devicePath)
399+
if err != nil {
400+
return fmt.Errorf("failed to determine whether %s is a block device: %v", devicePath, err)
401+
}
402+
if !isBlock {
403+
return nil
404+
}
405+
406+
if err := setReadAheadKB(devicePath, readAheadKB, ns.Mounter); err != nil {
407+
return fmt.Errorf("failed to set readahead: %v", err)
408+
}
409+
410+
return nil
411+
}
412+
413+
func extractReadAheadKBMountFlag(mountFlags []string) (int64, bool, error) {
414+
for _, mountFlag := range mountFlags {
415+
if readAheadKB := readAheadKBMountFlagRegex.FindStringSubmatch(mountFlag); len(readAheadKB) == 2 {
416+
// There is only one matching pattern in readAheadKBMountFlagRegex
417+
// If found, it will be at index 1
418+
readAheadKBInt, err := strconv.ParseInt(readAheadKB[1], 10, 0)
419+
if err != nil {
420+
return -1, false, fmt.Errorf("invalid read_ahead_kb mount flag %q: %v", mountFlag, err)
421+
}
422+
return readAheadKBInt, true, nil
423+
}
424+
}
425+
return -1, false, nil
426+
}
427+
375428
func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
376429
// Validate arguments
377430
volumeID := req.GetVolumeId()

pkg/gce-pd-csi-driver/node_test.go

+152-15
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
testingexec "k8s.io/utils/exec/testing"
2929

3030
csi "github.com/container-storage-interface/spec/lib/go/csi"
31+
"github.com/google/go-cmp/cmp"
3132
"google.golang.org/grpc/codes"
3233
"google.golang.org/grpc/status"
3334
"k8s.io/mount-utils"
@@ -109,21 +110,33 @@ func TestNodeGetVolumeStats(t *testing.T) {
109110
Readonly: false,
110111
VolumeCapability: stdVolCap,
111112
}
113+
112114
_, err = ns.NodePublishVolume(context.Background(), req)
113115
if err != nil {
114116
t.Fatalf("Failed to set up test by publishing default vol: %v", err)
115117
}
116118

117119
testCases := []struct {
118-
name string
119-
volumeID string
120-
volumePath string
121-
expectErr bool
120+
name string
121+
volumeID string
122+
volumePath string
123+
expectedResp *csi.NodeGetVolumeStatsResponse
124+
deviceCapacity int
125+
expectErr bool
122126
}{
123127
{
124-
name: "normal",
125-
volumeID: defaultVolumeID,
126-
volumePath: targetPath,
128+
name: "normal",
129+
volumeID: defaultVolumeID,
130+
volumePath: targetPath,
131+
deviceCapacity: 300 * 1024 * 1024 * 1024, // 300 GB
132+
expectedResp: &csi.NodeGetVolumeStatsResponse{
133+
Usage: []*csi.VolumeUsage{
134+
{
135+
Unit: csi.VolumeUsage_BYTES,
136+
Total: 300 * 1024 * 1024 * 1024, // 300 GB,
137+
},
138+
},
139+
},
127140
},
128141
{
129142
name: "no vol id",
@@ -145,18 +158,38 @@ func TestNodeGetVolumeStats(t *testing.T) {
145158

146159
for _, tc := range testCases {
147160
t.Run(tc.name, func(t *testing.T) {
161+
actionList := []testingexec.FakeCommandAction{
162+
makeFakeCmd(
163+
&testingexec.FakeCmd{
164+
CombinedOutputScript: []testingexec.FakeAction{
165+
func() ([]byte, []byte, error) {
166+
return []byte(fmt.Sprintf("%d", tc.deviceCapacity)), nil, nil
167+
},
168+
},
169+
},
170+
"blockdev",
171+
strings.Split("--getsize64 /dev/disk/fake-path", " ")...,
172+
),
173+
}
174+
175+
mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
176+
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
177+
ns := gceDriver.ns
148178

149179
req := &csi.NodeGetVolumeStatsRequest{
150180
VolumeId: tc.volumeID,
151181
VolumePath: tc.volumePath,
152182
}
153-
_, err := ns.NodeGetVolumeStats(context.Background(), req)
183+
resp, err := ns.NodeGetVolumeStats(context.Background(), req)
154184
if err != nil && !tc.expectErr {
155185
t.Fatalf("Got unexpected err: %v", err)
156186
}
157187
if err == nil && tc.expectErr {
158188
t.Fatal("Did not get error but expected one")
159189
}
190+
if diff := cmp.Diff(tc.expectedResp, resp); diff != "" {
191+
t.Errorf("NodeGetVolumeStats(%s): -want, +got \n%s", req, diff)
192+
}
160193
})
161194
}
162195
}
@@ -394,13 +427,17 @@ func TestNodeStageVolume(t *testing.T) {
394427
stagingPath := filepath.Join(tempDir, defaultStagingPath)
395428

396429
testCases := []struct {
397-
name string
398-
req *csi.NodeStageVolumeRequest
399-
deviceSize int
400-
blockExtSize int
401-
readonlyBit string
402-
expResize bool
403-
expErrCode codes.Code
430+
name string
431+
req *csi.NodeStageVolumeRequest
432+
deviceSize int
433+
blockExtSize int
434+
readonlyBit string
435+
expResize bool
436+
expReadAheadUpdate bool
437+
expReadAheadKB string
438+
readAheadSectors string
439+
sectorSizeInBytes int
440+
expErrCode codes.Code
404441
}{
405442
{
406443
name: "Valid request, no resize because block and filesystem sizes match",
@@ -450,6 +487,54 @@ func TestNodeStageVolume(t *testing.T) {
450487
readonlyBit: "0",
451488
expResize: false,
452489
},
490+
{
491+
name: "Valid request, update readahead",
492+
req: &csi.NodeStageVolumeRequest{
493+
VolumeId: volumeID,
494+
StagingTargetPath: stagingPath,
495+
VolumeCapability: &csi.VolumeCapability{
496+
AccessType: &csi.VolumeCapability_Mount{
497+
Mount: &csi.VolumeCapability_MountVolume{
498+
MountFlags: []string{"read_ahead_kb=4096"},
499+
},
500+
},
501+
AccessMode: &csi.VolumeCapability_AccessMode{
502+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
503+
},
504+
},
505+
},
506+
deviceSize: 5,
507+
blockExtSize: 1,
508+
readonlyBit: "0",
509+
expResize: true,
510+
expReadAheadUpdate: true,
511+
readAheadSectors: "8192",
512+
sectorSizeInBytes: 512,
513+
},
514+
{
515+
name: "Valid request, update readahead (different sectorsize)",
516+
req: &csi.NodeStageVolumeRequest{
517+
VolumeId: volumeID,
518+
StagingTargetPath: stagingPath,
519+
VolumeCapability: &csi.VolumeCapability{
520+
AccessType: &csi.VolumeCapability_Mount{
521+
Mount: &csi.VolumeCapability_MountVolume{
522+
MountFlags: []string{"read_ahead_kb=4096"},
523+
},
524+
},
525+
AccessMode: &csi.VolumeCapability_AccessMode{
526+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
527+
},
528+
},
529+
},
530+
deviceSize: 5,
531+
blockExtSize: 1,
532+
readonlyBit: "0",
533+
expResize: true,
534+
expReadAheadUpdate: true,
535+
readAheadSectors: "4194304",
536+
sectorSizeInBytes: 1,
537+
},
453538
{
454539
name: "Invalid request (Bad Access Mode)",
455540
req: &csi.NodeStageVolumeRequest{
@@ -493,10 +578,29 @@ func TestNodeStageVolume(t *testing.T) {
493578
},
494579
expErrCode: codes.InvalidArgument,
495580
},
581+
{
582+
name: "Invalid request (Invalid read_ahead_kb)",
583+
req: &csi.NodeStageVolumeRequest{
584+
VolumeId: volumeID,
585+
StagingTargetPath: stagingPath,
586+
VolumeCapability: &csi.VolumeCapability{
587+
AccessType: &csi.VolumeCapability_Mount{
588+
Mount: &csi.VolumeCapability_MountVolume{
589+
MountFlags: []string{"read_ahead_kb=not_a_number"},
590+
},
591+
},
592+
AccessMode: &csi.VolumeCapability_AccessMode{
593+
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
594+
},
595+
},
596+
},
597+
expErrCode: codes.InvalidArgument,
598+
},
496599
}
497600
for _, tc := range testCases {
498601
t.Logf("Test case: %s", tc.name)
499602
resizeCalled := false
603+
readAheadUpdateCalled := false
500604
actionList := []testingexec.FakeCommandAction{
501605
makeFakeCmd(
502606
&testingexec.FakeCmd{
@@ -593,6 +697,33 @@ func TestNodeStageVolume(t *testing.T) {
593697
),
594698
}...)
595699
}
700+
if tc.expReadAheadUpdate {
701+
actionList = append(actionList, []testingexec.FakeCommandAction{
702+
makeFakeCmd(
703+
&testingexec.FakeCmd{
704+
CombinedOutputScript: []testingexec.FakeAction{
705+
func() ([]byte, []byte, error) {
706+
return []byte(fmt.Sprintf("%d", tc.sectorSizeInBytes)), nil, nil
707+
},
708+
},
709+
},
710+
"blockdev",
711+
[]string{"--getss", "/dev/disk/fake-path"}...,
712+
),
713+
makeFakeCmd(
714+
&testingexec.FakeCmd{
715+
CombinedOutputScript: []testingexec.FakeAction{
716+
func() (_ []byte, args []byte, _ error) {
717+
readAheadUpdateCalled = true
718+
return []byte{}, nil, nil
719+
},
720+
},
721+
},
722+
"blockdev",
723+
[]string{"--setra", tc.readAheadSectors, "/dev/disk/fake-path"}...,
724+
),
725+
}...)
726+
}
596727
mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList})
597728
gceDriver := getTestGCEDriverWithCustomMounter(t, mounter)
598729
ns := gceDriver.ns
@@ -616,6 +747,12 @@ func TestNodeStageVolume(t *testing.T) {
616747
if tc.expResize == false && resizeCalled == true {
617748
t.Fatalf("Test called resize, but it was not expected.")
618749
}
750+
if tc.expReadAheadUpdate == true && readAheadUpdateCalled == false {
751+
t.Fatalf("Test did not update read ahead, but it was expected.")
752+
}
753+
if tc.expReadAheadUpdate == false && readAheadUpdateCalled == true {
754+
t.Fatalf("Test updated read ahead, but it was not expected.")
755+
}
619756
}
620757
}
621758

pkg/gce-pd-csi-driver/utils.go

+5
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ func collectMountOptions(fsType string, mntFlags []string) []string {
257257
var options []string
258258

259259
for _, opt := range mntFlags {
260+
if readAheadKBMountFlagRegex.FindString(opt) != "" {
261+
// The read_ahead_kb flag is a special flag that isn't
262+
// passed directly as an option to the mount command.
263+
continue
264+
}
260265
options = append(options, opt)
261266
}
262267

pkg/gce-pd-csi-driver/utils_linux.go

+19
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,22 @@ func getBlockSizeBytes(devicePath string, m *mount.SafeFormatAndMount) (int64, e
106106
}
107107
return gotSizeBytes, nil
108108
}
109+
110+
func setReadAheadKB(devicePath string, readAheadKB int64, m *mount.SafeFormatAndMount) error {
111+
output, err := m.Exec.Command("blockdev", "--getss", devicePath).CombinedOutput()
112+
if err != nil {
113+
return fmt.Errorf("error when reading sector size at path %s: output: %s, err: %w", devicePath, string(output), err)
114+
}
115+
strOut := strings.TrimSpace(string(output))
116+
sectorSizeBytes, err := strconv.ParseInt(strOut, 10, 64)
117+
if err != nil {
118+
return fmt.Errorf("failed to parse %q into an int size", strOut)
119+
}
120+
readAheadInSectors := readAheadKB * 1024 / sectorSizeBytes
121+
readAheadInSectorsStr := strconv.FormatInt(readAheadInSectors, 10)
122+
output, err = m.Exec.Command("blockdev", "--setra", readAheadInSectorsStr, devicePath).CombinedOutput()
123+
if err != nil {
124+
return fmt.Errorf("error when setting readahead at path %s: output: %s, err: %w", devicePath, string(output), err)
125+
}
126+
return nil
127+
}

pkg/gce-pd-csi-driver/utils_windows.go

+5
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,8 @@ func getBlockSizeBytes(devicePath string, m *mount.SafeFormatAndMount) (int64, e
105105
}
106106
return proxy.GetDiskTotalBytes(devicePath)
107107
}
108+
109+
func setReadAheadKB(devicePath string, readAheadKB int64, m *mount.SafeFormatAndMount) error {
110+
// This is a no-op on windows.
111+
return nil
112+
}

0 commit comments

Comments
 (0)