Skip to content

Derive access mode from the CreateVolume request instead of parameters #1945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
// Label that is set on a disk when it is used by a 'multi-zone' VolumeHandle
MultiZoneLabel = "goog-gke-multi-zone"

// GCE Access Modes that are valid for hyperdisks only.
GCEReadOnlyManyAccessMode = "READ_ONLY_MANY"
GCEReadWriteManyAccessMode = "READ_WRITE_MANY"
GCEReadWriteOnceAccessMode = "READ_WRITE_SINGLE"

// Data cache mode
DataCacheModeWriteBack = "writeback"
DataCacheModeWriteThrough = "writethrough"
Expand Down
12 changes: 8 additions & 4 deletions pkg/common/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const (
ParameterKeyDataCacheMode = "data-cache-mode"
ParameterKeyResourceTags = "resource-tags"
ParameterKeyEnableMultiZoneProvisioning = "enable-multi-zone-provisioning"
ParameterHdHADiskType = "hyperdisk-balanced-high-availability"

// Parameters for VolumeSnapshotClass
ParameterKeyStorageLocations = "storage-locations"
Expand Down Expand Up @@ -76,6 +75,11 @@ const (
tagKeyCreatedForSnapshotName = "kubernetes.io/created-for/volumesnapshot/name"
tagKeyCreatedForSnapshotNamespace = "kubernetes.io/created-for/volumesnapshot/namespace"
tagKeyCreatedForSnapshotContentName = "kubernetes.io/created-for/volumesnapshotcontent/name"

// Hyperdisk disk types
DiskTypeHdHA = "hyperdisk-balanced-high-availability"
DiskTypeHdT = "hyperdisk-throughput"
DiskTypeHdE = "hyperdisk-extreme"
)

type DataCacheParameters struct {
Expand Down Expand Up @@ -130,7 +134,7 @@ type DiskParameters struct {
}

func (dp *DiskParameters) IsRegional() bool {
return dp.ReplicationType == "regional-pd" || dp.DiskType == ParameterHdHADiskType
return dp.ReplicationType == "regional-pd" || dp.DiskType == DiskTypeHdHA
}

// SnapshotParameters contains normalized and defaulted parameters for snapshots
Expand Down Expand Up @@ -200,8 +204,8 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]
case ParameterKeyType:
if v != "" {
p.DiskType = strings.ToLower(v)
if !pp.EnableHdHA && p.DiskType == ParameterHdHADiskType {
return p, d, fmt.Errorf("parameters contain invalid disk type %s", ParameterHdHADiskType)
if !pp.EnableHdHA && p.DiskType == DiskTypeHdHA {
return p, d, fmt.Errorf("parameters contain invalid disk type %s", DiskTypeHdHA)
}
}
case ParameterKeyReplicationType:
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,10 @@ func NewLimiter(limit, burst int, emptyBucket bool) *rate.Limiter {
return limiter
}

func IsHyperdisk(diskType string) bool {
return strings.HasPrefix(diskType, "hyperdisk-")
}

// shortString is inspired by k8s.io/apimachinery/pkg/util/rand.SafeEncodeString, but takes data from a hash.
func ShortString(s string) string {
hasher := fnv.New128a()
Expand Down
66 changes: 18 additions & 48 deletions pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
csi "github.com/container-storage-interface/spec/lib/go/csi"
computebeta "google.golang.org/api/compute/v0.beta"
computev1 "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -196,55 +197,30 @@ func (cloud *FakeCloudProvider) GetDisk(ctx context.Context, project string, vol
return disk, nil
}

func (cloud *FakeCloudProvider) ValidateExistingDisk(ctx context.Context, resp *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error {
if resp == nil {
return fmt.Errorf("disk does not exist")
}
requestValid := common.GbToBytes(resp.GetSizeGb()) >= reqBytes || reqBytes == 0
responseValid := common.GbToBytes(resp.GetSizeGb()) <= limBytes || limBytes == 0
if !requestValid || !responseValid {
return fmt.Errorf(
"disk already exists with incompatible capacity. Need %v (Required) < %v (Existing) < %v (Limit)",
reqBytes, common.GbToBytes(resp.GetSizeGb()), limBytes)
}

respType := strings.Split(resp.GetPDType(), "/")
typeMatch := strings.TrimSpace(respType[len(respType)-1]) == strings.TrimSpace(params.DiskType)
typeDefault := params.DiskType == "" && strings.TrimSpace(respType[len(respType)-1]) == "pd-standard"
if !typeMatch && !typeDefault {
return fmt.Errorf("disk already exists with incompatible type. Need %v. Got %v",
params.DiskType, respType[len(respType)-1])
}

// We are assuming here that a multiWriter disk could be used as non-multiWriter
if multiWriter && !resp.GetMultiWriter() {
return fmt.Errorf("disk already exists with incompatible capability. Need MultiWriter. Got non-MultiWriter")
}
klog.V(4).Infof("Compatible disk already exists")
return ValidateDiskParameters(resp, params)
}

func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error {
if disk, ok := cloud.disks[volKey.String()]; ok {
err := cloud.ValidateExistingDisk(ctx, disk, params,
err := ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
int64(capacityRange.GetLimitBytes()),
multiWriter)
multiWriter, accessMode)
if err != nil {
return err
}
}

computeDisk := &computev1.Disk{
Name: volKey.Name,
SizeGb: common.BytesToGbRoundUp(capBytes),
Description: "Disk created by GCE-PD CSI Driver",
Type: cloud.GetDiskTypeURI(project, volKey, params.DiskType),
SourceDiskId: volumeContentSourceVolumeID,
Status: cloud.mockDiskStatus,
Labels: params.Labels,
ProvisionedIops: params.ProvisionedIOPSOnCreate,
ProvisionedThroughput: params.ProvisionedThroughputOnCreate,
computeDisk := &computebeta.Disk{
Name: volKey.Name,
SizeGb: common.BytesToGbRoundUp(capBytes),
Description: "Disk created by GCE-PD CSI Driver",
Type: cloud.GetDiskTypeURI(project, volKey, params.DiskType),
SourceDiskId: volumeContentSourceVolumeID,
Status: cloud.mockDiskStatus,
Labels: params.Labels,
ProvisionedIops: params.ProvisionedIOPSOnCreate,
ProvisionedThroughput: params.ProvisionedThroughputOnCreate,
AccessMode: accessMode,
MultiWriter: multiWriter,
EnableConfidentialCompute: params.EnableConfidentialCompute,
}

if snapshotID != "" {
Expand All @@ -263,7 +239,7 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string,
}

if params.DiskEncryptionKMSKey != "" {
computeDisk.DiskEncryptionKey = &computev1.CustomerEncryptionKey{
computeDisk.DiskEncryptionKey = &computebeta.CustomerEncryptionKey{
KmsKeyName: params.DiskEncryptionKMSKey,
}
}
Expand All @@ -278,13 +254,7 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string,
return fmt.Errorf("could not create disk, key was neither zonal nor regional, instead got: %v", volKey.String())
}

if containsBetaDiskType(hyperdiskTypes, params.DiskType) {
betaDisk := convertV1DiskToBetaDisk(computeDisk)
betaDisk.EnableConfidentialCompute = params.EnableConfidentialCompute
cloud.disks[volKey.String()] = CloudDiskFromBeta(betaDisk)
} else {
cloud.disks[volKey.String()] = CloudDiskFromV1(computeDisk)
}
cloud.disks[volKey.String()] = CloudDiskFromBeta(computeDisk)
return nil
}

Expand Down
70 changes: 43 additions & 27 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const (
pdDiskTypeUnsupportedPattern = `\[([a-z-]+)\] features are not compatible for creating instance`
)

var hyperdiskTypes = []string{"hyperdisk-extreme", "hyperdisk-throughput", "hyperdisk-balanced"}
var pdDiskTypeUnsupportedRegex = regexp.MustCompile(pdDiskTypeUnsupportedPattern)

type GCEAPIVersion string
Expand Down Expand Up @@ -101,7 +100,6 @@ type GCECompute interface {
// Disk Methods
GetDisk(ctx context.Context, project string, volumeKey *meta.Key) (*CloudDisk, error)
RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error)
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error
DeleteDisk(ctx context.Context, project string, volumeKey *meta.Key) error
UpdateDisk(ctx context.Context, project string, volKey *meta.Key, existingDisk *CloudDisk, params common.ModifyVolumeParameters) error
Expand Down Expand Up @@ -382,7 +380,7 @@ func (cloud *CloudProvider) getRegionURI(project, region string) string {
region)
}

func (cloud *CloudProvider) ValidateExistingDisk(ctx context.Context, resp *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error {
func ValidateExistingDisk(ctx context.Context, resp *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool, accessMode string) error {
klog.V(5).Infof("Validating existing disk %v with diskType: %s, reqested bytes: %v, limit bytes: %v", resp, params.DiskType, reqBytes, limBytes)
if resp == nil {
return fmt.Errorf("disk does not exist")
Expand All @@ -395,14 +393,31 @@ func (cloud *CloudProvider) ValidateExistingDisk(ctx context.Context, resp *Clou
reqBytes, common.GbToBytes(resp.GetSizeGb()), limBytes)
}

// We are assuming here that a multiWriter disk could be used as non-multiWriter
if multiWriter && !resp.GetMultiWriter() {
if common.IsHyperdisk(params.DiskType) {
if !validAccessMode(accessMode, resp.GetAccessMode()) {
return fmt.Errorf("disk already exists with incompatible capability. Need %s. Got %s", accessMode, resp.GetAccessMode())
}
} else if multiWriter && !resp.GetMultiWriter() {
// We are assuming here that a multiWriter PD could be used as non-multiWriter
return fmt.Errorf("disk already exists with incompatible capability. Need MultiWriter. Got non-MultiWriter")
}

return ValidateDiskParameters(resp, params)
}

func validAccessMode(want, got string) bool {
if want == got {
return true
}
switch want {
case common.GCEReadOnlyManyAccessMode, common.GCEReadWriteOnceAccessMode:
return got == common.GCEReadWriteManyAccessMode
// For RWX, no other access mode is valid.
default:
return false
}
}

// ValidateDiskParameters takes a CloudDisk and returns true if the parameters
// specified validly describe the disk provided, and false otherwise.
func ValidateDiskParameters(disk *CloudDisk, params common.DiskParameters) error {
Expand Down Expand Up @@ -442,7 +457,7 @@ func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volK
if description == "" {
description = "Regional disk created by GCE-PD CSI Driver"
}
return cloud.insertRegionalDisk(ctx, project, volKey, params, capBytes, capacityRange, replicaZones, snapshotID, volumeContentSourceVolumeID, description, multiWriter)
return cloud.insertRegionalDisk(ctx, project, volKey, params, capBytes, capacityRange, replicaZones, snapshotID, volumeContentSourceVolumeID, description, multiWriter, accessMode)
default:
return fmt.Errorf("could not insert disk, key was neither zonal nor regional, instead got: %v", volKey.String())
}
Expand Down Expand Up @@ -626,7 +641,8 @@ func (cloud *CloudProvider) insertRegionalDisk(
snapshotID string,
volumeContentSourceVolumeID string,
description string,
multiWriter bool) error {
multiWriter bool,
accessMode string) error {
var (
err error
opName string
Expand Down Expand Up @@ -676,8 +692,13 @@ func (cloud *CloudProvider) insertRegionalDisk(
}
}

if common.IsHyperdisk(params.DiskType) {
diskToCreate.AccessMode = accessMode
} else {
diskToCreate.MultiWriter = multiWriter
}

var insertOp *computebeta.Operation
diskToCreate.MultiWriter = multiWriter
insertOp, err = cloud.betaService.RegionDisks.Insert(project, volKey.Region, diskToCreate).Context(ctx).Do()
if insertOp != nil {
opName = insertOp.Name
Expand All @@ -691,10 +712,10 @@ func (cloud *CloudProvider) insertRegionalDisk(
// the error code should be non-Final
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err))
}
err = cloud.ValidateExistingDisk(ctx, disk, params,
err = ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
int64(capacityRange.GetLimitBytes()),
multiWriter)
multiWriter, accessMode)
if err != nil {
return err
}
Expand All @@ -715,10 +736,10 @@ func (cloud *CloudProvider) insertRegionalDisk(
if err != nil {
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err))
}
err = cloud.ValidateExistingDisk(ctx, disk, params,
err = ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
int64(capacityRange.GetLimitBytes()),
multiWriter)
multiWriter, accessMode)
if err != nil {
return err
}
Expand Down Expand Up @@ -806,7 +827,12 @@ func (cloud *CloudProvider) insertZonalDisk(
}
}

diskToCreate.AccessMode = accessMode
if common.IsHyperdisk(params.DiskType) {
diskToCreate.AccessMode = accessMode
} else {
diskToCreate.MultiWriter = multiWriter
}

var insertOp *computebeta.Operation
insertOp, err = cloud.betaService.Disks.Insert(project, volKey.Zone, diskToCreate).Context(ctx).Do()
if insertOp != nil {
Expand All @@ -821,10 +847,10 @@ func (cloud *CloudProvider) insertZonalDisk(
// the error code should be non-Final
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err))
}
err = cloud.ValidateExistingDisk(ctx, disk, params,
err = ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
int64(capacityRange.GetLimitBytes()),
multiWriter)
multiWriter, accessMode)
if err != nil {
return err
}
Expand All @@ -846,10 +872,10 @@ func (cloud *CloudProvider) insertZonalDisk(
if err != nil {
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err))
}
err = cloud.ValidateExistingDisk(ctx, disk, params,
err = ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
int64(capacityRange.GetLimitBytes()),
multiWriter)
multiWriter, accessMode)
if err != nil {
return err
}
Expand Down Expand Up @@ -1687,13 +1713,3 @@ func encodeTags(tags map[string]string) (string, error) {
}
return string(enc), nil
}

func containsBetaDiskType(betaDiskTypes []string, diskType string) bool {
for _, betaDiskType := range betaDiskTypes {
if betaDiskType == diskType {
return true
}
}

return false
}
Loading