-
Notifications
You must be signed in to change notification settings - Fork 159
Feature: Changes to support Hyperdisk Multi-Writer mode by moving to Beta APIs. #1919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
5789a07
c85f509
6d5f693
615b44f
c06aea7
16433ea
ca945cf
204a2da
81ce104
e63374b
e6761af
a06549f
4b4f3b8
c6aad0e
d4251d9
10375f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,7 +99,7 @@ type GCECompute interface { | |
GetDefaultProject() string | ||
GetDefaultZone() string | ||
// Disk Methods | ||
GetDisk(ctx context.Context, project string, volumeKey *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error) | ||
GetDisk(ctx context.Context, project string, volumeKey *meta.Key) (*CloudDisk, error) | ||
RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error) | ||
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error | ||
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error | ||
|
@@ -321,28 +321,16 @@ func (cloud *CloudProvider) ListSnapshots(ctx context.Context, filter string) ([ | |
return items, "", nil | ||
} | ||
|
||
func (cloud *CloudProvider) GetDisk(ctx context.Context, project string, key *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error) { | ||
func (cloud *CloudProvider) GetDisk(ctx context.Context, project string, key *meta.Key) (*CloudDisk, error) { | ||
klog.V(5).Infof("Getting disk %v", key) | ||
|
||
// Override GCEAPIVersion as hyperdisk is only available in beta and we cannot get the disk-type with get disk call. | ||
gceAPIVersion = GCEAPIVersionBeta | ||
switch key.Type() { | ||
case meta.Zonal: | ||
if gceAPIVersion == GCEAPIVersionBeta { | ||
disk, err := cloud.getZonalBetaDiskOrError(ctx, project, key.Zone, key.Name) | ||
return CloudDiskFromBeta(disk), err | ||
} else { | ||
disk, err := cloud.getZonalDiskOrError(ctx, project, key.Zone, key.Name) | ||
return CloudDiskFromV1(disk), err | ||
} | ||
disk, err := cloud.getZonalBetaDiskOrError(ctx, project, key.Zone, key.Name) | ||
return CloudDiskFromBeta(disk), err | ||
case meta.Regional: | ||
if gceAPIVersion == GCEAPIVersionBeta { | ||
disk, err := cloud.getRegionalBetaDiskOrError(ctx, project, key.Region, key.Name) | ||
return CloudDiskFromBeta(disk), err | ||
} else { | ||
disk, err := cloud.getRegionalDiskOrError(ctx, project, key.Region, key.Name) | ||
return CloudDiskFromV1(disk), err | ||
} | ||
disk, err := cloud.getRegionalBetaDiskOrError(ctx, project, key.Region, key.Name) | ||
return CloudDiskFromBeta(disk), err | ||
default: | ||
return nil, fmt.Errorf("key was neither zonal nor regional, got: %v", key.String()) | ||
} | ||
|
@@ -407,11 +395,6 @@ func (cloud *CloudProvider) ValidateExistingDisk(ctx context.Context, resp *Clou | |
reqBytes, common.GbToBytes(resp.GetSizeGb()), limBytes) | ||
} | ||
|
||
// We are assuming here that a multiWriter disk could be used as non-multiWriter | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto, we should keep the multiwriter check. Hmm, this won't verify that a hyperdisk is multiwriter, however, since we aren't passing in the access mode. It looks like it's just called from the one place in pk/gce-pd-csi-driver/controller.go, so should be easy to add accessMode to the method (this makes it parallel to the insert disk call. Part of me thinks we should push this detail of how multiwriter is declared down into cloud disk, but we can defer that for now, HdHA might clarify the question). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HdHA PR has already been merged by the way: #1915 |
||
if multiWriter && !resp.GetMultiWriter() { | ||
return fmt.Errorf("disk already exists with incompatible capability. Need MultiWriter. Got non-MultiWriter") | ||
} | ||
|
||
return ValidateDiskParameters(resp, params) | ||
} | ||
|
||
|
@@ -553,9 +536,6 @@ func convertV1DiskToBetaDisk(v1Disk *computev1.Disk) *computebeta.Disk { | |
AccessMode: v1Disk.AccessMode, | ||
} | ||
|
||
// Hyperdisk doesn't currently support multiWriter (https://cloud.google.com/compute/docs/disks/hyperdisks#limitations), | ||
// but if multiWriter + hyperdisk is supported in the future, we want the PDCSI driver to support this feature without | ||
// any additional code change. | ||
if v1Disk.ProvisionedIops > 0 { | ||
betaDisk.ProvisionedIops = v1Disk.ProvisionedIops | ||
} | ||
|
@@ -619,9 +599,6 @@ func convertBetaDiskToV1Disk(betaDisk *computebeta.Disk) *computev1.Disk { | |
AccessMode: betaDisk.AccessMode, | ||
} | ||
|
||
// Hyperdisk doesn't currently support multiWriter (https://cloud.google.com/compute/docs/disks/hyperdisks#limitations), | ||
// but if multiWriter + hyperdisk is supported in the future, we want the PDCSI driver to support this feature without | ||
// any additional code change. | ||
if betaDisk.ProvisionedIops > 0 { | ||
v1Disk.ProvisionedIops = betaDisk.ProvisionedIops | ||
} | ||
|
@@ -646,16 +623,11 @@ func (cloud *CloudProvider) insertRegionalDisk( | |
description string, | ||
multiWriter bool) error { | ||
var ( | ||
err error | ||
opName string | ||
gceAPIVersion = GCEAPIVersionV1 | ||
err error | ||
opName string | ||
) | ||
|
||
if multiWriter { | ||
gceAPIVersion = GCEAPIVersionBeta | ||
} | ||
|
||
diskToCreate := &computev1.Disk{ | ||
diskToCreate := &computebeta.Disk{ | ||
Name: volKey.Name, | ||
SizeGb: common.BytesToGbRoundUp(capBytes), | ||
Description: description, | ||
|
@@ -684,7 +656,7 @@ func (cloud *CloudProvider) insertRegionalDisk( | |
diskToCreate.ReplicaZones = replicaZones | ||
} | ||
if params.DiskEncryptionKMSKey != "" { | ||
diskToCreate.DiskEncryptionKey = &computev1.CustomerEncryptionKey{ | ||
diskToCreate.DiskEncryptionKey = &computebeta.CustomerEncryptionKey{ | ||
KmsKeyName: params.DiskEncryptionKMSKey, | ||
} | ||
} | ||
|
@@ -694,29 +666,21 @@ func (cloud *CloudProvider) insertRegionalDisk( | |
} | ||
|
||
if len(resourceTags) > 0 { | ||
diskToCreate.Params = &computev1.DiskParams{ | ||
diskToCreate.Params = &computebeta.DiskParams{ | ||
ResourceManagerTags: resourceTags, | ||
} | ||
} | ||
|
||
if gceAPIVersion == GCEAPIVersionBeta { | ||
var insertOp *computebeta.Operation | ||
betaDiskToCreate := convertV1DiskToBetaDisk(diskToCreate) | ||
betaDiskToCreate.MultiWriter = multiWriter | ||
insertOp, err = cloud.betaService.RegionDisks.Insert(project, volKey.Region, betaDiskToCreate).Context(ctx).Do() | ||
if insertOp != nil { | ||
opName = insertOp.Name | ||
} | ||
} else { | ||
var insertOp *computev1.Operation | ||
insertOp, err = cloud.service.RegionDisks.Insert(project, volKey.Region, diskToCreate).Context(ctx).Do() | ||
if insertOp != nil { | ||
opName = insertOp.Name | ||
} | ||
var insertOp *computebeta.Operation | ||
diskToCreate.MultiWriter = multiWriter | ||
insertOp, err = cloud.betaService.RegionDisks.Insert(project, volKey.Region, diskToCreate).Context(ctx).Do() | ||
if insertOp != nil { | ||
opName = insertOp.Name | ||
} | ||
|
||
if err != nil { | ||
if IsGCEError(err, "alreadyExists") { | ||
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion) | ||
disk, err := cloud.GetDisk(ctx, project, volKey) | ||
if err != nil { | ||
// failed to GetDisk, however the Disk may already exist | ||
// the error code should be non-Final | ||
|
@@ -742,7 +706,7 @@ func (cloud *CloudProvider) insertRegionalDisk( | |
// the error code returned should be non-final | ||
if err != nil { | ||
if IsGCEError(err, "alreadyExists") { | ||
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion) | ||
disk, err := cloud.GetDisk(ctx, project, volKey) | ||
if err != nil { | ||
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err)) | ||
} | ||
|
@@ -774,15 +738,11 @@ func (cloud *CloudProvider) insertZonalDisk( | |
multiWriter bool, | ||
accessMode string) error { | ||
var ( | ||
err error | ||
opName string | ||
gceAPIVersion = GCEAPIVersionV1 | ||
err error | ||
opName string | ||
) | ||
if multiWriter { | ||
gceAPIVersion = GCEAPIVersionBeta | ||
} | ||
|
||
diskToCreate := &computev1.Disk{ | ||
diskToCreate := &computebeta.Disk{ | ||
Name: volKey.Name, | ||
SizeGb: common.BytesToGbRoundUp(capBytes), | ||
Description: description, | ||
|
@@ -824,7 +784,7 @@ func (cloud *CloudProvider) insertZonalDisk( | |
} | ||
|
||
if params.DiskEncryptionKMSKey != "" { | ||
diskToCreate.DiskEncryptionKey = &computev1.CustomerEncryptionKey{ | ||
diskToCreate.DiskEncryptionKey = &computebeta.CustomerEncryptionKey{ | ||
KmsKeyName: params.DiskEncryptionKMSKey, | ||
} | ||
} | ||
|
@@ -836,31 +796,22 @@ func (cloud *CloudProvider) insertZonalDisk( | |
} | ||
|
||
if len(resourceTags) > 0 { | ||
diskToCreate.Params = &computev1.DiskParams{ | ||
diskToCreate.Params = &computebeta.DiskParams{ | ||
ResourceManagerTags: resourceTags, | ||
} | ||
} | ||
diskToCreate.AccessMode = accessMode | ||
|
||
if gceAPIVersion == GCEAPIVersionBeta { | ||
var insertOp *computebeta.Operation | ||
betaDiskToCreate := convertV1DiskToBetaDisk(diskToCreate) | ||
betaDiskToCreate.MultiWriter = multiWriter | ||
insertOp, err = cloud.betaService.Disks.Insert(project, volKey.Zone, betaDiskToCreate).Context(ctx).Do() | ||
if insertOp != nil { | ||
opName = insertOp.Name | ||
} | ||
} else { | ||
var insertOp *computev1.Operation | ||
insertOp, err = cloud.service.Disks.Insert(project, volKey.Zone, diskToCreate).Context(ctx).Do() | ||
if insertOp != nil { | ||
opName = insertOp.Name | ||
} | ||
diskToCreate.AccessMode = accessMode | ||
var insertOp *computebeta.Operation | ||
diskToCreate.MultiWriter = multiWriter | ||
insertOp, err = cloud.betaService.Disks.Insert(project, volKey.Zone, diskToCreate).Context(ctx).Do() | ||
if insertOp != nil { | ||
opName = insertOp.Name | ||
} | ||
|
||
if err != nil { | ||
if IsGCEError(err, "alreadyExists") { | ||
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion) | ||
disk, err := cloud.GetDisk(ctx, project, volKey) | ||
if err != nil { | ||
// failed to GetDisk, however the Disk may already exist | ||
// the error code should be non-Final | ||
|
@@ -887,7 +838,7 @@ func (cloud *CloudProvider) insertZonalDisk( | |
// failed to wait for Op to finish, however, the Op possibly is still running as expected | ||
// the error code returned should be non-final | ||
if IsGCEError(err, "alreadyExists") { | ||
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion) | ||
disk, err := cloud.GetDisk(ctx, project, volKey) | ||
if err != nil { | ||
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err)) | ||
} | ||
|
@@ -1186,7 +1137,7 @@ func (cloud *CloudProvider) waitForAttachOnDisk(ctx context.Context, project str | |
start := time.Now() | ||
return wait.ExponentialBackoff(AttachDiskBackoff, func() (bool, error) { | ||
klog.V(6).Infof("Polling disks.get for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start)) | ||
disk, err := cloud.GetDisk(ctx, project, volKey, GCEAPIVersionV1) | ||
disk, err := cloud.GetDisk(ctx, project, volKey) | ||
if err != nil { | ||
return false, fmt.Errorf("GetDisk failed to get disk: %w", err) | ||
} | ||
|
@@ -1436,7 +1387,7 @@ func (cloud *CloudProvider) DeleteImage(ctx context.Context, project, imageName | |
// k8s.io/apimachinery/quantity package for better size handling | ||
func (cloud *CloudProvider) ResizeDisk(ctx context.Context, project string, volKey *meta.Key, requestBytes int64) (int64, error) { | ||
klog.V(5).Infof("Resizing disk %v to size %v", volKey, requestBytes) | ||
cloudDisk, err := cloud.GetDisk(ctx, project, volKey, GCEAPIVersionV1) | ||
cloudDisk, err := cloud.GetDisk(ctx, project, volKey) | ||
if err != nil { | ||
return -1, fmt.Errorf("failed to get disk: %w", err) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to keep this check, to validate that we aren't trying to use a non-multiwriter disk when we need a multiwriter one?