Commit c5e8f77

Merge branch 'master' into master

2 parents cdd1023 + ed6b156

File tree: 10 files changed, +316 −132 lines changed

cmd/gce-pd-csi-driver/main.go (+67 −28)

@@ -27,12 +27,8 @@ import (
 	"strings"
 	"time"

-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/strings/slices"
-
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -79,7 +75,7 @@ var (
 	fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
 	enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
 	enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
-	enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with data cache configuration")
+	enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
 	nodeName = flag.String("node-name", "", "The node this driver is running on")

 	multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
@@ -102,9 +98,7 @@ var (
 )

 const (
-	driverName = "pd.csi.storage.gke.io"
-	dataCacheLabel = "datacache-storage-gke-io"
-	dataCacheLabelValue = "enabled"
+	driverName = "pd.csi.storage.gke.io"
 )

 func init() {
@@ -136,7 +130,7 @@ func handle() {
 	if version == "" {
 		klog.Fatalf("version must be set at compile time")
 	}
-	klog.V(2).Infof("Driver vendor version %v", version)
+	klog.V(4).Infof("Driver vendor version %v", version)

 	// Start tracing as soon as possible
 	if *enableOtelTracing {
@@ -264,10 +258,10 @@ func handle() {

 	if *enableDataCacheFlag {
 		if nodeName == nil || *nodeName == "" {
-			klog.Errorf("Data cache enabled, but --node-name not passed")
+			klog.Errorf("Data Cache enabled, but --node-name not passed")
 		}
 		if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil {
-			klog.Errorf("DataCache setup failed: %v", err)
+			klog.Errorf("Data Cache setup failed: %v", err)
 		}
 		go driver.StartWatcher(*nodeName)
 	}
@@ -351,37 +345,82 @@ func urlFlag(target **url.URL, name string, usage string) {
 	})
 }

-func setupDataCache(ctx context.Context, nodeName string, nodeId string) error {
-	klog.V(2).Infof("Setting up data cache for node %s", nodeName)
-	if nodeName != common.TestNode {
-		cfg, err := rest.InClusterConfig()
-		if err != nil {
-			return err
+func fetchLssdsForRaiding(lssdCount int) ([]string, error) {
+	allLssds, err := driver.FetchAllLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing all LSSDs %v", err)
+	}
+
+	raidedLssds, err := driver.FetchRaidedLssds()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing RAIDed LSSDs %v", err)
+	}
+
+	unRaidedLssds := []string{}
+	for _, l := range allLssds {
+		if !slices.Contains(raidedLssds, l) {
+			unRaidedLssds = append(unRaidedLssds, l)
 		}
-		kubeClient, err := kubernetes.NewForConfig(cfg)
-		if err != nil {
-			return err
+		if len(unRaidedLssds) == lssdCount {
+			break
 		}
-		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+	}
+
+	LSSDsWithEmptyMountPoint, err := driver.FetchLSSDsWihtEmptyMountPoint()
+	if err != nil {
+		return nil, fmt.Errorf("Error listing LSSDs with empty mountpoint: %v", err)
+	}
+
+	// We need to ensure the disks to be used for Data Cache are both unRAIDed & not containing mountpoints for ephemeral storage already
+	availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
+		return slices.Contains(LSSDsWithEmptyMountPoint, e)
+	})
+
+	if len(availableLssds) == 0 {
+		return nil, fmt.Errorf("No LSSDs available to set up caching")
+	}
+
+	if len(availableLssds) < lssdCount {
+		return nil, fmt.Errorf("Not enough LSSDs available to set up caching. Available LSSDs: %v, wanted LSSDs: %v", len(availableLssds), lssdCount)
+	}
+	return availableLssds, nil
+}
+
+func setupDataCache(ctx context.Context, nodeName string, nodeId string) error {
+	isAlreadyRaided, err := driver.IsRaided()
+	if err != nil {
+		klog.V(4).Infof("Errored while scanning for available LocalSSDs err:%v; continuing Raiding", err)
+	} else if isAlreadyRaided {
+		klog.V(4).Infof("Local SSDs are already RAIDed. Skipping Data Cache setup.")
+		return nil
+	}
+
+	lssdCount := common.LocalSSDCountForDataCache
+	if nodeName != common.TestNode {
+		var err error
+		lssdCount, err = driver.GetDataCacheCountFromNodeLabel(ctx, nodeName)
 		if err != nil {
-			// We could retry, but this error will also crashloop the driver which may be as good a way to retry as any.
 			return err
 		}
-		if val, found := node.GetLabels()[dataCacheLabel]; !found || val != dataCacheLabelValue {
-			klog.V(2).Infof("Datacache not enabled for node %s; node label %s=%s and not %s", nodeName, dataCacheLabel, val, dataCacheLabelValue)
+		if lssdCount == 0 {
+			klog.V(4).Infof("Data Cache is not enabled on node %v, so skipping caching setup", nodeName)
			return nil
 		}
 	}
-	klog.V(2).Info("Raiding local ssds to setup data cache")
-	if err := driver.RaidLocalSsds(); err != nil {
-		return fmt.Errorf("failed to Raid local SSDs, unable to setup data caching, got error %v", err)
+	lssdNames, err := fetchLssdsForRaiding(lssdCount)
+	if err != nil {
+		klog.Fatalf("Failed to get sufficient SSDs for Data Cache's caching setup: %v", err)
+	}
+	klog.V(4).Infof("Raiding local ssds to setup Data Cache: %v", lssdNames)
+	if err := driver.RaidLocalSsds(lssdNames); err != nil {
+		return fmt.Errorf("Failed to Raid local SSDs, unable to setup Data Cache, got error %v", err)
 	}

 	// Initializing data cache node (VG checks w/ raided lssd)
 	if err := driver.InitializeDataCacheNode(nodeId); err != nil {
 		return err
 	}

-	klog.V(2).Infof("Datacache enabled for node %s", nodeName)
+	klog.V(4).Infof("LSSD caching is setup for the Data Cache enabled node %s", nodeName)
 	return nil
 }
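
Note on the setupDataCache change above: the driver no longer RAIDs whatever Local SSDs it finds. fetchLssdsForRaiding keeps only devices that are not already part of a RAID array, intersects them with devices that have no mount point (so disks already backing ephemeral storage are never touched), and fails if fewer than the requested lssdCount remain. The program below is a minimal, self-contained sketch of that selection step using the same k8s.io/utils/strings/slices helpers; the device names and inline input slices are illustrative stand-ins for what driver.FetchAllLssds, driver.FetchRaidedLssds and driver.FetchLSSDsWihtEmptyMountPoint would return on a real node.

package main

import (
	"fmt"

	"k8s.io/utils/strings/slices"
)

func main() {
	// Illustrative inputs only; the driver discovers these at runtime.
	allLssds := []string{"/dev/nvme0n1", "/dev/nvme0n2", "/dev/nvme0n3"}
	raidedLssds := []string{"/dev/nvme0n3"}
	lssdsWithEmptyMountPoint := []string{"/dev/nvme0n1", "/dev/nvme0n2"}
	lssdCount := 2

	// Step 1: keep devices that are not already RAIDed, stopping once
	// enough candidates have been collected.
	unRaidedLssds := []string{}
	for _, l := range allLssds {
		if !slices.Contains(raidedLssds, l) {
			unRaidedLssds = append(unRaidedLssds, l)
		}
		if len(unRaidedLssds) == lssdCount {
			break
		}
	}

	// Step 2: keep only devices with no mount point, so disks already
	// used for ephemeral storage are never re-used for the cache.
	availableLssds := slices.Filter(nil, unRaidedLssds, func(e string) bool {
		return slices.Contains(lssdsWithEmptyMountPoint, e)
	})

	if len(availableLssds) < lssdCount {
		fmt.Printf("not enough LSSDs: have %d, want %d\n", len(availableLssds), lssdCount)
		return
	}
	fmt.Println("LSSDs selected for RAIDing:", availableLssds)
}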

pkg/common/constants.go (+7)

@@ -49,4 +49,11 @@ const (
 	ContexLocalSsdCacheSize = "local-ssd-cache-size"
 	// Node name for E2E tests
 	TestNode = "test-node-csi-e2e"
+
+	// Default LSSD count for datacache E2E tests
+	LocalSSDCountForDataCache = 2
+
+	// Node label for Data Cache (only applicable to GKE nodes)
+	NodeLabelPrefix = "cloud.google.com/%s"
+	DataCacheLssdCountLabel = "gke-data-cache-disk"
 )
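
The two new label constants combine into the full node label key cloud.google.com/gke-data-cache-disk (NodeLabelPrefix formatted with DataCacheLssdCountLabel); on GKE nodes its value appears to carry the number of Local SSDs reserved for Data Cache, which main.go reads through driver.GetDataCacheCountFromNodeLabel. The sketch below is a hypothetical helper, not the driver's implementation: it only illustrates how the key could be built and the count read from a v1.Node, and the integer parsing of the label value is an assumption.

package main

import (
	"fmt"
	"strconv"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	nodeLabelPrefix         = "cloud.google.com/%s"
	dataCacheLssdCountLabel = "gke-data-cache-disk"
)

// getLssdCountFromNode is a hypothetical helper (not the driver's
// GetDataCacheCountFromNodeLabel). A missing label is treated as
// "Data Cache not enabled"; the integer parse is an assumption.
func getLssdCountFromNode(node *v1.Node) (int, error) {
	key := fmt.Sprintf(nodeLabelPrefix, dataCacheLssdCountLabel) // "cloud.google.com/gke-data-cache-disk"
	val, found := node.GetLabels()[key]
	if !found {
		return 0, nil
	}
	count, err := strconv.Atoi(val)
	if err != nil {
		return 0, fmt.Errorf("parsing node label %s=%q: %w", key, val, err)
	}
	return count, nil
}

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{"cloud.google.com/gke-data-cache-disk": "2"},
	}}
	count, err := getLssdCountFromNode(node)
	fmt.Println(count, err) // 2 <nil>
}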

pkg/common/runcmd.go (+8 −4)

@@ -16,7 +16,7 @@ const (
 // RunCommand wraps a k8s exec to deal with the no child process error. Same as exec.CombinedOutput.
 // On error, the output is included so callers don't need to echo it again.

-func RunCommand(pipeCmd string, pipeCmdArg string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
+func RunCommand(pipeCmd string, pipeCmdArg []string, cmd1 string, execCmdArgs ...string) ([]byte, error) {
 	execCmd1 := exec.Command(cmd1, execCmdArgs...)

 	if pipeCmd != "" {
@@ -47,9 +47,9 @@ func checkError(err error, execCmd exec.Cmd) error {
 	}
 	return err
 }
-func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
+func execPipeCommand(pipeCmd string, pipeCmdArg []string, execCmd1 *exec.Cmd) ([]byte, error) {

-	execPipeCmd := exec.Command(pipeCmd, pipeCmdArg)
+	execPipeCmd := exec.Command(pipeCmd, pipeCmdArg...)
 	stdoutPipe, err := execCmd1.StdoutPipe()
 	if err != nil {
 		klog.Errorf("failed command %v: got error:%v", execCmd1, err)
@@ -63,8 +63,12 @@ func execPipeCommand(pipeCmd string, pipeCmdArg string, execCmd1 *exec.Cmd) ([]byte, error) {
 	execPipeCmd.Stdin = stdoutPipe
 	output, err := execPipeCmd.CombinedOutput()
 	if err != nil {
+		// Some commands (such as grep) will return an error with exit status of 1
+		if len(output) == 0 && err.(*exec.ExitError).ExitCode() == 1 {
+			return output, nil
+		}
 		err = checkError(err, *execPipeCmd)
-		return nil, fmt.Errorf("%s failed: %w; output: %s", pipeCmd, err, string(output))
+		return nil, fmt.Errorf("%s failed: %w; output: %s", execPipeCmd, err, string(output))
 	}

 	return output, nil
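
With pipeCmdArg now a []string, the piped command can receive more than one argument, and a pipe command that exits with status 1 while producing no output (grep with no matches) is reported as success with empty output instead of an error. Below is a hypothetical call site, not one taken from this PR; the lsblk and grep flags are illustrative.

package main

import (
	"fmt"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	// Roughly equivalent to: lsblk -o NAME,MOUNTPOINT --noheadings | grep -i nvme
	// The []string pipe arguments are what the signature change enables; if
	// grep matches nothing it exits 1 with empty output, which RunCommand now
	// returns as success with empty output rather than as an error.
	out, err := common.RunCommand("grep", []string{"-i", "nvme"}, "lsblk", "-o", "NAME,MOUNTPOINT", "--noheadings")
	if err != nil {
		fmt.Println("command failed:", err)
		return
	}
	fmt.Printf("%s", out)
}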
