Commit ae96677

New implementation that is hooked into NodeStage/NodeUnstage. Linux-only for now.
1 parent b92fd98 commit ae96677

File tree: 10 files changed (+218, -456 lines)

cmd/gce-pd-csi-driver/main.go (9 additions & 1 deletion)

@@ -260,13 +260,20 @@ func handle() {
 		klog.Fatalf("Failed to get node info from API server: %v", err.Error())
 	}

+	deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName)
+	if err != nil {
+		klog.Fatalf("Failed to create device cache: %v", err.Error())
+	}
+	go deviceCache.Run(ctx)
+
 	// TODO(2042): Move more of the constructor args into this struct
 	nsArgs := &driver.NodeServerArgs{
 		EnableDeviceInUseCheck:   *enableDeviceInUseCheck,
 		DeviceInUseTimeout:       *deviceInUseTimeout,
 		EnableDataCache:          *enableDataCacheFlag,
 		DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
 		SysfsPath:                "/sys",
+		DeviceCache:              deviceCache,
 	}
 	nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)

@@ -285,9 +292,10 @@ func handle() {
 		}
 	}

-	go linkcache.NewListingCache(1*time.Minute, "/dev/disk/by-id/").Run(ctx)
 	}

+	klog.Infof("NOT BLOCKED")
+
 	err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
 	if err != nil {
 		klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())
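The linkcache package itself is not part of this diff, so the shape of DeviceCache can only be inferred from its call sites here and in node.go below. A minimal sketch of that inferred surface follows; every name in it is an assumption, unconfirmed against the real pkg/linkcache:

	// Sketch only: the surface this commit appears to rely on, inferred
	// from call sites. The real pkg/linkcache may differ.
	package linkcache

	import "context"

	// DeviceCacheAPI lists the methods the driver calls on *linkcache.DeviceCache.
	type DeviceCacheAPI interface {
		// Run blocks, presumably refreshing the node's /dev/disk/by-id
		// links until ctx is cancelled; main.go starts it with
		// `go deviceCache.Run(ctx)`.
		Run(ctx context.Context)

		// AddVolume registers a staged volume for tracking. NodeStageVolume
		// treats failure as non-fatal and only logs a warning.
		AddVolume(volumeID string) error

		// RemoveVolume stops tracking a volume once it is unstaged.
		RemoveVolume(volumeID string)
	}

The constructor, NewDeviceCacheForNode(ctx, period, nodeName), additionally returns an error, which main.go treats as fatal.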

deploy/kubernetes/base/controller/controller.yaml (1 addition & 0 deletions)

@@ -145,6 +145,7 @@ spec:
             - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
             - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
             - --enable-data-cache
+            - --run-node-service=false
           command:
             - /gce-pd-csi-driver
           env:
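(--run-node-service is an existing driver flag, not one added in this diff; setting it to false on the controller Deployment presumably keeps the node service, and with it the new device cache, confined to the node DaemonSet.)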

deploy/kubernetes/images/stable-master/image.yaml (2 additions & 2 deletions)

@@ -45,7 +45,7 @@ metadata:
 imageTag:
   name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
   # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
-  newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
-  newTag: "v1.17.2"
+  newName: us-central1-docker.pkg.dev/juliankatz-joonix/csi-dev/gcp-compute-persistent-disk-csi-driver
+  newTag: "latest"
 ---

pkg/gce-pd-csi-driver/cache.go (3 additions & 39 deletions)

@@ -7,19 +7,14 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
-	"time"

 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 	fsnotify "github.com/fsnotify/fsnotify"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
 )

 const (

@@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con
 }

 func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) {
-	cfg, err := rest.InClusterConfig()
-	if err != nil {
-		return 0, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(cfg)
-	if err != nil {
-		return 0, err
-	}
-	node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
+	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
 	if err != nil {
 		return 0, err
 	}
+
 	if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found {
 		dataCacheCount, err := strconv.Atoi(val)
 		if err != nil {

@@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int,
 	return 0, nil
 }

-func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
-	var nodeObj *v1.Node
-	backoff := wait.Backoff{
-		Duration: 1 * time.Second,
-		Factor:   2.0,
-		Steps:    5,
-	}
-	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
-		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
-		if err != nil {
-			klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
-			return false, nil
-		}
-		nodeObj = node
-		klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
-		return true, nil
-	})
-
-	if err != nil {
-		klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
-	}
-	return nodeObj, err
-}
-
 func FetchRaidedLssdCountForDatacache() (int, error) {
 	raidedPath, err := fetchRAIDedLocalSsdPath()
 	if err != nil {

pkg/gce-pd-csi-driver/gce-pd-driver.go (2 additions & 1 deletion)

@@ -158,6 +158,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi
 		EnableDataCache:          args.EnableDataCache,
 		DataCacheEnabledNodePool: args.DataCacheEnabledNodePool,
 		SysfsPath:                args.SysfsPath,
+		DeviceCache:              args.DeviceCache,
 	}
 }

@@ -183,7 +184,7 @@ func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelT
 	maxLogChar = grpcLogCharCap

 	klog.V(4).Infof("Driver: %v", gceDriver.name)
-	//Start the nonblocking GRPC
+	// Start the nonblocking GRPC
 	s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager)
 	// TODO(#34): Only start specific servers based on a flag.
 	// In the future have this only run specific combinations of servers depending on which version this is.

pkg/gce-pd-csi-driver/node.go (16 additions & 11 deletions)

@@ -32,14 +32,14 @@ import (

 	csi "github.com/container-storage-interface/spec/lib/go/csi"

-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	"k8s.io/mount-utils"

 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
 	metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache"
 	mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs"
 )

@@ -77,6 +77,9 @@ type GCENodeServer struct {
 	// Embed UnimplementedNodeServer to ensure the driver returns Unimplemented for any
 	// new RPC methods that might be introduced in future versions of the spec.
 	csi.UnimplementedNodeServer
+
+	// A cache of the device paths for the volumes that are attached to the node.
+	DeviceCache *linkcache.DeviceCache
 }

@@ -92,6 +95,8 @@ type NodeServerArgs struct {

 	// SysfsPath defaults to "/sys", except if it's a unit test.
 	SysfsPath string
+
+	DeviceCache *linkcache.DeviceCache
 }

 var _ csi.NodeServer = &GCENodeServer{}

@@ -501,6 +506,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
 		}
 	}

+	err = ns.DeviceCache.AddVolume(volumeID)
+	if err != nil {
+		klog.Warningf("Error adding volume %s to cache: %v", volumeID, err)
+	}
+
 	klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath)
 	return &csi.NodeStageVolumeResponse{}, nil
 }

@@ -614,6 +624,9 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns
 			return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err)
 		}
 	}
+
+	ns.DeviceCache.RemoveVolume(volumeID)
+
 	klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath)
 	return &csi.NodeUnstageVolumeResponse{}, nil
 }

@@ -869,15 +882,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) {
 }

 func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) {
-	cfg, err := rest.InClusterConfig()
-	if err != nil {
-		return 0, err
-	}
-	kubeClient, err := kubernetes.NewForConfig(cfg)
-	if err != nil {
-		return 0, err
-	}
-	node, err := getNodeWithRetry(ctx, kubeClient, nodeName)
+	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
 	if err != nil {
 		return 0, err
 	}
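Read together, the node.go hunks give the cache a simple lifecycle: it is constructed once at startup, and a volume is tracked from NodeStageVolume until NodeUnstageVolume. A condensed sketch of that flow, reusing the linkcache API as inferred above (nodeName and volumeID are placeholders, and this is not a literal excerpt from the driver):

	// Startup (main.go): build the cache and start its refresh loop.
	cache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, nodeName)
	if err != nil {
		klog.Fatalf("Failed to create device cache: %v", err)
	}
	go cache.Run(ctx)

	// NodeStageVolume: begin tracking the volume's device link (non-fatal on error).
	if err := cache.AddVolume(volumeID); err != nil {
		klog.Warningf("Error adding volume %s to cache: %v", volumeID, err)
	}

	// NodeUnstageVolume: stop tracking it again.
	cache.RemoveVolume(volumeID)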

pkg/k8sclient/node.go (49 additions & 0 deletions, new file)

@@ -0,0 +1,49 @@
+package k8sclient
+
+import (
+	"context"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+)
+
+func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) {
+	cfg, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+	kubeClient, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+	return getNodeWithRetry(ctx, kubeClient, nodeName)
+}
+
+func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
+	var nodeObj *v1.Node
+	backoff := wait.Backoff{
+		Duration: 1 * time.Second,
+		Factor:   2.0,
+		Steps:    5,
+	}
+	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) {
+		node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+		if err != nil {
+			klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err)
+			return false, nil
+		}
+		nodeObj = node
+		klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName)
+		return true, nil
+	})
+
+	if err != nil {
+		klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err)
+	}
+	return nodeObj, err
+}
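With client construction and retry folded into this helper, callers collapse to a single line, as the cache.go and node.go hunks show. A minimal usage sketch, mirroring GetDataCacheCountFromNodeLabel (the label handling is illustrative):

	node, err := k8sclient.GetNodeWithRetry(ctx, nodeName)
	if err != nil {
		return 0, err
	}
	// Read whatever node metadata the caller needs, e.g. a label.
	labels := node.GetLabels()

With Duration 1s, Factor 2.0, and Steps 5, the backoff retries the Get for roughly 1+2+4+8+16 = 31 seconds before giving up; individual Get errors are swallowed and retried, so when every attempt fails the returned error is the wait package's timeout, not the last Get error.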
