Skip to content

Commit d73f071

Browse files
committed
Up to date changes
1 parent d7c5a23 commit d73f071

File tree

5 files changed

+79
-3
lines changed

5 files changed

+79
-3
lines changed

cmd/gce-pd-csi-driver/main.go

+23
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"strings"
2828
"time"
2929

30+
"k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/rest"
3032
"k8s.io/klog/v2"
3133
"k8s.io/utils/strings/slices"
3234
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -239,19 +241,27 @@ func handle() {
239241
if err != nil {
240242
klog.Fatalf("Failed to get safe mounter: %v", err.Error())
241243
}
244+
242245
deviceUtils := deviceutils.NewDeviceUtils()
243246
statter := mountmanager.NewStatter(mounter)
244247
meta, err := metadataservice.NewMetadataService()
245248
if err != nil {
246249
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
247250
}
251+
252+
kubeClient, err := instantiateKubeClient()
253+
if err != nil {
254+
klog.Fatalf("Failed to instantiate Kubernetes client: %v", err)
255+
}
248256
nsArgs := driver.NodeServerArgs{
249257
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
250258
DeviceInUseTimeout: *deviceInUseTimeout,
251259
EnableDataCache: *enableDataCacheFlag,
252260
DataCacheEnabledNodePool: isDataCacheEnabledNodePool(ctx, *nodeName),
261+
KubeClient: kubeClient,
253262
}
254263
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
264+
255265
if *maxConcurrentFormatAndMount > 0 {
256266
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
257267
}
@@ -288,6 +298,19 @@ func handle() {
288298
gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing, metricsManager)
289299
}
290300

301+
func instantiateKubeClient() (*kubernetes.Clientset, error) {
302+
klog.V(2).Infof("Setting up kubeClient")
303+
cfg, err := rest.InClusterConfig()
304+
if err != nil {
305+
return nil, fmt.Errorf("failed to create REST Config for k8s client: %w", err)
306+
}
307+
kubeClient, err := kubernetes.NewForConfig(cfg)
308+
if err != nil {
309+
return nil, fmt.Errorf("failed to create k8s client: %w", err)
310+
}
311+
return kubeClient, nil
312+
}
313+
291314
func notEmpty(v string) bool {
292315
return v != ""
293316
}

pkg/common/constants.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ package common
1818

1919
const (
2020
// Keys for Topology. This key will be shared amongst drivers from GCP
21-
TopologyKeyZone = "topology.gke.io/zone"
21+
TopologyKeyPrefix = "topology.gke.io"
22+
TopologyKeyZone = TopologyKeyPrefix + "/zone"
2223

2324
// VolumeAttributes for Partition
2425
VolumeAttributePartition = "partition"

pkg/common/utils.go

+8
Original file line numberDiff line numberDiff line change
@@ -749,3 +749,11 @@ func ShortString(s string) string {
749749
}
750750
return string(short)
751751
}
752+
753+
func IsGKETopologyLabel(key string) bool {
754+
// This is the actual code
755+
// return strings.HasPrefix(key, gkeTopologyLabelPrefix)
756+
757+
// More permissive code for testing
758+
return strings.HasPrefix(key, "topology.gke")
759+
}

pkg/gce-pd-csi-driver/gce-pd-driver.go

+1
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
157157
deviceInUseErrors: newDeviceErrMap(args.DeviceInUseTimeout),
158158
EnableDataCache: args.EnableDataCache,
159159
DataCacheEnabledNodePool: args.DataCacheEnabledNodePool,
160+
KubeClient: args.KubeClient,
160161
}
161162
}
162163

pkg/gce-pd-csi-driver/node.go

+45-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ import (
3030

3131
csi "github.com/container-storage-interface/spec/lib/go/csi"
3232

33+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334
"k8s.io/klog/v2"
3435
"k8s.io/mount-utils"
3536

37+
"k8s.io/client-go/kubernetes"
3638
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
3739
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
3840
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
@@ -49,6 +51,8 @@ type GCENodeServer struct {
4951
EnableDataCache bool
5052
DataCacheEnabledNodePool bool
5153

54+
KubeClient *kubernetes.Clientset
55+
5256
// A map storing all volumes with ongoing operations so that additional operations
5357
// for that same volume (as defined by VolumeID) return an Aborted error
5458
volumeLocks *common.VolumeLocks
@@ -84,6 +88,9 @@ type NodeServerArgs struct {
8488
EnableDataCache bool
8589

8690
DataCacheEnabledNodePool bool
91+
92+
// Kubernetes client for API server interactions
93+
KubeClient *kubernetes.Clientset
8794
}
8895

8996
var _ csi.NodeServer = &GCENodeServer{}
@@ -556,22 +563,58 @@ func (ns *GCENodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeG
556563
}
557564

558565
func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
566+
labels, err := ns.gkeTopologyLabels(ctx, ns.MetadataService.GetName())
567+
if err != nil {
568+
// Perhaps we don't want to fail here. We are introducing a new
569+
// dependency and we might be better off allowing this failure to
570+
// happen and moving on to retrieve the zone from GCE MDS.
571+
return nil, err
572+
}
573+
574+
// Each "Topology" struct will later be translated into an individual
575+
// 'matchExpressions' block in the PV's NodeAffinity. Because we always
576+
// need to match on both the zone AND the disk type, both the zone and the
577+
// supported disks belong as segments on a single Topology.
559578
top := &csi.Topology{
560-
Segments: map[string]string{common.TopologyKeyZone: ns.MetadataService.GetZone()},
579+
Segments: labels,
561580
}
562581

563582
nodeID := common.CreateNodeID(ns.MetadataService.GetProject(), ns.MetadataService.GetZone(), ns.MetadataService.GetName())
564-
565583
volumeLimits, err := ns.GetVolumeLimits()
566584

567585
resp := &csi.NodeGetInfoResponse{
568586
NodeId: nodeID,
569587
MaxVolumesPerNode: volumeLimits,
570588
AccessibleTopology: top,
571589
}
590+
591+
klog.V(2).Infof("Returning NodeGetInfoResponse: %+v", resp)
592+
572593
return resp, err
573594
}
574595

596+
// gkeTopologyLabels retrieves the node labels with the prefix
597+
// `topology.gke.io/`.
598+
func (ns *GCENodeServer) gkeTopologyLabels(ctx context.Context, nodeName string) (map[string]string, error) {
599+
klog.V(2).Infof("Retrieving node topology labels for node %q", nodeName)
600+
601+
node, err := ns.KubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
602+
if err != nil {
603+
// We should retry instead. Need to figure out how much wrong-ness can be tolerated and how often CSINode gets refreshed.
604+
return nil, err
605+
}
606+
607+
topology := make(map[string]string)
608+
for k, v := range node.GetLabels() {
609+
if common.IsGKETopologyLabel(k) {
610+
klog.V(2).Infof("Including node topology label %q=%q", k, v)
611+
topology[k] = v
612+
}
613+
}
614+
615+
return topology, nil
616+
}
617+
575618
func (ns *GCENodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
576619
if len(req.VolumeId) == 0 {
577620
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")

0 commit comments

Comments
 (0)