
Commit 0ad159a

[WIP] GKE-MT support for PDCSI
1 parent 624f654

447 files changed (+36849, -35 lines)


cmd/gce-pd-csi-driver/main.go

Lines changed: 7 additions & 1 deletion
```diff
@@ -76,6 +76,7 @@ var (
 	enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
 	enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
 	enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
+	enableMultitenancyFlag = flag.Bool("enable-multitenancy", false, "If set to true, the CSI Driver will support running on multitenant GKE clusters")
 	nodeName = flag.String("node-name", "", "The node this driver is running on")

 	multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
@@ -221,10 +222,15 @@ func handle() {
 	// Initialize requirements for the controller service
 	var controllerServer *driver.GCEControllerServer
 	if *runControllerService {
-		cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig)
+		cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig, *enableMultitenancyFlag)
 		if err != nil {
 			klog.Fatalf("Failed to get cloud provider: %v", err.Error())
 		}
+		if *enableMultitenancyFlag {
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			go cloudProvider.TenantInformer.Run(ctx.Done())
+		}
 		initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
 		maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
 		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag)
```
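The new block starts the tenant informer for the lifetime of handle(): context.WithCancel derives a stoppable context, defer cancel() tears the informer down when handle() unwinds, and Run gets its own goroutine because it blocks until the stop channel closes. The commit does not show TenantInformer's concrete type; assuming the usual client-go cache.SharedIndexInformer, the startup pattern (including the cache-sync wait callers typically add before serving) looks roughly like this sketch:

```go
package gcepdsketch

import (
	"context"
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// runTenantInformer is a sketch of the pattern behind
// `go cloudProvider.TenantInformer.Run(ctx.Done())` above. It assumes
// TenantInformer is a client-go cache.SharedIndexInformer, which this
// WIP commit does not confirm.
func runTenantInformer(ctx context.Context, informer cache.SharedIndexInformer) error {
	// Run blocks until the stop channel closes, so launch it on its own
	// goroutine; cancelling ctx (handle()'s defer cancel()) stops it.
	go informer.Run(ctx.Done())

	// Wait for the initial list to land in the local cache before
	// relying on lookups against it.
	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		return fmt.Errorf("timed out waiting for tenant informer cache to sync")
	}
	return nil
}
```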

deploy/kubernetes/base/controller/controller.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -145,6 +145,7 @@ spec:
             - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
             - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
             - --enable-data-cache
+            - --enable-multitenancy
           command:
             - /gce-pd-csi-driver
           env:
```

deploy/kubernetes/base/node_linux/node.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -47,6 +47,7 @@ spec:
             - "--endpoint=unix:/csi/csi.sock"
             - "--run-controller-service=false"
             - "--enable-data-cache"
+            - "--enable-multitenancy"
             - "--node-name=$(KUBE_NODE_NAME)"
           securityContext:
             privileged: true
```

deploy/kubernetes/images/stable-master/image.yaml

Lines changed: 4 additions & 3 deletions
```diff
@@ -49,7 +49,8 @@ metadata:
   name: imagetag-gcepd-driver
 imageTag:
   name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
-  # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
-  newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
-  newTag: "v1.17.2"
+  # Don't change stable image without changing pdImagePlaceholder in
+  # test/k8s-integration/main.go
+  newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver
+  newTag: "latest"
 ---
```

go.mod

Lines changed: 1 addition & 1 deletion
```diff
@@ -41,6 +41,7 @@ require (
 	k8s.io/mount-utils v0.32.3
 	k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
 	sigs.k8s.io/boskos v0.0.0-20220711194915-6cb8a6fb2dd1
+	sigs.k8s.io/yaml v1.4.0
 )

 require (
@@ -116,7 +117,6 @@ require (
 	k8s.io/test-infra v0.0.0-20210730160938-8ad9b8c53bd8 // indirect
 	sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect
-	sigs.k8s.io/yaml v1.4.0 // indirect
 )

 replace k8s.io/client-go => k8s.io/client-go v0.32.2
```
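sigs.k8s.io/yaml moves from an indirect to a direct dependency, so driver code now imports it somewhere in this commit (the usage site is not shown in this excerpt). The library decodes YAML through JSON struct tags; a minimal sketch of its round trip, with a hypothetical TenantConfig type standing in for whatever the driver actually parses:

```go
package main

import (
	"fmt"

	"sigs.k8s.io/yaml"
)

// TenantConfig is a hypothetical shape for illustration; the commit does
// not show the real type it unmarshals.
type TenantConfig struct {
	ProjectID string `json:"projectID"` // sigs.k8s.io/yaml honors JSON tags
	Zone      string `json:"zone"`
}

func main() {
	in := []byte("projectID: tenant-project-a\nzone: us-central1-a\n")

	var cfg TenantConfig
	if err := yaml.Unmarshal(in, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)

	out, err := yaml.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}
```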

pkg/gce-cloud-provider/compute/fake-gce.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -378,7 +378,7 @@ func (cloud *FakeCloudProvider) InsertInstance(instance *computev1.Instance, ins
 	return
 }

-func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
+func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
 	instance, ok := cloud.instances[instanceName]
 	if !ok {
 		return nil, notFoundError()
```

pkg/gce-cloud-provider/compute/gce-compute.go

Lines changed: 99 additions & 18 deletions
```diff
@@ -32,6 +32,7 @@ import (
 	"github.com/googleapis/gax-go/v2/apierror"
 	"golang.org/x/oauth2"
 	computebeta "google.golang.org/api/compute/v0.beta"
+	"google.golang.org/api/compute/v1"
 	computev1 "google.golang.org/api/compute/v1"
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
@@ -117,7 +118,7 @@ type GCECompute interface {
 	// Regional Disk Methods
 	GetReplicaZoneURI(project string, zone string) string
 	// Instance Methods
-	GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error)
+	GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error)
 	// Zone Methods
 	ListZones(ctx context.Context, region string) ([]string, error)
 	ListSnapshots(ctx context.Context, filter string) ([]*computev1.Snapshot, string, error)
@@ -160,40 +161,75 @@ func (cloud *CloudProvider) listDisksInternal(ctx context.Context, fields []goog
 	if err != nil {
 		return nil, "", err
 	}
-	items := []*computev1.Disk{}
+	disks := []*computev1.Disk{}

 	// listing out regional disks in the region
-	rlCall := cloud.service.RegionDisks.List(cloud.project, region)
+	rDisks, err := listRegionalDisksForProject(cloud.service, cloud.project, region, fields, filter)
+	if err != nil {
+		return nil, "", err
+	}
+	disks = append(disks, rDisks...)
+	for tProject, tService := range cloud.tenantServiceMap {
+		rDisks, err := listRegionalDisksForProject(tService, tProject, region, fields, filter)
+		if err != nil {
+			return nil, "", err
+		}
+		disks = append(disks, rDisks...)
+	}
+
+	// listing out zonal disks in all zones of the region
+	zDisks, err := listZonalDisksForProject(cloud.service, cloud.project, zones, fields, filter)
+	if err != nil {
+		return nil, "", err
+	}
+	disks = append(disks, zDisks...)
+	for tProject, tService := range cloud.tenantServiceMap {
+		zDisks, err := listZonalDisksForProject(tService, tProject, zones, fields, filter)
+		if err != nil {
+			return nil, "", err
+		}
+		disks = append(disks, zDisks...)
+	}
+
+	return disks, "", nil
+}
+
+func listRegionalDisksForProject(service *computev1.Service, project string, region string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
+	items := []*computev1.Disk{}
+	rlCall := service.RegionDisks.List(project, region)
 	rlCall.Fields(fields...)
 	rlCall.Filter(filter)
 	nextPageToken := "pageToken"
 	for nextPageToken != "" {
 		rDiskList, err := rlCall.Do()
 		if err != nil {
-			return nil, "", err
+			return nil, err
 		}
 		items = append(items, rDiskList.Items...)
 		nextPageToken = rDiskList.NextPageToken
 		rlCall.PageToken(nextPageToken)
 	}
+	return items, nil
+}

-	// listing out zonal disks in all zones of the region
+func listZonalDisksForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
+	items := []*computev1.Disk{}
 	for _, zone := range zones {
-		lCall := cloud.service.Disks.List(cloud.project, zone)
+		lCall := service.Disks.List(project, zone)
 		lCall.Fields(fields...)
 		lCall.Filter(filter)
 		nextPageToken := "pageToken"
 		for nextPageToken != "" {
 			diskList, err := lCall.Do()
 			if err != nil {
-				return nil, "", err
+				return nil, err
 			}
 			items = append(items, diskList.Items...)
 			nextPageToken = diskList.NextPageToken
 			lCall.PageToken(nextPageToken)
 		}
 	}
-	return items, "", nil
+	return items, nil
 }

 // ListInstances lists instances based on maxEntries and pageToken for the project and region
@@ -210,8 +246,27 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
 	}
 	items := []*computev1.Instance{}

+	instances, err := cloud.listInstancesForProject(cloud.service, cloud.project, zones, fields)
+	if err != nil {
+		return nil, "", err
+	}
+	items = append(items, instances...)
+	for tProject, tService := range cloud.tenantServiceMap {
+		instances, err := cloud.listInstancesForProject(tService, tProject, zones, fields)
+		if err != nil {
+			return nil, "", err
+		}
+		items = append(items, instances...)
+	}
+
+	return items, "", nil
+}
+
+func (cloud *CloudProvider) listInstancesForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field) ([]*computev1.Instance, error) {
+	items := []*computev1.Instance{}
+
 	for _, zone := range zones {
-		lCall := cloud.service.Instances.List(cloud.project, zone)
+		lCall := service.Instances.List(project, zone)
 		for _, filter := range cloud.listInstancesConfig.Filters {
 			lCall = lCall.Filter(filter)
 		}
@@ -220,15 +275,14 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
 		for nextPageToken != "" {
 			instancesList, err := lCall.Do()
 			if err != nil {
-				return nil, "", err
+				return nil, err
 			}
 			items = append(items, instancesList.Items...)
 			nextPageToken = instancesList.NextPageToken
 			lCall.PageToken(nextPageToken)
 		}
 	}
-
-	return items, "", nil
+	return items, nil
 }

 // RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
@@ -857,7 +911,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK
 		ForceAttach: forceAttach,
 	}

-	op, err := cloud.service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
+	service := cloud.service
+	if _, ok := cloud.tenantServiceMap[project]; ok {
+		service = cloud.tenantServiceMap[project]
+	}
+	op, err := service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
 	if err != nil {
 		return fmt.Errorf("failed cloud service attach disk call: %w", err)
 	}
@@ -872,7 +930,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK

 func (cloud *CloudProvider) DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error {
 	klog.V(5).Infof("Detaching disk %v from %v", deviceName, instanceName)
-	op, err := cloud.service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
+	service := cloud.service
+	if _, ok := cloud.tenantServiceMap[project]; ok {
+		service = cloud.tenantServiceMap[project]
+	}
+	op, err := service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
 	if err != nil {
 		return err
 	}
@@ -1041,7 +1103,7 @@ func (cloud *CloudProvider) waitForAttachOnInstance(ctx context.Context, project
 	start := time.Now()
 	return wait.ExponentialBackoff(AttachDiskBackoff, func() (bool, error) {
 		klog.V(6).Infof("Polling instances.get for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
-		instance, err := cloud.GetInstanceOrError(ctx, instanceZone, instanceName)
+		instance, err := cloud.GetInstanceOrError(ctx, project, instanceZone, instanceName)
 		if err != nil {
 			return false, fmt.Errorf("GetInstance failed to get instance: %w", err)
 		}
@@ -1145,10 +1207,13 @@ func opIsDone(op *computev1.Operation) (bool, error) {
 	return true, nil
 }

-func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
+func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
 	klog.V(5).Infof("Getting instance %v from zone %v", instanceName, instanceZone)
-	project := cloud.project
-	instance, err := cloud.service.Instances.Get(project, instanceZone, instanceName).Do()
+	service := cloud.service
+	if _, ok := cloud.tenantServiceMap[project]; ok {
+		service = cloud.tenantServiceMap[project]
+	}
+	instance, err := service.Instances.Get(project, instanceZone, instanceName).Do()
 	if err != nil {
 		return nil, err
 	}
@@ -1426,6 +1491,22 @@ func (cloud *CloudProvider) waitForSnapshotCreation(ctx context.Context, project
 	}
 }

+func (cloud *CloudProvider) getTenantService(ctx context.Context, project string) (*compute.Service, error) {
+	s, exists := cloud.tenantServiceMap[project]
+	if !exists {
+		var err error
+		s, err = cloud.createTenantService(project)
+		if err != nil {
+			return nil, fmt.Errorf("error during tenant compute client creation: %w", err)
+		}
+	}
+	return s, nil
+}
+
+func (cloud *CloudProvider) createTenantService(project string) (*compute.Service, error) {
+	return nil, nil
+}
+
 // getResourceManagerTags returns the map of tag keys and values. The tag keys are in the form `tagKeys/{tag_key_id}`
 // and the tag values are in the format `tagValues/456`.
 func getResourceManagerTags(ctx context.Context, tokenSource oauth2.TokenSource, tagsMap map[string]string) (map[string]string, error) {
```
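AttachDisk, DetachDisk, and GetInstanceOrError now repeat the same three-line lookup: default to cloud.service, but switch to the tenant's client when the target project has an entry in cloud.tenantServiceMap (getTenantService and createTenantService remain stubs in this WIP commit). A minimal sketch of that selection folded into one helper (illustrative only; the commit inlines the lookup at each call site):

```go
// serviceForProject picks the compute client for a project: the tenant's
// client when one is registered in tenantServiceMap, otherwise the default.
// Illustrative refactor of the lookup inlined in AttachDisk, DetachDisk,
// and GetInstanceOrError above; not part of the commit itself.
func (cloud *CloudProvider) serviceForProject(project string) *computev1.Service {
	if s, ok := cloud.tenantServiceMap[project]; ok {
		return s
	}
	return cloud.service
}

// Usage at a call site would then read, for example:
//   op, err := cloud.serviceForProject(project).Instances.
//       DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
```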
