
Commit ed6b156

Merge pull request #1950 from hungnguyen243/release-999.2.new
Update RAID logic and post-RAID validation to integrate Datacache support for GKE nodes
2 parents 39a5910 + e33bb0d commit ed6b156

File tree

10 files changed (+308 -122 lines)


cmd/gce-pd-csi-driver/main.go (+67 -28)
@@ -27,12 +27,8 @@ import (
   "strings"
   "time"

-  "k8s.io/client-go/kubernetes"
-  "k8s.io/client-go/rest"
   "k8s.io/klog/v2"
   "k8s.io/utils/strings/slices"
-
-  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
   "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
   "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
   gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -79,7 +75,7 @@ var (
   fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
   enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
   enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
-  enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration")
+  enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
   nodeName = flag.String("node-name", "", "The node this driver is running on")

   multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
@@ -102,9 +98,7 @@ var (
 )

 const (
-  driverName = "pd.csi.storage.gke.io"
-  dataCacheLabel = "datacache-storage-gke-io"
-  dataCacheLabelValue = "enabled"
+  driverName = "pd.csi.storage.gke.io"
 )

 func init() {
@@ -136,7 +130,7 @@ func handle() {
   if version == "" {
     klog.Fatalf("version must be set at compile time")
   }
-  klog.V(2).Infof("Driver vendor version %v", version)
+  klog.V(4).Infof("Driver vendor version %v", version)

   // Start tracing as soon as possible
   if *enableOtelTracing {
@@ -264,10 +258,10 @@ func handle() {

   if *enableDataCacheFlag {
     if nodeName == nil || *nodeName == "" {
-      klog.Errorf("Data cache enabled, but --node-name not passed")
+      klog.Errorf("Data Cache enabled, but --node-name not passed")
     }
     if err := setupDataCache(ctx, *nodeName); err != nil {
-      klog.Errorf("DataCache setup failed: %v", err)
+      klog.Errorf("Data Cache setup failed: %v", err)
     }
   }

@@ -350,32 +344,77 @@ func urlFlag(target **url.URL, name string, usage string) {
   })
 }

-func setupDataCache(ctx context.Context, nodeName string) error {
-  klog.V(2).Infof("Setting up data cache for node %s", nodeName)
-  if nodeName != common.TestNode {
-    cfg, err := rest.InClusterConfig()
-    if err != nil {
-      return err
+func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
+  allLssds, err := driver.FetchAllLssds()
+  if err != nil {
+    return nil, fmt.Errorf("Error listing all LSSDs %v", err)
+  }
+
+  raidedLssds, err := driver.FetchRaidedLssds()
+  if err != nil {
+    return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err)
+  }
+
+  unRaidedLssds := []string{}
+  for _, l := range allLssds {
+    if !slices.Contains(raidedLssds, l) {
+      unRaidedLssds = append(unRaidedLssds, l)
     }
-    kubeClient, err := kubernetes.NewForConfig(cfg)
-    if err != nil {
-      return err
+    if len(unRaidedLssds) == lssdCount {
+      break
     }
-    node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+  }
+
+  LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWihtEmptyMountPoint()
+  if err != nil {
+    return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err)
+  }
+
+  // We need to ensure the disks to be used for Data Cache are both unRAIDed & not containing mountpoints for ephemeral storage already
+  availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
+    return slices.Contains(LSSDsWithEmptyMountPoint, e)
+  })
+
+  if len(availableLssds) == 0 {
+    return nil, fmt.Errorf("No LSSDs available to set up caching")
+  }
+
+  if len(availableLssds) < lssdCount {
+    return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount)
+  }
+  return availableLssds, nil
+}
+
+func setupDataCache(ctx context.Context, nodeName string) error {
+  isAlreadyRaided, err := driver.IsRaided()
+  if err != nil {
+    klog.V(4).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err)
+  } else if isAlreadyRaided {
+    klog.V(4).Infof("Local SSDs are already RAIDed. Skipping Data Cache setup.")
+    return nil
+  }
+
+  lssdCount := common.LocalSSDCountForDataCache
+  if nodeName != common.TestNode {
+    var err error
+    lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
     if err != nil {
-      // We could retry, but this error will also crashloop the driver which may be as good a way to retry as any.
       return err
     }
-    if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue {
-      klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue)
+    if lssdCount == 0 {
+      klog.V(4).Infof("Data Cache is not enabled on node %v, so skipping caching setup", nodeName)
       return nil
     }
   }
-  klog.V(2).Info("Raiding local ssds to setup data cache")
-  if err := driver.RaidLocalSsds(); err != nil {
-    return fmt.Errorf("Failed to Raid local SSDs, unable to setup data caching, got error %v", err)
+  lssdNames, err := fetchLssdsForRaiding(lssdCount)
+  if err != nil {
+    klog.Fatalf("Failed to get sufficient SSDs for Data Cache's caching setup: %v", err)
+  }
+  klog.V(4).Infof("Raiding local ssds to setup Data Cache: %v", lssdNames)
+  if err := driver.RaidLocalSsds(lssdNames); err != nil {
+    return fmt.Errorf("Failed to Raid local SSDs, unable to setup Data Cache, got error %v", err)
  }

-  klog.V(2).Infof("Datacache enabled for node %s", nodeName)
+  klog.V(4).Infof("LSSD caching is setup for the Data Cache enabled node %s", nodeName)
   return nil
 }
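
The core of the new selection logic is the intersection of "not yet RAIDed" and "no ephemeral-storage mount point" device lists, built with the k8s.io/utils/strings/slices helpers already imported above. Below is a minimal, self-contained sketch of that filtering step; the device names, the input slices, and the lssdCount value are made up for illustration (in the driver they come from driver.FetchAllLssds, driver.FetchRaidedLssds, driver.FetchLSSDsWihtEmptyMountPoint, and the node label).

package main

import (
	"fmt"

	"k8s.io/utils/strings/slices"
)

func main() {
	// Hypothetical device lists; the real values come from the driver helpers.
	allLssds := []string{"/dev/nvme0n1", "/dev/nvme0n2", "/dev/nvme0n3"}
	raidedLssds := []string{"/dev/nvme0n3"}                              // already part of an md array
	lssdsWithEmptyMountPoint := []string{"/dev/nvme0n1", "/dev/nvme0n2"} // not used for ephemeral storage

	lssdCount := 2 // how many disks the node asks for

	// Step 1: keep only devices that are not yet RAIDed, stopping once we have enough.
	unRaided := []string{}
	for _, l := range allLssds {
		if !slices.Contains(raidedLssds, l) {
			unRaided = append(unRaided, l)
		}
		if len(unRaided) == lssdCount {
			break
		}
	}

	// Step 2: of those, keep only devices with no existing mount point.
	available := slices.Filter(nil, unRaided, func(e string) bool {
		return slices.Contains(lssdsWithEmptyMountPoint, e)
	})

	fmt.Println(available) // [/dev/nvme0n1 /dev/nvme0n2]
}

Note that the early break means only up to lssdCount un-RAIDed candidates are ever collected, so if the mount-point filter then drops any of them, fetchLssdsForRaiding reports "Not enough LSSDs available" rather than scanning further.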

pkg/common/constants.go (+7 -0)
@@ -49,4 +49,11 @@ const (
   ContexLocalSsdCacheSize = "local-ssd-cache-size"
   // Node name for E2E tests
   TestNode = "test-node-csi-e2e"
+
+  // Default LSSD count for datacache E2E tests
+  LocalSSDCountForDataCache = 2
+
+  // Node label for Data Cache (only applicable to GKE nodes)
+  NodeLabelPrefix = "cloud.google.com/%s"
+  DataCacheLssdCountLabel = "gke-data-cache-disk"
 )
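
Since NodeLabelPrefix is a format string, the full label key the driver looks for on a GKE node is cloud.google.com/gke-data-cache-disk. The following is a minimal sketch of how the two constants compose; the label map and the assumption that the label value carries the requested LSSD count as a plain integer are illustrative only, as GetDataCacheCountFromNodeLabel itself is not shown in this diff.

package main

import (
	"fmt"
	"strconv"
)

// Constants as added in pkg/common/constants.go.
const (
	NodeLabelPrefix         = "cloud.google.com/%s"
	DataCacheLssdCountLabel = "gke-data-cache-disk"
)

func main() {
	// Full label key: cloud.google.com/gke-data-cache-disk
	labelKey := fmt.Sprintf(NodeLabelPrefix, DataCacheLssdCountLabel)

	// Hypothetical node labels; the integer value is an assumption made for
	// illustration of how a per-node LSSD count could be read.
	nodeLabels := map[string]string{labelKey: "2"}

	lssdCount := 0
	if v, ok := nodeLabels[labelKey]; ok {
		n, err := strconv.Atoi(v)
		if err != nil {
			fmt.Printf("invalid %s value %q: %v\n", labelKey, v, err)
			return
		}
		lssdCount = n
	}
	fmt.Printf("%s=%d\n", labelKey, lssdCount)
}

LocalSSDCountForDataCache = 2 is the fallback used for the E2E test node, where no such label is applied.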

pkg/common/runcmd.go (+8 -4)
@@ -16,7 +16,7 @@ const (
 // RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
 // On error, the output is included so callers don't need to echo it again.

-func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
+func RunCommand(pipeCmd string, pipeCmdArg []string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
   execCmd1 := exec.Command(cmd1, execCmdArgs...)

   if pipeCmd != "" {
@@ -47,9 +47,9 @@ func checkError(err error, execCmd exec.Cmd) error {
   }
   return err
 }
-func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
+func execPipeCommand(pipeCmd string, pipeCmdArg []string, execCmd1 *exec.Cmd) ([]byte, error) {

-  execPipeCmd := exec.Command(pipeCmd, pipeCmdArg)
+  execPipeCmd := exec.Command(pipeCmd, pipeCmdArg...)
   stdoutPipe, err := execCmd1.StdoutPipe()
   if err != nil {
     klog.Errorf("failed command %v: got error:%v", execCmd1, err)
@@ -63,8 +63,12 @@ func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]b
   execPipeCmd.Stdin = stdoutPipe
   output, err := execPipeCmd.CombinedOutput()
   if err != nil {
+    // Some commands (such as grep) will return an error with exit status of 1
+    if len(output) == 0 && err.(*exec.ExitError).ExitCode() == 1 {
+      return output, nil
+    }
     err = checkError(err, *execPipeCmd)
-    return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output))
+    return nil, fmt.Errorf("%s failed: %w; output: %s", execPipeCmd, err, string(output))
   }

   return output, nil
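
The exit-status-1 special case exists because grep, used on the piped side of RunCommand, reports "no lines matched" as exit code 1, which is not a failure for these callers. The change from a string pipeCmdArg to []string is what lets a caller pass multi-argument pipes (for example a flag plus a pattern), since exec.Command needs each argument as a separate element. A self-contained sketch of the same pattern against the standard library only; the runGrep helper and its inputs are hypothetical and not part of the driver:

package main

import (
	"errors"
	"fmt"
	"os/exec"
	"strings"
)

// runGrep pipes input through grep and, like the updated execPipeCommand,
// treats exit status 1 with empty output as "no matches" rather than an error.
func runGrep(input, pattern string) ([]byte, error) {
	cmd := exec.Command("grep", pattern)
	cmd.Stdin = strings.NewReader(input)
	out, err := cmd.CombinedOutput()
	if err != nil {
		var exitErr *exec.ExitError
		if errors.As(err, &exitErr) && len(out) == 0 && exitErr.ExitCode() == 1 {
			return out, nil // grep found nothing; not a failure
		}
		return nil, fmt.Errorf("%s failed: %w; output: %s", cmd, err, out)
	}
	return out, nil
}

func main() {
	out, err := runGrep("nvme0n1\nnvme0n2\nsda\n", "nvme")
	fmt.Printf("matches=%q err=%v\n", out, err) // two matching lines

	out, err = runGrep("sda\nsdb\n", "nvme")
	fmt.Printf("matches=%q err=%v\n", out, err) // no matches, err is nil
}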
