diff --git a/deploy/kubernetes/base/controller/controller.yaml b/deploy/kubernetes/base/controller/controller.yaml
index ef2a665bb..beb67405f 100644
--- a/deploy/kubernetes/base/controller/controller.yaml
+++ b/deploy/kubernetes/base/controller/controller.yaml
@@ -143,6 +143,7 @@ spec:
             - "--endpoint=unix:/csi/csi.sock"
             - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
             - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
+            - "--enable-data-cache"
           env:
             - name: GOOGLE_APPLICATION_CREDENTIALS
               value: "/etc/cloud-sa/cloud-sa.json"
diff --git a/pkg/common/parameters.go b/pkg/common/parameters.go
index 0572a0515..18cca76fb 100644
--- a/pkg/common/parameters.go
+++ b/pkg/common/parameters.go
@@ -280,19 +280,21 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]
             if !enableDataCache {
                 return p, d, fmt.Errorf("data caching enabled: %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
             }
-            // TODO: need to parse or validate the string
             paramDataCacheSize, err := ConvertGiStringToInt64(v)
             if err != nil {
                 return p, d, fmt.Errorf("parameters contain invalid dataCacheSize parameter: %w", err)
             }
+            if err := ValidateNonNegativeInt(paramDataCacheSize); err != nil {
+                return p, d, fmt.Errorf("parameters contains invalid option: %s: %w", ParameterKeyDataCacheSize, err)
+            }
             d.DataCacheSize = strconv.FormatInt(paramDataCacheSize, 10)
         case ParameterKeyDataCacheMode:
             if !enableDataCache {
                 return p, d, fmt.Errorf("data caching enabled %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
             }
             if err := ValidateDataCacheMode(v); err != nil {
-                return p, d, fmt.Errorf("parameters contains invalid option: %w", err)
+                return p, d, fmt.Errorf("parameters contains invalid option: %s: %w", ParameterKeyDataCacheMode, err)
             }
             d.DataCacheMode = v
         case ParameterKeyResourceTags:
diff --git a/pkg/common/utils.go b/pkg/common/utils.go
index 44f736d7f..d10607721 100644
--- a/pkg/common/utils.go
+++ b/pkg/common/utils.go
@@ -713,6 +713,13 @@ func ValidateDataCacheMode(s string) error {
     return fmt.Errorf("invalid data-cache-mode %s. Only \"writeback\" and \"writethrough\" is a valid input", s)
 }
 
+func ValidateNonNegativeInt(n int64) error {
+    if n <= 0 {
+        return fmt.Errorf("input must be greater than 0, got %d", n)
+    }
+    return nil
+}
+
 // NewLimiter returns a token bucket based request rate limiter after initializing
 // the passed values for limit, burst (or token bucket) size. If opted for emptyBucket
 // all initial tokens are reserved for the first burst.
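Note on how the two new pieces in pkg/common compose: ExtractAndDefaultParameters first converts the data-cache-size parameter to GiB with ConvertGiStringToInt64 and only then rejects zero or negative results via ValidateNonNegativeInt, so a malformed string and a non-positive size produce different errors. The sketch below replays that flow in isolation; parseGiString is a simplified stand-in for ConvertGiStringToInt64 (assumed behavior, for illustration only), not the driver's implementation.

```go
package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseGiString is a simplified stand-in for common.ConvertGiStringToInt64:
// it only accepts values like "50Gi" and returns the size in GiB.
func parseGiString(s string) (int64, error) {
    n, err := strconv.ParseInt(strings.TrimSuffix(s, "Gi"), 10, 64)
    if err != nil {
        return 0, fmt.Errorf("invalid Gi quantity %q: %w", s, err)
    }
    return n, nil
}

// validatePositive mirrors the new common.ValidateNonNegativeInt check:
// zero and negative cache sizes are rejected.
func validatePositive(n int64) error {
    if n <= 0 {
        return fmt.Errorf("input must be greater than 0, got %d", n)
    }
    return nil
}

func main() {
    for _, size := range []string{"50Gi", "0Gi", "-5Gi"} {
        n, err := parseGiString(size)
        if err == nil {
            err = validatePositive(n)
        }
        fmt.Printf("data-cache-size=%q -> %d GiB, err=%v\n", size, n, err)
    }
}
```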
diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go
index 9f9dd58b3..5917487c1 100644
--- a/pkg/common/utils_test.go
+++ b/pkg/common/utils_test.go
@@ -1823,6 +1823,42 @@ func TestValidateDataCacheMode(t *testing.T) {
 
 }
 
+func TestValidateNonNegativeInt(t *testing.T) {
+    testCases := []struct {
+        name        string
+        cacheSize   int64
+        expectError bool
+    }{
+        {
+            name:      "valid input - positive cache size",
+            cacheSize: 100000,
+        },
+        {
+            name:        "invalid input - cache size 0",
+            cacheSize:   0,
+            expectError: true,
+        },
+        {
+            name:        "invalid input - negative cache size",
+            cacheSize:   -100,
+            expectError: true,
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Logf("test case: %s", tc.name)
+        err := ValidateNonNegativeInt(tc.cacheSize)
+        if err != nil && !tc.expectError {
+            t.Errorf("Got error %v validating data cache size %d; expect no error", err, tc.cacheSize)
+        }
+
+        if err == nil && tc.expectError {
+            t.Errorf("Got no error validating data cache size %d; expect an error", tc.cacheSize)
+        }
+    }
+
+}
+
 func TestParseZoneFromURI(t *testing.T) {
     testcases := []struct {
         name string
diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go
index 91be2eb95..4e61d30d1 100644
--- a/pkg/gce-pd-csi-driver/cache.go
+++ b/pkg/gce-pd-csi-driver/cache.go
@@ -3,6 +3,7 @@ package gceGCEDriver
 import (
     "context"
     "fmt"
+    "math"
     "regexp"
     "strconv"
     "strings"
@@ -16,10 +17,18 @@ import (
 )
 
 const (
-    cacheSuffix        = "csi-fast"
-    mainLvSuffix       = "csi-main"
-    raidedLocalSsdName = "csi-driver-data-cache"
-    raidMode           = "0"
+    cacheSuffix                = "csi-fast"
+    mainLvSuffix               = "csi-main"
+    raidedLocalSsdName         = "csi-driver-data-cache"
+    raidMode                   = "0"
+    maxAllowedChunks   int64   = 1000000 // Maximum number of chunks allowed by LVM
+    GiB                float64 = 1024 * 1024 * 1024
+    KiB                float64 = 1024
+)
+
+var (
+    maxChunkSize float64 = 1 * GiB   // Max allowed chunk size as per LVM documentation
+    minChunkSize float64 = 160 * KiB // Arbitrarily chosen floor; must be a multiple of 32KiB, and the LVM default would be too small for caching. See https://man7.org/linux/man-pages/man8/lvcreate.8.html (--chunksize)
 )
 
 func fetchRAIDedLocalSsdPath() (string, error) {
@@ -84,7 +93,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
         vgNameForPv := strings.TrimSpace(infoSlice[(len(infoSlice) - 1)])
         klog.V(4).Infof("Physical volume is part of Volume group: %v", vgNameForPv)
         if vgNameForPv == volumeGroupName {
-            klog.V(4).Infof("Physical Volume(PV) already exists in the Volume Group")
+            klog.V(4).Infof("Physical Volume(PV) already exists in the Volume Group %v", volumeGroupName)
         } else if vgNameForPv != "VG" && vgNameForPv != "" {
 
             info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...)
@@ -159,21 +168,30 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
         // Validate that cache is setup for required size
         klog.V(4).Infof("Assuming valid data cache size and mode, resizing cache is not supported")
     } else {
-        fastCacheSize := req.GetPublishContext()[common.ContextDataCacheSize]
-        chunkSize := "960" // Cannot use default chunk size(64KiB) as it errors on maxChunksAllowed. Unit - KiB
-        args = []string{
-            "--yes",
-            "-n",
-            cacheLvName,
-            "-L",
-            // ConvertGiStringToInt64 converts the input size to GiB so default to "g" for cache size - LVM g|G is GiB.
-            fastCacheSize + "g",
-            volumeGroupName,
-            raidedLocalSsdPath,
-        }
-        info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...)
+        cacheSize := req.GetPublishContext()[common.ContextDataCacheSize]
+        chunkSize, err := fetchChunkSizeKiB(cacheSize)
         if err != nil {
-            return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info)
+            klog.Errorf("Error fetching chunk size, verify the data-cache-size is valid: got %v, error: %q", cacheSize, err)
+            return mainDevicePath, err
+        }
+        // Check if the cache logical volume already exists
+        info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvs", args...)
+        lvExists := strings.Contains(string(info), cacheLvName)
+        if !lvExists {
+            args = []string{
+                "--yes",
+                "-n",
+                cacheLvName,
+                "-L",
+                // ConvertGiStringToInt64 converts the input size to GiB so default to "g" for cache size - LVM g|G is GiB.
+                cacheSize + "g",
+                volumeGroupName,
+                raidedLocalSsdPath,
+            }
+            info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...)
+            if err != nil {
+                return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info)
+            }
         }
 
         // Once caching is setup, link the PD to cache
@@ -188,7 +206,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
             req.GetPublishContext()[common.ContextDataCacheMode],
             volumeGroupName + "/" + mainLvName,
             "--chunksize",
-            string(chunkSize),
+            chunkSize, // default unit is KiB
             "--force",
             "-y",
         }
@@ -357,11 +375,17 @@ func cleanupCache(volumeId string, nodeId string) error {
 
     volumeGroupName := getVolumeGroupName(nodeId)
     if !checkVgExists(volumeGroupName) {
+        klog.V(4).Infof("Volume group %s not found, no cache cleanup needed", volumeGroupName)
         // If volume group doesn't exist then there's nothing to uncache
         return nil
     }
     reduceVolumeGroup(volumeGroupName, true)
     mainLvName := getLvName(mainLvSuffix, volumeId)
+    if !checkLvExists(mainLvName) {
+        klog.V(4).Infof("Logical volume %s not found, assuming caching was not set up for the PVC %s or was already cleaned up", mainLvName, volumeId)
+        // If logical volume doesn't exist then there's nothing to uncache
+        return nil
+    }
     args := []string{
         "-an",
         "/dev/" + volumeGroupName + "/" + mainLvName,
@@ -382,6 +406,17 @@ func cleanupCache(volumeId string, nodeId string) error {
     return nil
 }
 
+func checkLvExists(lvName string) bool {
+    args := []string{}
+    info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvscan", args...)
+    if err != nil {
+        klog.Errorf("Error while checking if logical volume %s exists: %v: %s", lvName, err, info)
+        return false
+    }
+    // Check if the required logical volume already exists
+    return strings.Contains(string(info), lvName)
+}
+
 func getVolumeGroupName(nodePath string) string {
     nodeSlice := strings.Split(nodePath, "/")
     nodeId := nodeSlice[len(nodeSlice)-1]
@@ -497,3 +532,18 @@ func isCachingSetup(mainLvName string) (error, bool) {
     }
     return nil, false
 }
+
+func fetchChunkSizeKiB(cacheSize string) (string, error) {
+    var chunkSize float64
+
+    cacheSizeInt, err := common.ConvertGiStringToInt64(cacheSize)
+    if err != nil {
+        return "0", err
+    }
+    // The chunk size must be a multiple of 32KiB, so round to the nearest 32KiB multiple
+    chunkSize = (float64(cacheSizeInt) * GiB) / float64(maxAllowedChunks)
+    chunkSize = math.Round(chunkSize/(32*KiB)) * (32 * KiB)
+    chunkSize = math.Min(math.Max(chunkSize, minChunkSize), maxChunkSize) / KiB
+    // Default chunk size unit is KiB
+    return strconv.FormatInt(int64(chunkSize), 10) + "KiB", nil
+}
diff --git a/pkg/gce-pd-csi-driver/cache_test.go b/pkg/gce-pd-csi-driver/cache_test.go
new file mode 100644
index 000000000..918f26b0c
--- /dev/null
+++ b/pkg/gce-pd-csi-driver/cache_test.go
@@ -0,0 +1,57 @@
+package gceGCEDriver
+
+import (
+    "testing"
+)
+
+func TestFetchChunkSizeKiB(t *testing.T) {
+    testCases := []struct {
+        name         string
+        cacheSize    string
+        expChunkSize string
+        expErr       bool
+    }{
+        {
+            name:         "chunk size is in the allowed range",
+            cacheSize:    "500Gi",
+            expChunkSize: "512KiB", // range defined in fetchChunkSizeKiB
+        },
+        {
+            name:         "chunk size is set to the range ceiling",
+            cacheSize:    "30000000Gi",
+            expChunkSize: "1048576KiB", // range defined in fetchChunkSizeKiB - max 1GiB
+        },
+        {
+            name:         "chunk size is set to the allowed range floor",
+            cacheSize:    "10Gi",
+            expChunkSize: "160KiB", // range defined in fetchChunkSizeKiB - min 160KiB
+        },
+        {
+            name:         "cacheSize set to KiB also sets the chunk size to the range floor",
+            cacheSize:    "100Ki",
+            expChunkSize: "160KiB", // range defined in fetchChunkSizeKiB - min 160KiB
+        },
+        {
+            name:         "invalid cacheSize",
+            cacheSize:    "fdfsdKi",
+            expChunkSize: "160KiB", // range defined in fetchChunkSizeKiB - min 160KiB
+            expErr:       true,
+        },
+        // cacheSize is validated in the StorageClass parameters, so invalid values (negative or 0) are assumed never to reach this function
+    }
+
+    for _, tc := range testCases {
+        chunkSize, err := fetchChunkSizeKiB(tc.cacheSize)
+        if err != nil {
+            if !tc.expErr {
+                t.Errorf("Got unexpected error %s", err)
+            }
+            continue
+        }
+        if chunkSize != tc.expChunkSize {
+            t.Errorf("Got %s, want %s", chunkSize, tc.expChunkSize)
+        }
+
+    }
+
+}
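As a sanity check on the expected values in TestFetchChunkSizeKiB, the standalone sketch below replays the arithmetic that fetchChunkSizeKiB performs: (cache size in GiB * GiB) / maxAllowedChunks, rounded to the nearest 32KiB multiple, then clamped to the [160KiB, 1GiB] range and reported in KiB. It illustrates only the rounding and clamping; the Gi-string parsing and error handling of the real function are skipped.

```go
package main

import (
    "fmt"
    "math"
)

const (
    kib              = 1024.0
    gib              = 1024.0 * 1024.0 * 1024.0
    maxAllowedChunks = 1000000.0 // LVM's cap on the number of cache chunks
    minChunkSize     = 160 * kib // floor used by fetchChunkSizeKiB
    maxChunkSize     = 1 * gib   // ceiling used by fetchChunkSizeKiB
)

// chunkSizeKiB replays the rounding and clamping in fetchChunkSizeKiB for a
// cache size already expressed in whole GiB (string parsing is skipped here).
func chunkSizeKiB(cacheSizeGiB int64) int64 {
    chunk := float64(cacheSizeGiB) * gib / maxAllowedChunks // target bytes per chunk
    chunk = math.Round(chunk/(32*kib)) * (32 * kib)         // snap to a 32KiB multiple
    chunk = math.Min(math.Max(chunk, minChunkSize), maxChunkSize)
    return int64(chunk / kib)
}

func main() {
    for _, gi := range []int64{500, 10, 30000000} {
        fmt.Printf("%dGi -> %dKiB\n", gi, chunkSizeKiB(gi))
    }
}
```

Running it prints 500Gi -> 512KiB, 10Gi -> 160KiB (clamped up to the floor) and 30000000Gi -> 1048576KiB (clamped down to the 1GiB ceiling), which matches the test expectations above.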