Skip to content

Commit 1566da5

Browse files
committed
update RAIDing and validation steps
1 parent 84e1b15 commit 1566da5

File tree

8 files changed

+293
-115
lines changed

8 files changed

+293
-115
lines changed

cmd/gce-pd-csi-driver/main.go

+62-23
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,8 @@ import (
2727
"strings"
2828
"time"
2929

30-
"k8s.io/client-go/kubernetes"
31-
"k8s.io/client-go/rest"
3230
"k8s.io/klog/v2"
3331
"k8s.io/utils/strings/slices"
34-
35-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3632
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
3733
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
3834
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -102,9 +98,7 @@ var (
10298
)
10399

104100
const (
105-
driverName = "pd.csi.storage.gke.io"
106-
dataCacheLabel = "datacache-storage-gke-io"
107-
dataCacheLabelValue = "enabled"
101+
driverName = "pd.csi.storage.gke.io"
108102
)
109103

110104
func init() {
@@ -350,29 +344,74 @@ func urlFlag(target **url.URL, name string, usage string) {
350344
})
351345
}
352346

347+
func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
348+
allLssds, err := driver.FetchAllLssds()
349+
if err != nil {
350+
return nil, fmt.Errorf("Error listing all LSSDs %v", err)
351+
}
352+
353+
raidedLssds, err := driver.FetchRaidedLssds()
354+
if err != nil {
355+
return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err)
356+
}
357+
358+
unRaidedLssds := []string{}
359+
for _, l := range allLssds {
360+
if !slices.Contains(raidedLssds, l) {
361+
unRaidedLssds = append(unRaidedLssds, l)
362+
}
363+
if len(unRaidedLssds) == lssdCount {
364+
break
365+
}
366+
}
367+
368+
LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWihtEmptyMountPoint()
369+
if err != nil {
370+
return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err)
371+
}
372+
373+
// We need to ensure the disks to be used for Datacache are both unRAIDed & not containing mountpoints for ephemeral storage already
374+
availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
375+
return slices.Contains(LSSDsWithEmptyMountPoint, e)
376+
})
377+
378+
if len(availableLssds) == 0 {
379+
return nil, fmt.Errorf("No LSSDs available to set up caching")
380+
}
381+
382+
if len(availableLssds) < lssdCount {
383+
return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount)
384+
}
385+
return availableLssds, nil
386+
}
387+
353388
func setupDataCache(ctx context.Context, nodeName string) error {
354-
klog.V(2).Infof("Setting up data cache for node %s", nodeName)
389+
isAlreadyRaided, err := driver.IsRaided()
390+
if err != nil {
391+
klog.V(2).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err)
392+
} else if isAlreadyRaided {
393+
klog.V(2).Infof("Local SSDs are already RAIDed. Skipping Datacache setup.")
394+
return nil
395+
}
396+
397+
lssdCount := common.LocalSSDCountForDataCache
355398
if nodeName != common.TestNode {
356-
cfg, err := rest.InClusterConfig()
357-
if err != nil {
358-
return err
359-
}
360-
kubeClient, err := kubernetes.NewForConfig(cfg)
361-
if err != nil {
362-
return err
399+
var err error
400+
lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
401+
if lssdCount == 0 {
402+
klog.Infof("Datacache is not enabled on node %v", nodeName)
403+
return nil
363404
}
364-
node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
365405
if err != nil {
366-
// We could retry, but this error will also crashloop the driver which may be as good a way to retry as any.
367406
return err
368407
}
369-
if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue {
370-
klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue)
371-
return nil
372-
}
373408
}
374-
klog.V(2).Info("Raiding local ssds to setup data cache")
375-
if err := driver.RaidLocalSsds(); err != nil {
409+
lssdNames, err := fetchLssdsForRaiding(lssdCount)
410+
if err != nil {
411+
klog.Fatalf("Failed to get sufficient SSDs for Datacache's caching setup: %v", err)
412+
}
413+
klog.V(2).Infof("Raiding local ssds to setup data cache: %v", lssdNames)
414+
if err := driver.RaidLocalSsds(lssdNames); err != nil {
376415
return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err)
377416
}
378417

pkg/common/constants.go

+7
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,11 @@ const (
4444
ContexLocalSsdCacheSize = "local-ssd-cache-size"
4545
// Node name for E2E tests
4646
TestNode = "test-node-csi-e2e"
47+
48+
// Default LSSD count for datacache E2E tests
49+
LocalSSDCountForDataCache = 2
50+
51+
// Node label for datacache
52+
NodeLabelPrefix = "cloud.google.com/%s"
53+
DataCacheLssdCountLabel = "gke-data-cache-disk"
4754
)

pkg/common/runcmd.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ const (
1616
// RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
1717
// On error, the output is included so callers don't need to echo it again.
1818

19-
func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
19+
func RunCommand(pipeCmd string, pipeCmdArg []string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
2020
execCmd1 := exec.Command(cmd1, execCmdArgs...)
2121

2222
if pipeCmd != "" {
@@ -47,9 +47,9 @@ func checkError(err error, execCmd exec.Cmd) error {
4747
}
4848
return err
4949
}
50-
func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
50+
func execPipeCommand(pipeCmd string, pipeCmdArg []string, execCmd1 *exec.Cmd) ([]byte, error) {
5151

52-
execPipeCmd := exec.Command(pipeCmd, pipeCmdArg)
52+
execPipeCmd := exec.Command(pipeCmd, pipeCmdArg...)
5353
stdoutPipe, err := execCmd1.StdoutPipe()
5454
if err != nil {
5555
klog.Errorf("failed command %v: got error:%v", execCmd1, err)
@@ -63,8 +63,12 @@ func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]b
6363
execPipeCmd.Stdin = stdoutPipe
6464
output, err := execPipeCmd.CombinedOutput()
6565
if err != nil {
66+
// Some commands (such as grep) will return an error with exit status of 1
67+
if len(output) == 0 && err.(*exec.ExitError).ExitCode() == 1 {
68+
return output, nil
69+
}
6670
err = checkError(err, *execPipeCmd)
67-
return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output))
71+
return nil, fmt.Errorf("%s failed: %w; output: %s", execPipeCmd, err, string(output))
6872
}
6973

7074
return output, nil

0 commit comments

Comments
 (0)