
Commit 7e5a1e7

update RAIDing logic and validation for Datacache
1 parent 84e1b15 commit 7e5a1e7

8 files changed: +467 additions, -270 deletions


cmd/gce-pd-csi-driver/main.go

Lines changed: 72 additions & 41 deletions
@@ -27,12 +27,8 @@ import (
 	"strings"
 	"time"

-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/strings/slices"
-
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -67,20 +63,17 @@ var (
 	waitForOpBackoffSteps = flag.Int("wait-op-backoff-steps", 100, "Steps for wait for operation backoff")
 	waitForOpBackoffCap = flag.Duration("wait-op-backoff-cap", 0, "Cap for wait for operation backoff")

-	enableDeviceInUseCheck = flag.Bool("enable-device-in-use-check-on-node-unstage", true, "If set to true, block NodeUnstageVolume requests until the specified device is not in use")
-	deviceInUseTimeout = flag.Duration("device-in-use-timeout", 30*time.Second, "Max time to wait for a device to be unused when attempting to unstage. Exceeding the timeout will cause an unstage request to return success and ignore the device in use check.")
-
 	maxProcs = flag.Int("maxprocs", 1, "GOMAXPROCS override")
 	maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
 	concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")

-	maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "If set then format and mount operations are serialized on each node. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
-	formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
-	fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
-	enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
-	enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
-	enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration")
-	nodeName = flag.String("node-name", "", "The node this driver is running on")
+	maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "If set then format and mount operations are serialized on each node. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
+	formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
+	fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
+	enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
+	enableControllerDataCacheFlag = flag.Bool("enable-controller-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration")
+	enableNodeDataCacheFlag = flag.Bool("enable-node-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration")
+	nodeName = flag.String("node-name", "", "The node this driver is running on")

 	multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
 	multiZoneVolumeHandleEnableFlag = flag.Bool("multi-zone-volume-handle-enable", false, "If set to true, the multi-zone volumeHandle feature will be enabled")
@@ -102,9 +95,7 @@ var (
 )

 const (
-	driverName = "pd.csi.storage.gke.io"
-	dataCacheLabel = "datacache-storage-gke-io"
-	dataCacheLabelValue = "enabled"
+	driverName = "pd.csi.storage.gke.io"
 )

 func init() {
@@ -233,7 +224,7 @@ func handle() {
 		}
 		initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
 		maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
-		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag)
+		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableControllerDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig)
 	} else if *cloudConfigFilePath != "" {
 		klog.Warningf("controller service is disabled but cloud config given - it has no effect")
 	}
@@ -251,18 +242,13 @@ func handle() {
 		if err != nil {
 			klog.Fatalf("Failed to set up metadata service: %v", err.Error())
 		}
-		nsArgs := driver.NodeServerArgs{
-			EnableDeviceInUseCheck: *enableDeviceInUseCheck,
-			DeviceInUseTimeout: *deviceInUseTimeout,
-			EnableDataCache: *enableDataCacheFlag,
-		}
-		nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
+		nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, *enableNodeDataCacheFlag)
 		if *maxConcurrentFormatAndMount > 0 {
 			nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
 		}
 	}

-	if *enableDataCacheFlag {
+	if *enableNodeDataCacheFlag {
 		if nodeName == nil || *nodeName == "" {
 			klog.Errorf("Data cache enabled, but --node-name not passed")
 		}
@@ -350,29 +336,74 @@ func urlFlag(target **url.URL, name string, usage string) {
 	})
 }

+func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
+	allLssds, err := driver.FetchAllLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing all LSSDs %v", err)
+	}
+
+	raidedLssds, err := driver.FetchRaidedLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err)
+	}
+
+	unRaidedLssds := []string{}
+	for _, l := range allLssds {
+		if !slices.Contains(raidedLssds, l) {
+			unRaidedLssds = append(unRaidedLssds, l)
+		}
+		if len(unRaidedLssds) == lssdCount {
+			break
+		}
+	}
+
+	LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWihtEmptyMountPoint()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err)
+	}
+
+	// We need to ensure the disks to be used for Datacache are both unRAIDed & not containing mountpoints for ephemeral storage already
+	availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
+		return slices.Contains(LSSDsWithEmptyMountPoint, e)
+	})
+
+	if len(availableLssds) == 0 {
+		return nil, fmt.Errorf("No LSSDs available to set up caching")
+	}
+
+	if len(availableLssds) < lssdCount {
+		return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount)
+	}
+	return availableLssds, nil
+}
+
 func setupDataCache(ctx context.Context, nodeName string) error {
-	klog.V(2).Infof("Setting up data cache for node %s", nodeName)
+	isAlreadyRaided, err := driver.IsRaided()
+	if err != nil {
+		klog.V(2).Infof("============================== Errored while scanning for available LocalSSDs err:%v; continuing Raiding ==============================", err)
+	} else if isAlreadyRaided {
+		klog.V(2).Infof("============================== Local SSDs are already RAIDed. Skipping Datacache setup ==============================")
+		return nil
+	}
+
+	lssdCount := common.LocalSSDCountForDataCache
 	if nodeName != common.TestNode {
-		cfg, err := rest.InClusterConfig()
-		if err != nil {
-			return err
-		}
-		kubeClient, err := kubernetes.NewForConfig(cfg)
-		if err != nil {
-			return err
+		var err error
+		lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
+		if lssdCount == 0 {
+			klog.Infof("Datacache is not enabled on node %v", nodeName)
+			return nil
 		}
-		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
 		if err != nil {
-			// We could retry, but this error will also crashloop the driver which may be as good a way to retry as any.
 			return err
 		}
-		if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue {
-			klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue)
-			return nil
-		}
 	}
-	klog.V(2).Info("Raiding local ssds to setup data cache")
-	if err := driver.RaidLocalSsds(); err != nil {
+	lssdNames, err := fetchLssdsForRaiding(lssdCount)
+	if err != nil {
+		klog.Fatalf("Failed to get sufficient SSDs for Datacache's caching setup: %v", err)
+	}
+	klog.V(2).Infof("Raiding local ssds to setup data cache: %v", lssdNames)
+	if err := driver.RaidLocalSsds(lssdNames); err != nil {
 		return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err)
 	}

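Note: this change also splits the old --enable-data-cache flag into --enable-controller-data-cache and --enable-node-data-cache, and replaces the boolean node-label check with a per-node LSSD count read via driver.GetDataCacheCountFromNodeLabel. The heart of the new RAIDing logic is the eligibility filter in fetchLssdsForRaiding: a local SSD is only used for Datacache if it is not already RAIDed and has no mountpoint. Below is a minimal, self-contained sketch of that intersection using the k8s.io/utils/strings/slices helpers the diff relies on; the pickAvailableLssds helper and the device names are illustrative, not part of the commit.

package main

import (
	"fmt"

	"k8s.io/utils/strings/slices"
)

// pickAvailableLssds mirrors the filtering step in fetchLssdsForRaiding:
// keep only the un-RAIDed disks that also report an empty mountpoint,
// and fail if fewer than the requested count survive the filter.
func pickAvailableLssds(unRaided, emptyMountPoint []string, want int) ([]string, error) {
	available := slices.Filter(nil, unRaided, func(e string) bool {
		return slices.Contains(emptyMountPoint, e)
	})
	if len(available) < want {
		return nil, fmt.Errorf("not enough LSSDs: have %d, want %d", len(available), want)
	}
	return available[:want], nil
}

func main() {
	// Hypothetical device names, for illustration only.
	unRaided := []string{"/dev/nvme0n1", "/dev/nvme0n2", "/dev/nvme0n3"}
	empty := []string{"/dev/nvme0n2", "/dev/nvme0n3"}
	fmt.Println(pickAvailableLssds(unRaided, empty, 2))
}
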
pkg/common/constants.go

Lines changed: 7 additions & 0 deletions
@@ -44,4 +44,11 @@ const (
 	ContexLocalSsdCacheSize = "local-ssd-cache-size"
 	// Node name for E2E tests
 	TestNode = "test-node-csi-e2e"
+
+	// Default LSSD count for datacache E2E tests
+	LocalSSDCountForDataCache = 2
+
+	// Node label for datacache
+	NodeLabelPrefix = "cloud.google.com/%s"
+	DataCacheLssdCountLabel = "gke-data-cache-disk"
 )
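The NodeLabelPrefix format string and DataCacheLssdCountLabel suggest the full label key is assembled with fmt.Sprintf before being looked up on the node (presumably by GetDataCacheCountFromNodeLabel in the driver package). A hedged sketch of that composition follows; the composition itself is an inference from the constants, not code shown in this commit.

package main

import "fmt"

const (
	// Values mirror the constants added in pkg/common/constants.go.
	NodeLabelPrefix         = "cloud.google.com/%s"
	DataCacheLssdCountLabel = "gke-data-cache-disk"
)

func main() {
	// Presumed full label key; e.g. a node labeled
	// cloud.google.com/gke-data-cache-disk: "2" would request two LSSDs.
	key := fmt.Sprintf(NodeLabelPrefix, DataCacheLssdCountLabel)
	fmt.Println(key) // cloud.google.com/gke-data-cache-disk
}
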

pkg/common/runcmd.go

Lines changed: 8 additions & 4 deletions
@@ -16,7 +16,7 @@ const (
 // RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
 // On error, the output is included so callers don't need to echo it again.

-func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
+func RunCommand(pipeCmd string, pipeCmdArg []string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
 	execCmd1 := exec.Command(cmd1, execCmdArgs...)

 	if pipeCmd != "" {
@@ -47,9 +47,9 @@ func checkError(err error, execCmd exec.Cmd) error {
 	}
 	return err
 }
-func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
+func execPipeCommand(pipeCmd string, pipeCmdArg []string, execCmd1 *exec.Cmd) ([]byte, error) {

-	execPipeCmd := exec.Command(pipeCmd, pipeCmdArg)
+	execPipeCmd := exec.Command(pipeCmd, pipeCmdArg...)
 	stdoutPipe, err := execCmd1.StdoutPipe()
 	if err != nil {
 		klog.Errorf("failed command %v: got error:%v", execCmd1, err)
@@ -63,8 +63,12 @@ func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
 	execPipeCmd.Stdin = stdoutPipe
 	output, err := execPipeCmd.CombinedOutput()
 	if err != nil {
+		// Some commands (such as grep) will return an error with exit status of 1
+		if len(output) == 0 && err.(*exec.ExitError).ExitCode() == 1 {
+			return output, nil
+		}
 		err = checkError(err, *execPipeCmd)
-		return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output))
+		return nil, fmt.Errorf("%s failed: %w; output: %s", execPipeCmd, err, string(output))
 	}

 	return output, nil