Skip to content

Fix chunksize bug for large cache size in data cache #1954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deploy/kubernetes/base/controller/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ spec:
- "--endpoint=unix:/csi/csi.sock"
- "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
- "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
- --enable-data-cache
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: "/etc/cloud-sa/cloud-sa.json"
Expand Down
6 changes: 4 additions & 2 deletions pkg/common/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,19 +280,21 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]
if !enableDataCache {
return p, d, fmt.Errorf("data caching enabled: %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
}
// TODO: need to parse or validate the string

paramDataCacheSize, err := ConvertGiStringToInt64(v)
if err != nil {
return p, d, fmt.Errorf("parameters contain invalid dataCacheSize parameter: %w", err)
}
if err := ValidateNonNegativeInt(paramDataCacheSize); err != nil {
return p, d, fmt.Errorf("parameters contains invalid option: %s: %w", ParameterKeyDataCacheSize, err)
}
d.DataCacheSize = strconv.FormatInt(paramDataCacheSize, 10)
case ParameterKeyDataCacheMode:
if !enableDataCache {
return p, d, fmt.Errorf("data caching enabled %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
}
if err := ValidateDataCacheMode(v); err != nil {
return p, d, fmt.Errorf("parameters contains invalid option: %w", err)
return p, d, fmt.Errorf("parameters contains invalid option: %s: %w", ParameterKeyDataCacheMode, err)
}
d.DataCacheMode = v
case ParameterKeyResourceTags:
Expand Down
7 changes: 7 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,13 @@ func ValidateDataCacheMode(s string) error {
return fmt.Errorf("invalid data-cache-mode %s. Only \"writeback\" and \"writethrough\" is a valid input", s)
}

// ValidateNonNegativeInt returns an error unless n is strictly positive.
// NOTE(review): despite the name, zero is rejected too — callers use this to
// require a non-empty data cache size, so the effective contract is n > 0.
func ValidateNonNegativeInt(n int64) error {
	if n <= 0 {
		// Error strings are lowercase per Go convention (staticcheck ST1005).
		return fmt.Errorf("input should be set to > 0, got %d", n)
	}
	return nil
}

// NewLimiter returns a token bucket based request rate limiter after initializing
// the passed values for limit, burst (or token bucket) size. If opted for emptyBucket
// all initial tokens are reserved for the first burst.
Expand Down
36 changes: 36 additions & 0 deletions pkg/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,42 @@ func TestValidateDataCacheMode(t *testing.T) {

}

// TestValidateNonNegativeInt verifies that only strictly positive cache sizes
// are accepted (zero and negative values must produce an error).
func TestValidateNonNegativeInt(t *testing.T) {
	testCases := []struct {
		name        string
		cacheSize   int64
		expectError bool
	}{
		{
			name:      "valid input - positive cache size",
			cacheSize: 100000,
		},
		{
			name:        "invalid input - cachesize 0",
			cacheSize:   0,
			expectError: true,
		},
		{
			name:        "invalid input - negative cache size",
			cacheSize:   -100,
			expectError: true,
		},
	}

	for _, tc := range testCases {
		// Run each case as a subtest so failures are reported per case.
		t.Run(tc.name, func(t *testing.T) {
			err := ValidateNonNegativeInt(tc.cacheSize)
			// The original messages said "data cache mode"; this function
			// validates a cache size, so name the right thing.
			if err != nil && !tc.expectError {
				t.Errorf("ValidateNonNegativeInt(%d) returned error %v; expected no error", tc.cacheSize, err)
			}
			if err == nil && tc.expectError {
				t.Errorf("ValidateNonNegativeInt(%d) returned no error; expected an error", tc.cacheSize)
			}
		})
	}
}

func TestParseZoneFromURI(t *testing.T) {
testcases := []struct {
name string
Expand Down
90 changes: 70 additions & 20 deletions pkg/gce-pd-csi-driver/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gceGCEDriver
import (
"context"
"fmt"
"math"
"regexp"
"strconv"
"strings"
Expand All @@ -16,10 +17,18 @@ import (
)

const (
cacheSuffix = "csi-fast"
mainLvSuffix = "csi-main"
raidedLocalSsdName = "csi-driver-data-cache"
raidMode = "0"
cacheSuffix = "csi-fast"
mainLvSuffix = "csi-main"
raidedLocalSsdName = "csi-driver-data-cache"
raidMode = "0"
maxAllowedChunks int64 = 1000000 // This is the max allowed chunks for LVM
GiB float64 = 1024 * 1024 * 1024
KiB float64 = 1024
)

var (
maxChunkSize float64 = 1 * GiB // Max allowed chunk size as per LVM documentation
minChunkSize float64 = 160 * KiB // This is randomly selected, we need a multiple of 32KiB, the default size would be too small for caching https://man7.org/linux/man-pages/man8/lvcreate.8.html (--chunksize)
)

func fetchRAIDedLocalSsdPath() (string, error) {
Expand Down Expand Up @@ -84,7 +93,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
vgNameForPv := strings.TrimSpace(infoSlice[(len(infoSlice) - 1)])
klog.V(4).Infof("Physical volume is part of Volume group: %v", vgNameForPv)
if vgNameForPv == volumeGroupName {
klog.V(4).Infof("Physical Volume(PV) already exists in the Volume Group")
klog.V(4).Infof("Physical Volume(PV) already exists in the Volume Group %v", volumeGroupName)
} else if vgNameForPv != "VG" && vgNameForPv != "" {

info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...)
Expand Down Expand Up @@ -159,21 +168,30 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
// Validate that cache is setup for required size
klog.V(4).Infof("Assuming valid data cache size and mode, resizing cache is not supported")
} else {
fastCacheSize := req.GetPublishContext()[common.ContextDataCacheSize]
chunkSize := "960" // Cannot use default chunk size(64KiB) as it errors on maxChunksAllowed. Unit - KiB
args = []string{
"--yes",
"-n",
cacheLvName,
"-L",
// ConvertGiStringToInt64 converts the input size to GiB so default to "g" for cache size - LVM g|G is GiB.
fastCacheSize + "g",
volumeGroupName,
raidedLocalSsdPath,
}
info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...)
cacheSize := req.GetPublishContext()[common.ContextDataCacheSize]
chunkSize, err := fetchChunkSizeKiB(cacheSize)
if err != nil {
return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info)
klog.Errorf("Errored to fetch cache size, verify the data-cache-size is valid: got %v, error: %q", cacheSize, err)
return mainDevicePath, err
}
// Check if LV exists
info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvs", args...)
lvExists := strings.Contains(string(info), cacheLvName)
if !lvExists {
args = []string{
"--yes",
"-n",
cacheLvName,
"-L",
// ConvertGiStringToInt64 converts the input size to GiB so default to "g" for cache size - LVM g|G is GiB.
cacheSize + "g",
volumeGroupName,
raidedLocalSsdPath,
}
info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...)
if err != nil {
return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info)
}
}

// Once caching is setup, link the PD to cache
Expand All @@ -188,7 +206,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
req.GetPublishContext()[common.ContextDataCacheMode],
volumeGroupName + "/" + mainLvName,
"--chunksize",
string(chunkSize),
chunkSize, // default unit is KiB
"--force",
"-y",
}
Expand Down Expand Up @@ -357,11 +375,17 @@ func cleanupCache(volumeId string, nodeId string) error {

volumeGroupName := getVolumeGroupName(nodeId)
if !checkVgExists(volumeGroupName) {
klog.V(4).Infof("Volume group %s not found, no cache clean up needed", volumeGroupName)
// If volume group doesn't exist then there's nothing to uncache
return nil
}
reduceVolumeGroup(volumeGroupName, true)
mainLvName := getLvName(mainLvSuffix, volumeId)
if !checkLvExists(mainLvName) {
klog.V(4).Infof("Logical volume %s not found, assuming caching wasn't setup for the PVC %s or is cleaned up", mainLvName, volumeId)
// If logical volume doesn't exist then there's nothing to uncache
return nil
}
args := []string{
"-an",
"/dev/" + volumeGroupName + "/" + mainLvName,
Expand All @@ -382,6 +406,17 @@ func cleanupCache(volumeId string, nodeId string) error {
return nil
}

// checkLvExists reports whether a logical volume named lvName appears in the
// output of `lvscan`. If the command itself fails, the error is logged and
// the volume is treated as absent.
func checkLvExists(lvName string) bool {
	out, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvscan")
	if err != nil {
		klog.Errorf("Errored while checking if logical volume exists for %s %v: %s", lvName, err, out)
		return false
	}
	// lvscan lists every LV on the node; a substring match on the name is
	// sufficient to detect the one we care about.
	return strings.Contains(string(out), lvName)
}

func getVolumeGroupName(nodePath string) string {
nodeSlice := strings.Split(nodePath, "/")
nodeId := nodeSlice[len(nodeSlice)-1]
Expand Down Expand Up @@ -497,3 +532,18 @@ func isCachingSetup(mainLvName string) (error, bool) {
}
return nil, false
}

// fetchChunkSizeKiB derives the dm-cache chunk size (as a "<n>KiB" string,
// the unit lvcreate's --chunksize expects) from the requested cache size so
// that the cache stays under maxAllowedChunks chunks. The result is rounded
// to a multiple of 32KiB and clamped to [minChunkSize, maxChunkSize].
// Returns "0" and an error if cacheSize cannot be parsed.
func fetchChunkSizeKiB(cacheSize string) (string, error) {
	var chunkSize float64

	// ConvertGiStringToInt64 yields the size in GiB units; multiplied by GiB
	// below to get bytes.
	cacheSizeInt, err := common.ConvertGiStringToInt64(cacheSize)
	if err != nil {
		return "0", err
	}
	// Chunksize should be divisible by 32KiB, so round the raw per-chunk byte
	// count to the nearest multiple of 32KiB.
	// NOTE(review): math.Round can round DOWN, which makes the resulting chunk
	// count slightly exceed maxAllowedChunks for some sizes (e.g. 500Gi ->
	// 512KiB chunks -> 1,024,000 chunks). If LVM rejects that, math.Ceil on
	// the 32KiB multiple would guarantee the bound — but changing it also
	// changes the expected values in TestFetchChunkSizeKiB; confirm intent.
	chunkSize = (float64(cacheSizeInt) * GiB) / float64(maxAllowedChunks)
	chunkSize = math.Round(chunkSize/(32*KiB)) * (32 * KiB)
	// Clamp into the allowed range, then convert bytes -> KiB for the flag.
	chunkSize = math.Min(math.Max(chunkSize, minChunkSize), maxChunkSize) / KiB
	// default chunk size unit KiB
	return strconv.FormatInt(int64(chunkSize), 10) + "KiB", nil
}
57 changes: 57 additions & 0 deletions pkg/gce-pd-csi-driver/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gceGCEDriver

import (
"testing"
)

func TestFetchChunkSizeKiB(t *testing.T) {
testCases := []struct {
name string
cacheSize string
expChunkSize string
expErr bool
}{
{
name: "chunk size is in the allowed range",
cacheSize: "500Gi",
expChunkSize: "512KiB", //range defined in fetchChunkSizeKiB
},
{
name: "chunk size is set to the range ceil",
cacheSize: "30000000Gi",
expChunkSize: "1048576KiB", //range defined in fetchChunkSizeKiB - max 1GiB
},
{
name: "chunk size is set to the allowed range floor",
cacheSize: "10Gi",
expChunkSize: "160KiB", //range defined in fetchChunkSizeKiB - min 160 KiB
},
{
name: "cacheSize set to KiB also sets the chunk size to range floor",
cacheSize: "100Ki",
expChunkSize: "160KiB", //range defined in fetchChunkSizeKiB - min 160 KiB
},
{
name: "invalid cacheSize",
cacheSize: "fdfsdKi",
expChunkSize: "160KiB", //range defined in fetchChunkSizeKiB - min 160 KiB
expErr: true,
},
// cacheSize is validated in storage class parameter so assuming invalid cacheSize (like negative, 0) would not be passed to the function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does cacheSize in StorageClass has the >=0 check? In the past there were bugs in Hyperdisk SC parameters that actually take in < 0 params. Though an error will be returned from GCE.

}

for _, tc := range testCases {
chunkSize, err := fetchChunkSizeKiB(tc.cacheSize)
if err != nil {
if !tc.expErr {
t.Errorf("Errored %s", err)
}
continue
}
if chunkSize != tc.expChunkSize {
t.Errorf("Got %s want %s", chunkSize, tc.expChunkSize)
}

}

}