Skip to content

Commit 58c8eaf

Browse files
authored
Merge pull request #1954 from Sneha-at/data-cache-bugs
Fix chunksize bug for large cache size in data cache
2 parents dba4be5 + e94c263 commit 58c8eaf

File tree

6 files changed

+175
-22
lines changed

6 files changed

+175
-22
lines changed

deploy/kubernetes/base/controller/controller.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ spec:
143143
- "--endpoint=unix:/csi/csi.sock"
144144
- "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
145145
- "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
146+
- --enable-data-cache
146147
env:
147148
- name: GOOGLE_APPLICATION_CREDENTIALS
148149
value: "/etc/cloud-sa/cloud-sa.json"

pkg/common/parameters.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -280,19 +280,21 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]
280280
if !enableDataCache {
281281
return p, d, fmt.Errorf("data caching enabled: %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
282282
}
283-
// TODO: need to parse or validate the string
284283

285284
paramDataCacheSize, err := ConvertGiStringToInt64(v)
286285
if err != nil {
287286
return p, d, fmt.Errorf("parameters contain invalid dataCacheSize parameter: %w", err)
288287
}
288+
if err := ValidateNonNegativeInt(paramDataCacheSize); err != nil {
289+
return p, d, fmt.Errorf("parameters contains invalid option: %s: %w", ParameterKeyDataCacheSize, err)
290+
}
289291
d.DataCacheSize = strconv.FormatInt(paramDataCacheSize, 10)
290292
case ParameterKeyDataCacheMode:
291293
if !enableDataCache {
292294
return p, d, fmt.Errorf("data caching enabled %v; parameters contains invalid option %q", enableDataCache, ParameterKeyDataCacheSize)
293295
}
294296
if err := ValidateDataCacheMode(v); err != nil {
295-
return p, d, fmt.Errorf("parameters contains invalid option: %w", err)
297+
return p, d, fmt.Errorf("parameters contains invalid option: %s: %w", ParameterKeyDataCacheMode, err)
296298
}
297299
d.DataCacheMode = v
298300
case ParameterKeyResourceTags:

pkg/common/utils.go

+7
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,13 @@ func ValidateDataCacheMode(s string) error {
713713
return fmt.Errorf("invalid data-cache-mode %s. Only \"writeback\" and \"writethrough\" is a valid input", s)
714714
}
715715

716+
func ValidateNonNegativeInt(n int64) error {
717+
if n <= 0 {
718+
return fmt.Errorf("Input should be set to > 0, got %d", n)
719+
}
720+
return nil
721+
}
722+
716723
// NewLimiter returns a token bucket based request rate limiter after initializing
717724
// the passed values for limit, burst (or token bucket) size. If opted for emptyBucket
718725
// all initial tokens are reserved for the first burst.

pkg/common/utils_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -1823,6 +1823,42 @@ func TestValidateDataCacheMode(t *testing.T) {
18231823

18241824
}
18251825

1826+
func TestValidateNonNegativeInt(t *testing.T) {
1827+
testCases := []struct {
1828+
name string
1829+
cacheSize int64
1830+
expectError bool
1831+
}{
1832+
{
1833+
name: "valid input - positive cache size",
1834+
cacheSize: 100000,
1835+
},
1836+
{
1837+
name: "invalid input - cachesize 0",
1838+
cacheSize: 0,
1839+
expectError: true,
1840+
},
1841+
{
1842+
name: "invalid input - negative cache size",
1843+
cacheSize: -100,
1844+
expectError: true,
1845+
},
1846+
}
1847+
1848+
for _, tc := range testCases {
1849+
t.Logf("test case: %s", tc.name)
1850+
err := ValidateNonNegativeInt(tc.cacheSize)
1851+
if err != nil && !tc.expectError {
1852+
t.Errorf("Got error %v validate data cache mode %d; expect no error", err, tc.cacheSize)
1853+
}
1854+
1855+
if err == nil && tc.expectError {
1856+
t.Errorf("Got no error validate data cache mode %d; expect an error", tc.cacheSize)
1857+
}
1858+
}
1859+
1860+
}
1861+
18261862
func TestParseZoneFromURI(t *testing.T) {
18271863
testcases := []struct {
18281864
name string

pkg/gce-pd-csi-driver/cache.go

+70-20
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gceGCEDriver
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"regexp"
78
"strconv"
89
"strings"
@@ -16,10 +17,18 @@ import (
1617
)
1718

1819
const (
19-
cacheSuffix = "csi-fast"
20-
mainLvSuffix = "csi-main"
21-
raidedLocalSsdName = "csi-driver-data-cache"
22-
raidMode = "0"
20+
cacheSuffix = "csi-fast"
21+
mainLvSuffix = "csi-main"
22+
raidedLocalSsdName = "csi-driver-data-cache"
23+
raidMode = "0"
24+
maxAllowedChunks int64 = 1000000 // This is the max allowed chunks for LVM
25+
GiB float64 = 1024 * 1024 * 1024
26+
KiB float64 = 1024
27+
)
28+
29+
var (
30+
maxChunkSize float64 = 1 * GiB // Max allowed chunk size as per LVM documentation
31+
minChunkSize float64 = 160 * KiB // This is randomly selected, we need a multiple of 32KiB, the default size would be too small for caching https://man7.org/linux/man-pages/man8/lvcreate.8.html (--chunksize)
2332
)
2433

2534
func fetchRAIDedLocalSsdPath() (string, error) {
@@ -84,7 +93,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
8493
vgNameForPv := strings.TrimSpace(infoSlice[(len(infoSlice) - 1)])
8594
klog.V(4).Infof("Physical volume is part of Volume group: %v", vgNameForPv)
8695
if vgNameForPv == volumeGroupName {
87-
klog.V(4).Infof("Physical Volume(PV) already exists in the Volume Group")
96+
klog.V(4).Infof("Physical Volume(PV) already exists in the Volume Group %v", volumeGroupName)
8897
} else if vgNameForPv != "VG" && vgNameForPv != "" {
8998

9099
info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgchange", []string{"-an", vgNameForPv}...)
@@ -159,21 +168,30 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
159168
// Validate that cache is setup for required size
160169
klog.V(4).Infof("Assuming valid data cache size and mode, resizing cache is not supported")
161170
} else {
162-
fastCacheSize := req.GetPublishContext()[common.ContextDataCacheSize]
163-
chunkSize := "960" // Cannot use default chunk size(64KiB) as it errors on maxChunksAllowed. Unit - KiB
164-
args = []string{
165-
"--yes",
166-
"-n",
167-
cacheLvName,
168-
"-L",
169-
// ConvertGiStringToInt64 converts the input size to GiB so default to "g" for cache size - LVM g|G is GiB.
170-
fastCacheSize + "g",
171-
volumeGroupName,
172-
raidedLocalSsdPath,
173-
}
174-
info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...)
171+
cacheSize := req.GetPublishContext()[common.ContextDataCacheSize]
172+
chunkSize, err := fetchChunkSizeKiB(cacheSize)
175173
if err != nil {
176-
return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info)
174+
klog.Errorf("Errored to fetch cache size, verify the data-cache-size is valid: got %v, error: %q", cacheSize, err)
175+
return mainDevicePath, err
176+
}
177+
// Check if LV exists
178+
info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvs", args...)
179+
lvExists := strings.Contains(string(info), cacheLvName)
180+
if !lvExists {
181+
args = []string{
182+
"--yes",
183+
"-n",
184+
cacheLvName,
185+
"-L",
186+
// ConvertGiStringToInt64 converts the input size to GiB so default to "g" for cache size - LVM g|G is GiB.
187+
cacheSize + "g",
188+
volumeGroupName,
189+
raidedLocalSsdPath,
190+
}
191+
info, err = common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvcreate", args...)
192+
if err != nil {
193+
return mainDevicePath, fmt.Errorf("Errored while creating cache %w: %s", err, info)
194+
}
177195
}
178196

179197
// Once caching is setup, link the PD to cache
@@ -188,7 +206,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str
188206
req.GetPublishContext()[common.ContextDataCacheMode],
189207
volumeGroupName + "/" + mainLvName,
190208
"--chunksize",
191-
string(chunkSize),
209+
chunkSize, // default unit is KiB
192210
"--force",
193211
"-y",
194212
}
@@ -357,11 +375,17 @@ func cleanupCache(volumeId string, nodeId string) error {
357375

358376
volumeGroupName := getVolumeGroupName(nodeId)
359377
if !checkVgExists(volumeGroupName) {
378+
klog.V(4).Infof("Volume group %s not found, no cache clean up needed", volumeGroupName)
360379
// If volume group doesn't exist then there's nothing to uncache
361380
return nil
362381
}
363382
reduceVolumeGroup(volumeGroupName, true)
364383
mainLvName := getLvName(mainLvSuffix, volumeId)
384+
if !checkLvExists(mainLvName) {
385+
klog.V(4).Infof("Logical volume %s not found, assuming caching wasn't setup for the PVC %s or is cleaned up", mainLvName, volumeId)
386+
// If logical volume doesn't exist then there's nothing to uncache
387+
return nil
388+
}
365389
args := []string{
366390
"-an",
367391
"/dev/" + volumeGroupName + "/" + mainLvName,
@@ -382,6 +406,17 @@ func cleanupCache(volumeId string, nodeId string) error {
382406
return nil
383407
}
384408

409+
func checkLvExists(lvName string) bool {
410+
args := []string{}
411+
info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "lvscan", args...)
412+
if err != nil {
413+
klog.Errorf("Errored while checking if logical volume exists for %s %v: %s", lvName, err, info)
414+
return false
415+
}
416+
// Check if the required logical volume already exists
417+
return strings.Contains(string(info), lvName)
418+
}
419+
385420
func getVolumeGroupName(nodePath string) string {
386421
nodeSlice := strings.Split(nodePath, "/")
387422
nodeId := nodeSlice[len(nodeSlice)-1]
@@ -497,3 +532,18 @@ func isCachingSetup(mainLvName string) (error, bool) {
497532
}
498533
return nil, false
499534
}
535+
536+
func fetchChunkSizeKiB(cacheSize string) (string, error) {
537+
var chunkSize float64
538+
539+
cacheSizeInt, err := common.ConvertGiStringToInt64(cacheSize)
540+
if err != nil {
541+
return "0", err
542+
}
543+
// Chunksize should be divisible by 32Kib so we need (chunksize/32*1024)*32*1024
544+
chunkSize = (float64(cacheSizeInt) * GiB) / float64(maxAllowedChunks)
545+
chunkSize = math.Round(chunkSize/(32*KiB)) * (32 * KiB)
546+
chunkSize = math.Min(math.Max(chunkSize, minChunkSize), maxChunkSize) / KiB
547+
// default chunk size unit KiB
548+
return strconv.FormatInt(int64(chunkSize), 10) + "KiB", nil
549+
}

pkg/gce-pd-csi-driver/cache_test.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package gceGCEDriver
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestFetchChunkSizeKiB(t *testing.T) {
8+
testCases := []struct {
9+
name string
10+
cacheSize string
11+
expChunkSize string
12+
expErr bool
13+
}{
14+
{
15+
name: "chunk size is in the allowed range",
16+
cacheSize: "500Gi",
17+
expChunkSize: "512KiB", //range defined in fetchChunkSizeKiB
18+
},
19+
{
20+
name: "chunk size is set to the range ceil",
21+
cacheSize: "30000000Gi",
22+
expChunkSize: "1048576KiB", //range defined in fetchChunkSizeKiB - max 1GiB
23+
},
24+
{
25+
name: "chunk size is set to the allowed range floor",
26+
cacheSize: "10Gi",
27+
expChunkSize: "160KiB", //range defined in fetchChunkSizeKiB - min 160 KiB
28+
},
29+
{
30+
name: "cacheSize set to KiB also sets the chunk size to range floor",
31+
cacheSize: "100Ki",
32+
expChunkSize: "160KiB", //range defined in fetchChunkSizeKiB - min 160 KiB
33+
},
34+
{
35+
name: "invalid cacheSize",
36+
cacheSize: "fdfsdKi",
37+
expChunkSize: "160KiB", //range defined in fetchChunkSizeKiB - min 160 KiB
38+
expErr: true,
39+
},
40+
// cacheSize is validated in storage class parameter so assuming invalid cacheSize (like negative, 0) would not be passed to the function
41+
}
42+
43+
for _, tc := range testCases {
44+
chunkSize, err := fetchChunkSizeKiB(tc.cacheSize)
45+
if err != nil {
46+
if !tc.expErr {
47+
t.Errorf("Errored %s", err)
48+
}
49+
continue
50+
}
51+
if chunkSize != tc.expChunkSize {
52+
t.Errorf("Got %s want %s", chunkSize, tc.expChunkSize)
53+
}
54+
55+
}
56+
57+
}

0 commit comments

Comments
 (0)