GKE-MT Support for PDCSI #2081

Merged: 4 commits, Jun 12, 2025

9 changes: 8 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
@@ -76,6 +76,7 @@ var (
enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
enableMultitenancyFlag = flag.Bool("enable-multitenancy", false, "If set to true, the CSI Driver will support running on multitenant GKE clusters")
nodeName = flag.String("node-name", "", "The node this driver is running on")

multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
@@ -232,11 +233,17 @@ func handle()
// Initialize requirements for the controller service
var controllerServer *driver.GCEControllerServer
if *runControllerService {
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig)
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig, *enableMultitenancyFlag)
if err != nil {
klog.Fatalf("Failed to get cloud provider: %v", err.Error())
}

if *enableMultitenancyFlag {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go cloudProvider.TenantInformer.Run(ctx.Done())
}

initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
// TODO(2042): Move more of the constructor args into this struct
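The main.go change above gates the new behavior behind --enable-multitenancy: the flag is threaded into gce.CreateCloudProvider and, when set, a TenantInformer is started on a cancellable context. Below is a minimal sketch of that wiring, assuming TenantInformer follows the client-go SharedIndexInformer pattern; the informer's concrete type, its construction, and the WaitForCacheSync call are assumptions not shown in this diff.

```go
package multitenancy

import (
	"context"

	"k8s.io/client-go/tools/cache"
)

// startInformer launches the informer on a cancellable context, mirroring
// `go cloudProvider.TenantInformer.Run(ctx.Done())` in main.go, and returns
// a stop function the caller can defer.
func startInformer(parent context.Context, informer cache.SharedIndexInformer) (stop func()) {
	ctx, cancel := context.WithCancel(parent)

	// Run blocks until the stop channel closes, so it runs in its own goroutine.
	go informer.Run(ctx.Done())

	// Optionally block until the initial List has populated the cache so that
	// later lookups do not race the first sync.
	cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)

	return cancel
}
```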
1 change: 1 addition & 0 deletions deploy/kubernetes/base/controller/controller.yaml
@@ -145,6 +145,7 @@ spec:
- "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
- "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
- --enable-data-cache
- --enable-multitenancy
command:
- /gce-pd-csi-driver
env:
1 change: 1 addition & 0 deletions deploy/kubernetes/base/node_linux/node.yaml
@@ -47,6 +47,7 @@ spec:
- "--endpoint=unix:/csi/csi.sock"
- "--run-controller-service=false"
- "--enable-data-cache"
- "--enable-multitenancy"
- "--node-name=$(KUBE_NODE_NAME)"
securityContext:
privileged: true
7 changes: 4 additions & 3 deletions deploy/kubernetes/images/stable-master/image.yaml
@@ -44,8 +44,9 @@ metadata:
name: imagetag-gcepd-driver
imageTag:
name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
# pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
newTag: "v1.17.2"
# Don't change stable image without changing pdImagePlaceholder in
# test/k8s-integration/main.go
newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver
newTag: "latest"
---

2 changes: 1 addition & 1 deletion go.mod
@@ -41,6 +41,7 @@ require (
k8s.io/mount-utils v0.33.1
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
sigs.k8s.io/boskos v0.0.0-20220711194915-6cb8a6fb2dd1
sigs.k8s.io/yaml v1.4.0
)

require (
@@ -115,7 +116,6 @@ require (
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace k8s.io/client-go => k8s.io/client-go v0.32.2
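The go.mod hunks promote sigs.k8s.io/yaml from an indirect to a direct dependency, which suggests the multitenancy code now imports it directly, presumably to decode tenant-related configuration; that use is an assumption, as the importing file is not part of this diff. A self-contained sketch of the package's API:

```go
package main

import (
	"fmt"

	"sigs.k8s.io/yaml"
)

// tenantConfig is a hypothetical shape; the real schema is not in this PR.
type tenantConfig struct {
	ProjectID string `json:"projectID"`
}

func main() {
	in := []byte("projectID: tenant-project-a\n")

	// sigs.k8s.io/yaml converts YAML to JSON before unmarshalling, so struct
	// fields are matched via encoding/json tags rather than yaml tags.
	var cfg tenantConfig
	if err := yaml.Unmarshal(in, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.ProjectID) // prints "tenant-project-a"
}
```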
2 changes: 1 addition & 1 deletion pkg/gce-cloud-provider/compute/fake-gce.go
@@ -378,7 +378,7 @@ func (cloud *FakeCloudProvider) InsertInstance(instance *computev1.Instance, ins
return
}

func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
instance, ok := cloud.instances[instanceName]
if !ok {
return nil, notFoundError()
106 changes: 87 additions & 19 deletions pkg/gce-cloud-provider/compute/gce-compute.go
@@ -117,7 +117,7 @@ type GCECompute interface {
// Regional Disk Methods
GetReplicaZoneURI(project string, zone string) string
// Instance Methods
GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error)
GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error)
// Zone Methods
ListZones(ctx context.Context, region string) ([]string, error)
ListSnapshots(ctx context.Context, filter string) ([]*computev1.Snapshot, string, error)
@@ -160,40 +160,79 @@ func (cloud *CloudProvider) listDisksInternal(ctx context.Context, fields []goog
if err != nil {
return nil, "", err
}
items := []*computev1.Disk{}
disks := []*computev1.Disk{}

klog.Infof("Getting regional disks for project: %s", cloud.project)
rDisks, err := listRegionalDisksForProject(cloud.service, cloud.project, region, fields, filter)
if err != nil {
return nil, "", err
}
disks = append(disks, rDisks...)
// listing out regional disks in the region for each tenant project
for p, s := range cloud.tenantServiceMap {
klog.Infof("Getting regional disks for tenant project: %s", p)
rDisks, err := listRegionalDisksForProject(s, p, region, fields, filter)
if err != nil {
return nil, "", err
}
disks = append(disks, rDisks...)
}

klog.Infof("Getting zonal disks for project: %s", cloud.project)
zDisks, err := listZonalDisksForProject(cloud.service, cloud.project, zones, fields, filter)
if err != nil {
return nil, "", err
}
disks = append(disks, zDisks...)
// listing out zonal disks in all zones of the region for each tenant project
for p, s := range cloud.tenantServiceMap {
klog.Infof("Getting zonal disks for tenant project: %s", p)
zDisks, err := listZonalDisksForProject(s, p, zones, fields, filter)
if err != nil {
return nil, "", err
}
disks = append(disks, zDisks...)
}

// listing out regional disks in the region
rlCall := cloud.service.RegionDisks.List(cloud.project, region)
return disks, "", nil
}

func listRegionalDisksForProject(service *computev1.Service, project string, region string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
items := []*computev1.Disk{}
rlCall := service.RegionDisks.List(project, region)
rlCall.Fields(fields...)
rlCall.Filter(filter)
nextPageToken := "pageToken"
for nextPageToken != "" {
rDiskList, err := rlCall.Do()
if err != nil {
return nil, "", err
return nil, err
}
items = append(items, rDiskList.Items...)
nextPageToken = rDiskList.NextPageToken
rlCall.PageToken(nextPageToken)
}
return items, nil
}

// listing out zonal disks in all zones of the region
func listZonalDisksForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
items := []*computev1.Disk{}
for _, zone := range zones {
lCall := cloud.service.Disks.List(cloud.project, zone)
lCall := service.Disks.List(project, zone)
lCall.Fields(fields...)
lCall.Filter(filter)
nextPageToken := "pageToken"
for nextPageToken != "" {
diskList, err := lCall.Do()
if err != nil {
return nil, "", err
return nil, err
}
items = append(items, diskList.Items...)
nextPageToken = diskList.NextPageToken
lCall.PageToken(nextPageToken)
}
}
return items, "", nil
return items, nil
}

// ListInstances lists instances based on maxEntries and pageToken for the project and region
@@ -209,9 +248,28 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
return nil, "", err
}
items := []*computev1.Instance{}
instances, err := cloud.listInstancesForProject(cloud.service, cloud.project, zones, fields)
if err != nil {
return nil, "", err
}
items = append(items, instances...)

for p, s := range cloud.tenantServiceMap {
instances, err := cloud.listInstancesForProject(s, p, zones, fields)
if err != nil {
return nil, "", err
}
items = append(items, instances...)
}

return items, "", nil
}

func (cloud *CloudProvider) listInstancesForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field) ([]*computev1.Instance, error) {
items := []*computev1.Instance{}

for _, zone := range zones {
lCall := cloud.service.Instances.List(cloud.project, zone)
lCall := service.Instances.List(project, zone)
for _, filter := range cloud.listInstancesConfig.Filters {
lCall = lCall.Filter(filter)
}
@@ -220,15 +278,14 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
for nextPageToken != "" {
instancesList, err := lCall.Do()
if err != nil {
return nil, "", err
return nil, err
}
items = append(items, instancesList.Items...)
nextPageToken = instancesList.NextPageToken
lCall.PageToken(nextPageToken)
}
}

return items, "", nil
return items, nil
}

// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
@@ -857,7 +914,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK
ForceAttach: forceAttach,
}

op, err := cloud.service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
service := cloud.service
if _, ok := cloud.tenantServiceMap[project]; ok {
service = cloud.tenantServiceMap[project]
}
op, err := service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
if err != nil {
return fmt.Errorf("failed cloud service attach disk call: %w", err)
}
@@ -872,7 +933,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK

func (cloud *CloudProvider) DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error {
klog.V(5).Infof("Detaching disk %v from %v", deviceName, instanceName)
op, err := cloud.service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
service := cloud.service
if _, ok := cloud.tenantServiceMap[project]; ok {
service = cloud.tenantServiceMap[project]
}
op, err := service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
if err != nil {
return err
}
@@ -1041,7 +1106,7 @@ func (cloud *CloudProvider) waitForAttachOnInstance(ctx context.Context, project
start := time.Now()
return wait.ExponentialBackoff(AttachDiskBackoff, func() (bool, error) {
klog.V(6).Infof("Polling instances.get for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
instance, err := cloud.GetInstanceOrError(ctx, instanceZone, instanceName)
instance, err := cloud.GetInstanceOrError(ctx, project, instanceZone, instanceName)
if err != nil {
return false, fmt.Errorf("GetInstance failed to get instance: %w", err)
}
@@ -1145,10 +1210,13 @@ func opIsDone(op *computev1.Operation) (bool, error) {
return true, nil
}

func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
klog.V(5).Infof("Getting instance %v from zone %v", instanceName, instanceZone)
project := cloud.project
instance, err := cloud.service.Instances.Get(project, instanceZone, instanceName).Do()
service := cloud.service
if _, ok := cloud.tenantServiceMap[project]; ok {
service = cloud.tenantServiceMap[project]
}
instance, err := service.Instances.Get(project, instanceZone, instanceName).Do()
if err != nil {
return nil, err
}
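AttachDisk, DetachDisk, and GetInstanceOrError above all repeat the same selection: use the tenant project's Compute client when one is registered in tenantServiceMap, otherwise fall back to the driver's default client. The standalone helper below is an illustrative sketch of that lookup (the driver inlines it rather than defining such a function):

```go
package main

import (
	"fmt"

	computev1 "google.golang.org/api/compute/v1"
)

// serviceForProject picks the Compute client for a project: the per-tenant
// client when one is registered, otherwise the driver's default client.
func serviceForProject(defaultSvc *computev1.Service, tenants map[string]*computev1.Service, project string) *computev1.Service {
	if s, ok := tenants[project]; ok {
		return s
	}
	return defaultSvc
}

func main() {
	// Zero-value clients stand in for real ones, which would come from
	// compute.NewService with per-tenant credentials.
	defaultSvc := &computev1.Service{}
	tenants := map[string]*computev1.Service{
		"tenant-project-a": {},
	}

	fmt.Println(serviceForProject(defaultSvc, tenants, "tenant-project-a") == defaultSvc) // false: tenant client chosen
	fmt.Println(serviceForProject(defaultSvc, tenants, "unknown-project") == defaultSvc)  // true: default client chosen
}
```

Keying the choice on the project string keeps the call sites unchanged apart from which *computev1.Service they invoke, which is why the diff only swaps cloud.service for a local service variable.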