diff --git a/pkg/common/parameters.go b/pkg/common/parameters.go index 3b07f0779..2a1fa4d13 100644 --- a/pkg/common/parameters.go +++ b/pkg/common/parameters.go @@ -33,6 +33,7 @@ const ( ParameterKeyEnableConfidentialCompute = "enable-confidential-storage" ParameterKeyStoragePools = "storage-pools" ParameterKeyResourceTags = "resource-tags" + ParameterKeyEnableMultiZoneProvisioning = "enable-multi-zone-provisioning" // Parameters for VolumeSnapshotClass ParameterKeyStorageLocations = "storage-locations" @@ -102,6 +103,9 @@ type DiskParameters struct { // Values: {map[string]string} // Default: "" ResourceTags map[string]string + // Values: {bool} + // Default: false + MultiZoneProvisioning bool } // SnapshotParameters contains normalized and defaulted parameters for snapshots @@ -121,11 +125,17 @@ type StoragePool struct { ResourceName string } +type ParameterProcessor struct { + DriverName string + EnableStoragePools bool + EnableMultiZone bool +} + // ExtractAndDefaultParameters will take the relevant parameters from a map and // put them into a well defined struct making sure to default unspecified fields. // extraVolumeLabels are added as labels; if there are also labels specified in // parameters, any matching extraVolumeLabels will be overridden. -func ExtractAndDefaultParameters(parameters map[string]string, driverName string, extraVolumeLabels map[string]string, enableStoragePools bool, extraTags map[string]string) (DiskParameters, error) { +func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]string, extraVolumeLabels map[string]string, extraTags map[string]string) (DiskParameters, error) { p := DiskParameters{ DiskType: "pd-standard", // Default ReplicationType: replicationTypeNone, // Default @@ -210,24 +220,37 @@ func ExtractAndDefaultParameters(parameters map[string]string, driverName string p.EnableConfidentialCompute = paramEnableConfidentialCompute case ParameterKeyStoragePools: - if !enableStoragePools { + if !pp.EnableStoragePools { return p, fmt.Errorf("parameters contains invalid option %q", ParameterKeyStoragePools) } storagePools, err := ParseStoragePools(v) if err != nil { - return p, fmt.Errorf("parameters contain invalid value for %s parameter: %w", ParameterKeyStoragePools, err) + return p, fmt.Errorf("parameters contains invalid value for %s parameter %q: %w", ParameterKeyStoragePools, v, err) } p.StoragePools = storagePools case ParameterKeyResourceTags: if err := extractResourceTagsParameter(v, p.ResourceTags); err != nil { return p, err } + case ParameterKeyEnableMultiZoneProvisioning: + if !pp.EnableMultiZone { + return p, fmt.Errorf("parameters contains invalid option %q", ParameterKeyEnableMultiZoneProvisioning) + } + paramEnableMultiZoneProvisioning, err := ConvertStringToBool(v) + if err != nil { + return p, fmt.Errorf("parameters contain invalid value for %s parameter: %w", ParameterKeyEnableMultiZoneProvisioning, err) + } + + p.MultiZoneProvisioning = paramEnableMultiZoneProvisioning + if paramEnableMultiZoneProvisioning { + p.Labels[MultiZoneLabel] = "true" + } default: return p, fmt.Errorf("parameters contains invalid option %q", k) } } if len(p.Tags) > 0 { - p.Tags[tagKeyCreatedBy] = driverName + p.Tags[tagKeyCreatedBy] = pp.DriverName } return p, nil } diff --git a/pkg/common/parameters_test.go b/pkg/common/parameters_test.go index b43aeaeda..75f1c2a16 100644 --- a/pkg/common/parameters_test.go +++ b/pkg/common/parameters_test.go @@ -29,6 +29,7 @@ func TestExtractAndDefaultParameters(t *testing.T) { parameters 
map[string]string labels map[string]string enableStoragePools bool + enableMultiZone bool extraTags map[string]string expectParams DiskParameters expectErr bool @@ -350,11 +351,53 @@ func TestExtractAndDefaultParameters(t *testing.T) { labels: map[string]string{}, expectErr: true, }, + { + name: "multi-zone-enable parameters, multi-zone label is set, multi-zone feature enabled", + parameters: map[string]string{ParameterKeyType: "hyperdisk-ml", ParameterKeyEnableMultiZoneProvisioning: "true"}, + labels: map[string]string{MultiZoneLabel: "true"}, + enableMultiZone: true, + expectParams: DiskParameters{ + DiskType: "hyperdisk-ml", + ReplicationType: "none", + Tags: map[string]string{}, + Labels: map[string]string{MultiZoneLabel: "true"}, + ResourceTags: map[string]string{}, + MultiZoneProvisioning: true, + }, + }, + { + name: "multi-zone-enable parameters, multi-zone label is false, multi-zone feature enabled", + parameters: map[string]string{ParameterKeyType: "hyperdisk-ml", ParameterKeyEnableMultiZoneProvisioning: "false"}, + enableMultiZone: true, + expectParams: DiskParameters{ + DiskType: "hyperdisk-ml", + ReplicationType: "none", + Tags: map[string]string{}, + ResourceTags: map[string]string{}, + Labels: map[string]string{}, + }, + }, + { + name: "multi-zone-enable parameters, invalid value, multi-zone feature enabled", + parameters: map[string]string{ParameterKeyType: "hyperdisk-ml", ParameterKeyEnableMultiZoneProvisioning: "unknown"}, + enableMultiZone: true, + expectErr: true, + }, + { + name: "multi-zone-enable parameters, multi-zone label is set, multi-zone feature disabled", + parameters: map[string]string{ParameterKeyType: "hyperdisk-ml", ParameterKeyEnableMultiZoneProvisioning: "true"}, + expectErr: true, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - p, err := ExtractAndDefaultParameters(tc.parameters, "testDriver", tc.labels, tc.enableStoragePools, tc.extraTags) + pp := ParameterProcessor{ + DriverName: "testDriver", + EnableStoragePools: tc.enableStoragePools, + EnableMultiZone: tc.enableMultiZone, + } + p, err := pp.ExtractAndDefaultParameters(tc.parameters, tc.labels, tc.extraTags) if gotErr := err != nil; gotErr != tc.expectErr { t.Fatalf("ExtractAndDefaultParameters(%+v) = %v; expectedErr: %v", tc.parameters, err, tc.expectErr) } @@ -362,7 +405,7 @@ func TestExtractAndDefaultParameters(t *testing.T) { return } - if diff := cmp.Diff(p, tc.expectParams); diff != "" { + if diff := cmp.Diff(tc.expectParams, p); diff != "" { t.Errorf("ExtractAndDefaultParameters(%+v): -want, +got \n%s", tc.parameters, diff) } }) diff --git a/pkg/common/utils.go b/pkg/common/utils.go index 25f8a6552..c3b71c1c4 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "regexp" + "slices" "strings" "time" @@ -73,6 +74,10 @@ const ( // Full or partial URL of the machine type resource, in the format: // zones/zone/machineTypes/machine-type machineTypePattern = "zones/[^/]+/machineTypes/([^/]+)$" + + // Full or partial URL of the zone resource, in the format: + // projects/{project}/zones/{zone} + zoneURIPattern = "projects/[^/]+/zones/([^/]+)$" ) var ( @@ -85,6 +90,8 @@ var ( storagePoolFieldsRegex = regexp.MustCompile(`^projects/([^/]+)/zones/([^/]+)/storagePools/([^/]+)$`) + zoneURIRegex = regexp.MustCompile(zoneURIPattern) + // userErrorCodeMap tells how API error types are translated to error codes. 
userErrorCodeMap = map[int]codes.Code{ http.StatusForbidden: codes.PermissionDenied, @@ -97,6 +104,8 @@ var ( regexParent = regexp.MustCompile(`(^[1-9][0-9]{0,31}$)|(^[a-z][a-z0-9-]{4,28}[a-z0-9]$)`) regexKey = regexp.MustCompile(`^[a-zA-Z0-9]([0-9A-Za-z_.-]{0,61}[a-zA-Z0-9])?$`) regexValue = regexp.MustCompile(`^[a-zA-Z0-9]([0-9A-Za-z_.@%=+:,*#&()\[\]{}\-\s]{0,61}[a-zA-Z0-9])?$`) + + csiRetryableErrorCodes = []codes.Code{codes.Canceled, codes.DeadlineExceeded, codes.Unavailable, codes.Aborted, codes.ResourceExhausted} ) func BytesToGbRoundDown(bytes int64) int64 { @@ -545,9 +554,37 @@ func isGoogleAPIError(err error) (codes.Code, error) { return codes.Unknown, fmt.Errorf("googleapi.Error %w does not map to any known errors", err) } -func LoggedError(msg string, err error) error { +func loggedErrorForCode(msg string, code codes.Code, err error) error { klog.Errorf(msg+"%v", err.Error()) - return status.Errorf(CodeForError(err), msg+"%v", err.Error()) + return status.Errorf(code, msg+"%v", err.Error()) +} + +func LoggedError(msg string, err error) error { + return loggedErrorForCode(msg, CodeForError(err), err) +} + +// NewCombinedError tries to return an appropriate wrapped error that captures +// useful information as an error code +// If there are multiple errors, it extracts the first "retryable" error +// as interpreted by the CSI sidecar. +func NewCombinedError(msg string, errs []error) error { + // If there is only one error, return it as the single error code + if len(errs) == 1 { + return LoggedError(msg, errs[0]) + } + + for _, err := range errs { + code := CodeForError(err) + if slices.Contains(csiRetryableErrorCodes, code) { + // Return this as a TemporaryError to lock in the retryable code + // This will invoke the "existing" error code check in CodeForError + return NewTemporaryError(code, fmt.Errorf("%s: %w", msg, err)) + } + } + + // None of these error codes were retryable. Just return a combined error + // The first matching error (based on our CodeForError logic) will be returned. + return LoggedError(msg, errors.Join(errs...)) } func isValidDiskEncryptionKmsKey(DiskEncryptionKmsKey string) bool { @@ -556,6 +593,14 @@ func isValidDiskEncryptionKmsKey(DiskEncryptionKmsKey string) bool { return kmsKeyPattern.MatchString(DiskEncryptionKmsKey) } +func ParseZoneFromURI(zoneURI string) (string, error) { + zoneMatch := zoneURIRegex.FindStringSubmatch(zoneURI) + if zoneMatch == nil { + return "", fmt.Errorf("failed to parse zone URI. Expected projects/{project}/zones/{zone}. Got: %s", zoneURI) + } + return zoneMatch[1], nil +} + // ParseStoragePools returns an error if none of the given storagePools // (delimited by a comma) are in the format // projects/project/zones/zone/storagePools/storagePool.
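A minimal usage sketch (not part of the patch) for the two helpers added to pkg/common/utils.go above, ParseZoneFromURI and NewCombinedError; the project name and error values are illustrative assumptions only:

```go
// Illustrative only: exercises the helpers added in pkg/common/utils.go above.
package main

import (
	"errors"
	"fmt"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	// ParseZoneFromURI trims a full or partial zone URL down to the zone name.
	zone, err := common.ParseZoneFromURI("projects/my-project/zones/us-central1-a")
	if err != nil {
		panic(err)
	}
	fmt.Println(zone) // prints "us-central1-a"

	// NewCombinedError surfaces the first error whose gRPC code is CSI-retryable,
	// if any; otherwise it logs and returns the joined errors.
	errs := []error{
		errors.New("disk is being used by another operation"),
		errors.New("connection reset by peer"),
	}
	fmt.Println(common.NewCombinedError("CreateVolume failed", errs))
}
```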
diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go index e0c14f295..1b9b44e30 100644 --- a/pkg/common/utils_test.go +++ b/pkg/common/utils_test.go @@ -1648,3 +1648,89 @@ func TestUnorderedSlicesEqual(t *testing.T) { }) } } + +func TestParseZoneFromURI(t *testing.T) { + testcases := []struct { + name string + zoneURI string + wantZone string + expectErr bool + }{ + { + name: "ParseZoneFromURI_FullURI", + zoneURI: "https://www.googleapis.com/compute/v1/projects/psch-gke-dev/zones/us-east4-a", + wantZone: "us-east4-a", + }, + { + name: "ParseZoneFromURI_ProjectZoneString", + zoneURI: "projects/psch-gke-dev/zones/us-east4-a", + wantZone: "us-east4-a", + }, + { + name: "ParseZoneFromURI_Malformed", + zoneURI: "projects/psch-gke-dev/regions/us-east4", + expectErr: true, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + gotZone, err := ParseZoneFromURI(tc.zoneURI) + if err != nil && !tc.expectErr { + t.Fatalf("Unexpected error: %v", err) + } + if err == nil && tc.expectErr { + t.Fatalf("Expected err, but none was returned. Zone result: %v", gotZone) + } + if gotZone != tc.wantZone { + t.Errorf("ParseZoneFromURI(%v): got %v, want %v", tc.zoneURI, gotZone, tc.wantZone) + } + }) + } +} + +func TestNewCombinedError(t *testing.T) { + testcases := []struct { + name string + errors []error + wantCode codes.Code + }{ + { + name: "single generic error", + errors: []error{fmt.Errorf("my internal error")}, + wantCode: codes.Internal, + }, + { + name: "single retryable error", + errors: []error{&googleapi.Error{Code: http.StatusTooManyRequests, Message: "Resource Exhausted"}}, + wantCode: codes.ResourceExhausted, + }, + { + name: "multi generic error", + errors: []error{fmt.Errorf("my internal error"), fmt.Errorf("my other internal error")}, + wantCode: codes.Internal, + }, + { + name: "multi retryable error", + errors: []error{fmt.Errorf("my internal error"), &googleapi.Error{Code: http.StatusTooManyRequests, Message: "Resource Exhausted"}}, + wantCode: codes.ResourceExhausted, + }, + { + name: "multi retryable error", + errors: []error{fmt.Errorf("my internal error"), &googleapi.Error{Code: http.StatusGatewayTimeout, Message: "connection reset by peer"}, fmt.Errorf("my other internal error")}, + wantCode: codes.Unavailable, + }, + { + name: "multi retryable error", + errors: []error{fmt.Errorf("The disk resource is already being used"), &googleapi.Error{Code: http.StatusGatewayTimeout, Message: "connection reset by peer"}}, + wantCode: codes.Unavailable, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + gotCode := CodeForError(NewCombinedError("message", tc.errors)) + if gotCode != tc.wantCode { + t.Errorf("NewCombinedError(%v): got %v, want %v", tc.errors, gotCode, tc.wantCode) + } + }) + } +} diff --git a/pkg/gce-cloud-provider/compute/cloud-disk.go b/pkg/gce-cloud-provider/compute/cloud-disk.go index bbd3afc1f..02b70d5df 100644 --- a/pkg/gce-cloud-provider/compute/cloud-disk.go +++ b/pkg/gce-cloud-provider/compute/cloud-disk.go @@ -256,3 +256,14 @@ func (d *CloudDisk) GetLabels() map[string]string { return nil } } + +func (d *CloudDisk) GetAccessMode() string { + switch { + case d.disk != nil: + return d.disk.AccessMode + case d.betaDisk != nil: + return d.betaDisk.AccessMode + default: + return "" + } +} diff --git a/pkg/gce-cloud-provider/compute/cloud-disk_test.go b/pkg/gce-cloud-provider/compute/cloud-disk_test.go index d391212ac..e2bea29d9 100644 --- a/pkg/gce-cloud-provider/compute/cloud-disk_test.go +++ 
b/pkg/gce-cloud-provider/compute/cloud-disk_test.go @@ -148,3 +148,50 @@ func TestGetLabels(t *testing.T) { } } } + +func TestGetAccessMode(t *testing.T) { + testCases := []struct { + name string + cloudDisk *CloudDisk + wantAccessMode string + }{ + { + name: "v1 disk accessMode", + cloudDisk: &CloudDisk{ + disk: &computev1.Disk{ + AccessMode: "READ_WRITE_SINGLE", + }, + }, + wantAccessMode: "READ_WRITE_SINGLE", + }, + { + name: "beta disk accessMode", + cloudDisk: &CloudDisk{ + betaDisk: &computebeta.Disk{ + AccessMode: "READ_ONLY_MANY", + }, + }, + wantAccessMode: "READ_ONLY_MANY", + }, + { + name: "unset disk accessMode", + cloudDisk: &CloudDisk{ + betaDisk: &computebeta.Disk{}, + }, + wantAccessMode: "", + }, + { + name: "unset disk", + cloudDisk: &CloudDisk{}, + wantAccessMode: "", + }, + } + + for _, tc := range testCases { + t.Logf("Running test: %v", tc.name) + gotAccessMode := tc.cloudDisk.GetAccessMode() + if gotAccessMode != tc.wantAccessMode { + t.Errorf("GetAccessMode() got %v, want %v", gotAccessMode, tc.wantAccessMode) + } + } +} diff --git a/pkg/gce-cloud-provider/compute/fake-gce.go b/pkg/gce-cloud-provider/compute/fake-gce.go index 3b7416b18..77dbf0751 100644 --- a/pkg/gce-cloud-provider/compute/fake-gce.go +++ b/pkg/gce-cloud-provider/compute/fake-gce.go @@ -79,7 +79,11 @@ func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa mockDiskStatus: "READY", } for _, d := range cloudDisks { - fcp.disks[d.GetName()] = d + diskZone := d.GetZone() + if diskZone == "" { + diskZone = zone + } + fcp.disks[meta.ZonalKey(d.GetName(), diskZone).String()] = d } return fcp, nil } @@ -101,8 +105,8 @@ func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Contex if volumeKey.Zone != common.UnspecifiedValue { return project, volumeKey, nil } - for name, d := range cloud.disks { - if name == volumeKey.Name { + for diskVolKey, d := range cloud.disks { + if diskVolKey == volumeKey.String() { volumeKey.Zone = d.GetZone() return project, volumeKey, nil } @@ -127,6 +131,15 @@ func (cloud *FakeCloudProvider) ListZones(ctx context.Context, region string) ([ return []string{cloud.zone, "country-region-fakesecondzone"}, nil } +func (cloud *FakeCloudProvider) ListCompatibleDiskTypeZones(ctx context.Context, project string, zones []string, diskType string) ([]string, error) { + // Assume all zones are compatible + return zones, nil +} + +func (cloud *FakeCloudProvider) ListDisksWithFilter(ctx context.Context, fields []googleapi.Field, filter string) ([]*computev1.Disk, string, error) { + return cloud.ListDisks(ctx, fields) +} + func (cloud *FakeCloudProvider) ListDisks(ctx context.Context, fields []googleapi.Field) ([]*computev1.Disk, string, error) { d := []*computev1.Disk{} for _, cd := range cloud.disks { @@ -167,7 +180,7 @@ func (cloud *FakeCloudProvider) ListSnapshots(ctx context.Context, filter string // Disk Methods func (cloud *FakeCloudProvider) GetDisk(ctx context.Context, project string, volKey *meta.Key, api GCEAPIVersion) (*CloudDisk, error) { - disk, ok := cloud.disks[volKey.Name] + disk, ok := cloud.disks[volKey.String()] if !ok { return nil, notFoundError() } @@ -203,8 +216,8 @@ func (cloud *FakeCloudProvider) ValidateExistingDisk(ctx context.Context, resp * return ValidateDiskParameters(resp, params) } -func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, 
volumeContentSourceVolumeID string, multiWriter bool) error { - if disk, ok := cloud.disks[volKey.Name]; ok { +func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error { + if disk, ok := cloud.disks[volKey.String()]; ok { err := cloud.ValidateExistingDisk(ctx, disk, params, int64(capacityRange.GetRequiredBytes()), int64(capacityRange.GetLimitBytes()), @@ -259,15 +272,15 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, if containsBetaDiskType(hyperdiskTypes, params.DiskType) { betaDisk := convertV1DiskToBetaDisk(computeDisk) betaDisk.EnableConfidentialCompute = params.EnableConfidentialCompute - cloud.disks[volKey.Name] = CloudDiskFromBeta(betaDisk) + cloud.disks[volKey.String()] = CloudDiskFromBeta(betaDisk) } else { - cloud.disks[volKey.Name] = CloudDiskFromV1(computeDisk) + cloud.disks[volKey.String()] = CloudDiskFromV1(computeDisk) } return nil } func (cloud *FakeCloudProvider) DeleteDisk(ctx context.Context, project string, volKey *meta.Key) error { - delete(cloud.disks, volKey.Name) + delete(cloud.disks, volKey.String()) return nil } @@ -307,6 +320,22 @@ func (cloud *FakeCloudProvider) DetachDisk(ctx context.Context, project, deviceN return nil } +func (cloud *FakeCloudProvider) SetDiskAccessMode(ctx context.Context, project string, volKey *meta.Key, accessMode string) error { + disk, ok := cloud.disks[volKey.String()] + if !ok { + return fmt.Errorf("disk %v not found", volKey) + } + + if disk.disk != nil { + disk.disk.AccessMode = accessMode + } + if disk.betaDisk != nil { + disk.betaDisk.AccessMode = accessMode + } + + return nil +} + func (cloud *FakeCloudProvider) GetDiskTypeURI(project string, volKey *meta.Key, diskType string) string { switch volKey.Type() { case meta.Zonal: @@ -390,7 +419,7 @@ func (cloud *FakeCloudProvider) CreateSnapshot(ctx context.Context, project stri } func (cloud *FakeCloudProvider) ResizeDisk(ctx context.Context, project string, volKey *meta.Key, requestBytes int64) (int64, error) { - disk, ok := cloud.disks[volKey.Name] + disk, ok := cloud.disks[volKey.String()] if !ok { return -1, notFoundError() } diff --git a/pkg/gce-cloud-provider/compute/gce-compute.go b/pkg/gce-cloud-provider/compute/gce-compute.go index eaa0c9953..13f00fcc2 100644 --- a/pkg/gce-cloud-provider/compute/gce-compute.go +++ b/pkg/gce-cloud-provider/compute/gce-compute.go @@ -39,6 +39,7 @@ import ( "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" + "k8s.io/utils/strings/slices" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" ) @@ -64,6 +65,8 @@ const ( GCEAPIVersionBeta GCEAPIVersion = "beta" ) +var GCEAPIVersions = []GCEAPIVersion{GCEAPIVersionBeta, GCEAPIVersionV1} + // AttachDiskBackoff is backoff used to wait for AttachDisk to complete. // Default values are similar to Poll every 5 seconds with 2 minute timeout. 
var AttachDiskBackoff = wait.Backoff{ @@ -99,15 +102,18 @@ type GCECompute interface { GetDisk(ctx context.Context, project string, volumeKey *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error) RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error) ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error - InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) error + InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error DeleteDisk(ctx context.Context, project string, volumeKey *meta.Key) error AttachDisk(ctx context.Context, project string, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string, forceAttach bool) error DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error + SetDiskAccessMode(ctx context.Context, project string, volKey *meta.Key, accessMode string) error + ListCompatibleDiskTypeZones(ctx context.Context, project string, zones []string, diskType string) ([]string, error) GetDiskSourceURI(project string, volKey *meta.Key) string GetDiskTypeURI(project string, volKey *meta.Key, diskType string) string WaitForAttach(ctx context.Context, project string, volKey *meta.Key, diskType, instanceZone, instanceName string) error ResizeDisk(ctx context.Context, project string, volKey *meta.Key, requestBytes int64) (int64, error) ListDisks(ctx context.Context, fields []googleapi.Field) ([]*computev1.Disk, string, error) + ListDisksWithFilter(ctx context.Context, fields []googleapi.Field, filter string) ([]*computev1.Disk, string, error) ListInstances(ctx context.Context, fields []googleapi.Field) ([]*computev1.Instance, string, error) // Regional Disk Methods GetReplicaZoneURI(project string, zone string) string @@ -138,6 +144,15 @@ func (cloud *CloudProvider) GetDefaultZone() string { // ListDisks lists disks based on maxEntries and pageToken only in the project // and region that the driver is running in. func (cloud *CloudProvider) ListDisks(ctx context.Context, fields []googleapi.Field) ([]*computev1.Disk, string, error) { + filter := "" + return cloud.listDisksInternal(ctx, fields, filter) +} + +func (cloud *CloudProvider) ListDisksWithFilter(ctx context.Context, fields []googleapi.Field, filter string) ([]*computev1.Disk, string, error) { + return cloud.listDisksInternal(ctx, fields, filter) +} + +func (cloud *CloudProvider) listDisksInternal(ctx context.Context, fields []googleapi.Field, filter string) ([]*computev1.Disk, string, error) { region, err := common.GetRegionFromZones([]string{cloud.zone}) if err != nil { return nil, "", fmt.Errorf("failed to get region from zones: %w", err) @@ -151,6 +166,7 @@ func (cloud *CloudProvider) ListDisks(ctx context.Context, fields []googleapi.Fi // listing out regional disks in the region rlCall := cloud.service.RegionDisks.List(cloud.project, region) rlCall.Fields(fields...) 
+ rlCall.Filter(filter) nextPageToken := "pageToken" for nextPageToken != "" { rDiskList, err := rlCall.Do() @@ -166,6 +182,7 @@ func (cloud *CloudProvider) ListDisks(ctx context.Context, fields []googleapi.Fi for _, zone := range zones { lCall := cloud.service.Disks.List(cloud.project, zone) lCall.Fields(fields...) + lCall.Filter(filter) nextPageToken := "pageToken" for nextPageToken != "" { diskList, err := lCall.Do() @@ -418,7 +435,7 @@ func ValidateDiskParameters(disk *CloudDisk, params common.DiskParameters) error return nil } -func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) error { +func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error { klog.V(5).Infof("Inserting disk %v", volKey) description, err := encodeTags(params.Tags) @@ -431,7 +448,7 @@ func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volK if description == "" { description = "Disk created by GCE-PD CSI Driver" } - return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, volumeContentSourceVolumeID, description, multiWriter) + return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, volumeContentSourceVolumeID, description, multiWriter, accessMode) case meta.Regional: if description == "" { description = "Regional disk created by GCE-PD CSI Driver" @@ -492,6 +509,7 @@ func convertV1DiskToBetaDisk(v1Disk *computev1.Disk) *computebeta.Disk { Status: v1Disk.Status, SelfLink: v1Disk.SelfLink, Params: params, + AccessMode: v1Disk.AccessMode, } // Hyperdisk doesn't currently support multiWriter (https://cloud.google.com/compute/docs/disks/hyperdisks#limitations), @@ -646,7 +664,8 @@ func (cloud *CloudProvider) insertZonalDisk( snapshotID string, volumeContentSourceVolumeID string, description string, - multiWriter bool) error { + multiWriter bool, + accessMode string) error { var ( err error opName string @@ -714,6 +733,7 @@ func (cloud *CloudProvider) insertZonalDisk( ResourceManagerTags: resourceTags, } } + diskToCreate.AccessMode = accessMode if gceAPIVersion == GCEAPIVersionBeta { var insertOp *computebeta.Operation @@ -875,6 +895,73 @@ func (cloud *CloudProvider) DetachDisk(ctx context.Context, project, deviceName, return nil } +func (cloud *CloudProvider) SetDiskAccessMode(ctx context.Context, project string, volKey *meta.Key, accessMode string) error { + diskMask := &computev1.Disk{ + AccessMode: accessMode, + Name: volKey.Name, + } + switch volKey.Type() { + case meta.Zonal: + op, err := cloud.service.Disks.Update(project, volKey.Zone, volKey.Name, diskMask).Context(ctx).Paths("accessMode").Do() + if err != nil { + return fmt.Errorf("failed to set access mode for zonal volume %v: %w", volKey, err) + } + klog.V(5).Infof("SetDiskAccessMode operation %s for disk %s", op.Name, volKey.Name) + + err = cloud.waitForZonalOp(ctx, project, op.Name, volKey.Zone) + if err != nil { + return fmt.Errorf("failed waiting for op for zonal disk update for %v: %w", volKey, err) + } + case meta.Regional: + op, err := cloud.service.RegionDisks.Update(project, volKey.Region, volKey.Name, 
diskMask).Context(ctx).Paths("accessMode").Do() + if err != nil { + return fmt.Errorf("failed to set access mode for regional volume %v: %w", volKey, err) + } + klog.V(5).Infof("SetDiskAccessMode operation %s for disk %s", op.Name, volKey.Name) + + err = cloud.waitForRegionalOp(ctx, project, op.Name, volKey.Region) + if err != nil { + return fmt.Errorf("failed waiting for op for regional disk update for %v: %w", volKey, err) + } + default: + return fmt.Errorf("volume key %v not zonal nor regional", volKey.Name) + } + + return nil +} + +func (cloud *CloudProvider) ListCompatibleDiskTypeZones(ctx context.Context, project string, zones []string, diskType string) ([]string, error) { + diskTypeFilter := fmt.Sprintf("name=%s", diskType) + filters := []string{diskTypeFilter} + diskTypeListCall := cloud.service.DiskTypes.AggregatedList(project).Context(ctx).Filter(strings.Join(filters, " ")) + + supportedZones := []string{} + nextPageToken := "pageToken" + for nextPageToken != "" { + diskTypeList, err := diskTypeListCall.Do() + if err != nil { + return nil, err + } + for _, item := range diskTypeList.Items { + for _, diskType := range item.DiskTypes { + zone, err := common.ParseZoneFromURI(diskType.Zone) + if err != nil { + klog.Warningf("Failed to parse zone %q from diskTypes API: %v", diskType.Zone, err) + continue + } + if slices.Contains(zones, zone) { + supportedZones = append(supportedZones, zone) + } + } + } + + nextPageToken = diskTypeList.NextPageToken + diskTypeListCall.PageToken(nextPageToken) + } + + return supportedZones, nil +} + func (cloud *CloudProvider) GetDiskSourceURI(project string, volKey *meta.Key) string { switch volKey.Type() { case meta.Zonal: diff --git a/pkg/gce-cloud-provider/compute/gce.go b/pkg/gce-cloud-provider/compute/gce.go index c2d2df8cd..f25519cf4 100644 --- a/pkg/gce-cloud-provider/compute/gce.go +++ b/pkg/gce-cloud-provider/compute/gce.go @@ -42,7 +42,6 @@ import ( ) type Environment string -type Version string const ( TokenURL = "https://accounts.google.com/o/oauth2/token" @@ -54,8 +53,6 @@ const ( regionURITemplate = "projects/%s/regions/%s" replicaZoneURITemplateSingleZone = "projects/%s/zones/%s" // {gce.projectID}/zones/{disk.Zone} - versionV1 Version = "v1" - versionBeta Version = "beta" EnvironmentStaging Environment = "staging" EnvironmentProduction Environment = "production" @@ -229,7 +226,7 @@ func readConfig(configPath string) (*ConfigFile, error) { } func createBetaCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint *url.URL, computeEnvironment Environment) (*computebeta.Service, error) { - computeOpts, err := getComputeVersion(ctx, tokenSource, computeEndpoint, computeEnvironment, versionBeta) + computeOpts, err := getComputeVersion(ctx, tokenSource, computeEndpoint, computeEnvironment, GCEAPIVersionBeta) if err != nil { klog.Errorf("Failed to get compute endpoint: %s", err) } @@ -242,7 +239,7 @@ func createBetaCloudService(ctx context.Context, vendorVersion string, tokenSour } func createCloudService(ctx context.Context, vendorVersion string, tokenSource oauth2.TokenSource, computeEndpoint *url.URL, computeEnvironment Environment) (*compute.Service, error) { - computeOpts, err := getComputeVersion(ctx, tokenSource, computeEndpoint, computeEnvironment, versionV1) + computeOpts, err := getComputeVersion(ctx, tokenSource, computeEndpoint, computeEnvironment, GCEAPIVersionV1) if err != nil { klog.Errorf("Failed to get compute endpoint: %s", err) } @@ -254,7 +251,7 @@ func createCloudService(ctx 
context.Context, vendorVersion string, tokenSource o return service, nil } -func getComputeVersion(ctx context.Context, tokenSource oauth2.TokenSource, computeEndpoint *url.URL, computeEnvironment Environment, computeVersion Version) ([]option.ClientOption, error) { +func getComputeVersion(ctx context.Context, tokenSource oauth2.TokenSource, computeEndpoint *url.URL, computeEnvironment Environment, computeVersion GCEAPIVersion) ([]option.ClientOption, error) { client, err := newOauthClient(ctx, tokenSource) if err != nil { return nil, err @@ -270,7 +267,7 @@ func getComputeVersion(ctx context.Context, tokenSource oauth2.TokenSource, comp return computeOpts, nil } -func constructComputeEndpointPath(env Environment, version Version) string { +func constructComputeEndpointPath(env Environment, version GCEAPIVersion) string { prefix := "" if env == EnvironmentStaging { prefix = fmt.Sprintf("%s_", env) diff --git a/pkg/gce-cloud-provider/compute/gce_test.go b/pkg/gce-cloud-provider/compute/gce_test.go index 49f85221d..94bb4fd9f 100644 --- a/pkg/gce-cloud-provider/compute/gce_test.go +++ b/pkg/gce-cloud-provider/compute/gce_test.go @@ -106,7 +106,7 @@ func TestGetComputeVersion(t *testing.T) { name string computeEndpoint *url.URL computeEnvironment Environment - computeVersion Version + computeVersion GCEAPIVersion expectedEndpoint string expectError bool }{ @@ -115,7 +115,7 @@ func TestGetComputeVersion(t *testing.T) { name: "check for production environment", computeEndpoint: convertStringToURL("https://compute.googleapis.com"), computeEnvironment: EnvironmentProduction, - computeVersion: versionBeta, + computeVersion: GCEAPIVersionBeta, expectedEndpoint: "https://compute.googleapis.com/compute/beta/", expectError: false, }, @@ -123,7 +123,7 @@ func TestGetComputeVersion(t *testing.T) { name: "check for staging environment", computeEndpoint: convertStringToURL("https://compute.googleapis.com"), computeEnvironment: EnvironmentStaging, - computeVersion: versionV1, + computeVersion: GCEAPIVersionV1, expectedEndpoint: "https://compute.googleapis.com/compute/staging_v1/", expectError: false, }, diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 81c0426a3..a2d57000d 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -195,6 +195,8 @@ const ( resourceProject = "projects" listDisksUsersField = googleapi.Field("items/users") + + readOnlyManyAccessMode = "READ_ONLY_MANY" ) var ( @@ -214,7 +216,8 @@ var ( "items/selfLink", "nextPageToken", } - listDisksFieldsWithUsers = append(listDisksFieldsWithoutUsers, "items/users") + listDisksFieldsWithUsers = append(listDisksFieldsWithoutUsers, "items/users") + disksWithModifiableAccessMode = []string{"hyperdisk-ml"} ) func isDiskReady(disk *gce.CloudDisk) (bool, error) { @@ -279,6 +282,15 @@ func useVolumeCloning(req *csi.CreateVolumeRequest) bool { } func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { + response, err := gceCS.createVolumeInternal(ctx, req) + if err == nil && req != nil { + klog.V(4).Infof("CreateVolume succeeded for volume %v", req.Name) + } + + return response, err +} + +func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { var err error diskTypeForMetric := metrics.DefaultDiskTypeForMetric enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute @@ -288,17 +300,16 @@ func (gceCS
*GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre }() // Validate arguments volumeCapabilities := req.GetVolumeCapabilities() - name := req.GetName() capacityRange := req.GetCapacityRange() - if len(name) == 0 { + if len(req.GetName()) == 0 { return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided") } if volumeCapabilities == nil || len(volumeCapabilities) == 0 { return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided") } - capBytes, err := getRequestCapacity(capacityRange) - if err != nil { + // Validate request capacity early + if _, err := getRequestCapacity(capacityRange); err != nil { return nil, status.Errorf(codes.InvalidArgument, "CreateVolume Request Capacity is invalid: %v", err.Error()) } @@ -309,7 +320,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // Apply Parameters (case-insensitive). We leave validation of // the values to the cloud provider. - params, err := common.ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.name, gceCS.Driver.extraVolumeLabels, gceCS.enableStoragePools, gceCS.Driver.extraTags) + params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags) diskTypeForMetric = params.DiskType enableConfidentialCompute = strconv.FormatBool(params.EnableConfidentialCompute) hasStoragePools := len(params.StoragePools) > 0 @@ -317,11 +328,9 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) } - // Determine multiWriter - gceAPIVersion := gce.GCEAPIVersionV1 - multiWriter, _ := getMultiWriterFromCapabilities(volumeCapabilities) - if multiWriter { - gceAPIVersion = gce.GCEAPIVersionBeta + // Validate multiwriter + if _, err := getMultiWriterFromCapabilities(volumeCapabilities); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "VolumeCapabilities is invalid: %v", err.Error()) } err = validateStoragePools(req, params, gceCS.CloudProvider.GetDefaultProject()) @@ -331,11 +340,176 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre return nil, err } + // Validate VolumeContentSource is set when access mode is read only + readonly, _ := getReadOnlyFromCapabilities(volumeCapabilities) + if readonly && req.GetVolumeContentSource() == nil { + return nil, status.Error(codes.InvalidArgument, "VolumeContentSource must be provided when AccessMode is set to read only") + } + + // Validate multi-zone provisioning configuration + err = gceCS.validateMultiZoneProvisioning(req, params) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "CreateVolume failed to validate multi-zone provisioning request: %v", err) + } + // Verify that the regional availability class is only used on regional disks. if params.ForceAttach && params.ReplicationType != replicationTypeRegionalPD { return nil, status.Errorf(codes.InvalidArgument, "invalid availabilty class for zonal disk") } + if gceCS.multiZoneVolumeHandleConfig.Enable && params.MultiZoneProvisioning { + // Create multi-zone disk, that may have up to N disks. 
+ return gceCS.createMultiZoneDisk(ctx, req, params) + } + + // Create single device zonal or regional disk + return gceCS.createSingleDeviceDisk(ctx, req, params) +} + +func (gceCS *GCEControllerServer) getSupportedZonesForPDType(ctx context.Context, zones []string, diskType string) ([]string, error) { + project := gceCS.CloudProvider.GetDefaultProject() + zones, err := gceCS.CloudProvider.ListCompatibleDiskTypeZones(ctx, project, zones, diskType) + if err != nil { + return nil, err + } + return zones, nil +} + +func (gceCS *GCEControllerServer) getMultiZoneProvisioningZones(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters) ([]string, error) { + top := req.GetAccessibilityRequirements() + if top == nil { + return nil, status.Errorf(codes.InvalidArgument, "no topology specified") + } + prefZones, err := getZonesFromTopology(top.GetPreferred()) + if err != nil { + return nil, fmt.Errorf("could not get zones from preferred topology: %w", err) + } + reqZones, err := getZonesFromTopology(top.GetRequisite()) + if err != nil { + return nil, fmt.Errorf("could not get zones from requisite topology: %w", err) + } + prefSet := sets.NewString(prefZones...) + reqSet := sets.NewString(reqZones...) + prefAndReqSet := prefSet.Union(reqSet) + availableZones := prefAndReqSet.List() + if prefAndReqSet.Len() == 0 { + // If there are no specified zones, this means that there were no aggregate + // zones (eg: no nodes running) in the cluster + availableZones = gceCS.fallbackRequisiteZones + } + + supportedZones, err := gceCS.getSupportedZonesForPDType(ctx, availableZones, params.DiskType) + if err != nil { + return nil, fmt.Errorf("could not get supported zones for disk type %v from zone list %v: %w", params.DiskType, prefAndReqSet.List(), err) + } + + // It's possible that the provided requisite zones shifted since the last time that + // CreateVolume was called (eg: due to a node being removed in a zone) + // Ensure that we combine the supportedZones with any existing zones to get the full set. + existingZones, err := gceCS.getZonesWithDiskNameAndType(ctx, req.Name, params.DiskType) + if err != nil { + return nil, common.LoggedError(fmt.Sprintf("failed to check existing list of zones for request: %v", req.Name), err) + } + + supportedSet := sets.NewString(supportedZones...) + existingSet := sets.NewString(existingZones...) + combinedZones := existingSet.Union(supportedSet) + + return combinedZones.List(), nil +} + +func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters) (*csi.CreateVolumeResponse, error) { + // Determine the zones that are needed. 
+ var err error + + // For multi-zone, we either select: + // 1) The zones specified in requisite topology requirements + // 2) All zones in the region that are compatible with the selected disk type + zones, err := gceCS.getMultiZoneProvisioningZones(ctx, req, params) + if err != nil { + return nil, err + } + + multiZoneVolKey := meta.ZonalKey(req.GetName(), common.MultiZoneValue) + volumeID, err := common.KeyToVolumeID(multiZoneVolKey, gceCS.CloudProvider.GetDefaultProject()) + if err != nil { + return nil, err + } + if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { + return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer gceCS.volumeLocks.Release(volumeID) + + createDiskErrs := []error{} + createdDisks := make([]*gce.CloudDisk, 0, len(zones)) + for _, zone := range zones { + volKey := meta.ZonalKey(req.GetName(), zone) + klog.V(4).Infof("Creating single zone disk for zone %q and volume: %v", zone, volKey) + disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, []string{zone}) + if err != nil { + createDiskErrs = append(createDiskErrs, err) + continue + } + + createdDisks = append(createdDisks, disk) + } + if len(createDiskErrs) > 0 { + return nil, common.LoggedError("Failed to create multi-zone disk: ", errors.Join(createDiskErrs...)) + } + + if len(createdDisks) == 0 { + return nil, status.Errorf(codes.Internal, "could not create any disks for request: %v", req) + } + + // Use the first response as a template + volumeId := fmt.Sprintf("projects/%s/zones/%s/disks/%s", gceCS.CloudProvider.GetDefaultProject(), common.MultiZoneValue, req.GetName()) + klog.V(4).Infof("CreateVolume succeeded for multi-zone disks in zones %s: %v", zones, multiZoneVolKey) + return generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, volumeId), nil +} + +func (gceCS *GCEControllerServer) getZonesWithDiskNameAndType(ctx context.Context, name string, diskType string) ([]string, error) { + zoneOnlyFields := []googleapi.Field{"items/zone", "items/type"} + nameAndRegionFilter := fmt.Sprintf("name=%s", name) + disksWithZone, _, err := gceCS.CloudProvider.ListDisksWithFilter(ctx, zoneOnlyFields, nameAndRegionFilter) + if err != nil { + return nil, fmt.Errorf("failed to check existing zones for disk name %v: %w", name, err) + } + zones := []string{} + for _, disk := range disksWithZone { + if !strings.Contains(disk.Type, diskType) || disk.Zone == "" { + continue + } + diskZone, err := common.ParseZoneFromURI(disk.Zone) + if err != nil { + klog.Warningf("Malformed zone URI %v for disk %v from ListDisks call. Skipping", disk.Zone, name) + continue + } + zones = append(zones, diskZone) + } + return zones, nil +} + +func (gceCS *GCEControllerServer) updateAccessModeIfNecessary(ctx context.Context, volKey *meta.Key, disk *gce.CloudDisk, readonly bool) error { + if !slices.Contains(disksWithModifiableAccessMode, disk.GetPDType()) { + // If this isn't a disk that has access mode (eg: Hyperdisk ML), return + // So far, HyperdiskML is the only disk type that allows the disk type to be modified. 
+ return nil + } + if !readonly { + // Only update the access mode if we're converting from ReadWrite to ReadOnly + return nil + } + project := gceCS.CloudProvider.GetDefaultProject() + if disk.GetAccessMode() == readOnlyManyAccessMode { + // If the access mode is already readonly, return + return nil + } + + return gceCS.CloudProvider.SetDiskAccessMode(ctx, project, volKey, readOnlyManyAccessMode) +} + +func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters) (*csi.CreateVolumeResponse, error) { + var err error var locationTopReq *locationRequirements if useVolumeCloning(req) { locationTopReq, err = cloningLocationRequirements(req, params.ReplicationType) @@ -356,7 +530,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre if len(zones) != 1 { return nil, status.Errorf(codes.Internal, "Failed to pick exactly 1 zone for zonal disk, got %v instead", len(zones)) } - volKey = meta.ZonalKey(name, zones[0]) + volKey = meta.ZonalKey(req.GetName(), zones[0]) case replicationTypeRegionalPD: zones, err = gceCS.pickZones(ctx, req.GetAccessibilityRequirements(), 2, locationTopReq) @@ -367,7 +541,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre if err != nil { return nil, status.Errorf(codes.InvalidArgument, "CreateVolume failed to get region from zones: %v", err.Error()) } - volKey = meta.RegionalKey(name, region) + volKey = meta.RegionalKey(req.GetName(), region) default: return nil, status.Errorf(codes.InvalidArgument, "CreateVolume replication type '%s' is not supported", params.ReplicationType) } @@ -380,9 +554,27 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) } defer gceCS.volumeLocks.Release(volumeID) + disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, zones) + + if err != nil { + return nil, common.LoggedError("CreateVolume failed: %v", err) + } + + return generateCreateVolumeResponseWithVolumeId(disk, zones, params, volumeID), err +} + +func (gceCS *GCEControllerServer) createSingleDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, volKey *meta.Key, zones []string) (*gce.CloudDisk, error) { + capacityRange := req.GetCapacityRange() + capBytes, _ := getRequestCapacity(capacityRange) + multiWriter, _ := getMultiWriterFromCapabilities(req.GetVolumeCapabilities()) + readonly, _ := getReadOnlyFromCapabilities(req.GetVolumeCapabilities()) + accessMode := "" + if readonly && slices.Contains(disksWithModifiableAccessMode, params.DiskType) { + accessMode = readOnlyManyAccessMode + } // Validate if disk already exists - existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, gceAPIVersion) + existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, gceCS.CloudProvider.GetDefaultProject(), volKey, getGCEApiVersion(multiWriter)) if err != nil { if !gce.IsGCEError(err, "notFound") { // failed to GetDisk, however the Disk may already be created, the error code should be non-Final @@ -409,7 +601,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre // If there is no validation error, immediately return success klog.V(4).Infof("CreateVolume succeeded for disk %v, it already exists and was compatible", volKey) - return generateCreateVolumeResponse(existingDisk, zones, params) + return existingDisk, nil } snapshotID 
:= "" @@ -437,7 +629,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } // Verify that the volume in VolumeContentSource exists. - diskFromSourceVolume, err := gceCS.CloudProvider.GetDisk(ctx, project, sourceVolKey, gceAPIVersion) + diskFromSourceVolume, err := gceCS.CloudProvider.GetDisk(ctx, project, sourceVolKey, getGCEApiVersion(multiWriter)) if err != nil { if gce.IsGCEError(err, "notFound") { return nil, status.Errorf(codes.NotFound, "CreateVolume source volume %s does not exist", volumeContentSourceVolumeID) @@ -485,20 +677,18 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre return nil, status.Errorf(codes.Aborted, "CreateVolume disk from source volume %v is not ready", sourceVolKey) } } - } else { // if VolumeContentSource is nil, validate access mode is not read only - if readonly, _ := getReadOnlyFromCapabilities(volumeCapabilities); readonly { - return nil, status.Error(codes.InvalidArgument, "VolumeContentSource must be provided when AccessMode is set to read only") - } } // Create the disk var disk *gce.CloudDisk + name := req.GetName() + switch params.ReplicationType { case replicationTypeNone: if len(zones) != 1 { return nil, status.Errorf(codes.Internal, "CreateVolume failed to get a single zone for creating zonal disk, instead got: %v", zones) } - disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) + disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode) if err != nil { return nil, common.LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err) } @@ -506,7 +696,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre if len(zones) != 2 { return nil, status.Errorf(codes.Internal, "CreateVolume failed to get a 2 zones for creating regional disk, instead got: %v", zones) } - disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter) + disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode) if err != nil { return nil, common.LoggedError("CreateVolume failed to create regional disk "+name+": ", err) } @@ -523,18 +713,11 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre } klog.V(4).Infof("CreateVolume succeeded for disk %v", volKey) - return generateCreateVolumeResponse(disk, zones, params) - + return disk, nil } func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() // Validate arguments volumeID := req.GetVolumeId() if len(volumeID) == 0 { @@ -549,6 +732,78 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del return &csi.DeleteVolumeResponse{}, nil } + volumeIsMultiZone := isMultiZoneVolKey(volKey) + if 
gceCS.multiZoneVolumeHandleConfig.Enable && volumeIsMultiZone { + // Delete multi-zone disk, that may have up to N disks. + return gceCS.deleteMultiZoneDisk(ctx, req, project, volKey) + } + + // Delete zonal or regional disk + return gceCS.deleteSingleDeviceDisk(ctx, req, project, volKey) +} + +func getGCEApiVersion(multiWriter bool) gce.GCEAPIVersion { + if multiWriter { + return gce.GCEAPIVersionBeta + } + + return gce.GCEAPIVersionV1 +} + +func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) { + // List disks with same name + var err error + diskTypeForMetric := metrics.DefaultDiskTypeForMetric + enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute + enableStoragePools := metrics.DefaultEnableStoragePools + defer func() { + gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) + }() + existingZones := []string{gceCS.CloudProvider.GetDefaultZone()} + zones, err := getDefaultZonesInRegion(ctx, gceCS, existingZones) + if err != nil { + return nil, fmt.Errorf("failed to list default zones: %w", err) + } + + volumeID := req.GetVolumeId() + if acquired := gceCS.volumeLocks.TryAcquire(volumeID); !acquired { + return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer gceCS.volumeLocks.Release(volumeID) + + deleteDiskErrs := []error{} + for _, zone := range zones { + zonalVolKey := &meta.Key{ + Name: volKey.Name, + Region: volKey.Region, + Zone: zone, + } + disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, zonalVolKey, gce.GCEAPIVersionV1) + // TODO: Consolidate the parameters here, rather than taking the last. 
+ diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk) + err := gceCS.CloudProvider.DeleteDisk(ctx, project, zonalVolKey) + if err != nil { + deleteDiskErrs = append(deleteDiskErrs, err) + } + } + + if len(deleteDiskErrs) > 0 { + return nil, common.LoggedError("Failed to delete multi-zone disk: ", errors.Join(deleteDiskErrs...)) + } + + klog.V(4).Infof("DeleteVolume succeeded for disk %v", volKey) + return &csi.DeleteVolumeResponse{}, nil +} + +func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) { + var err error + diskTypeForMetric := metrics.DefaultDiskTypeForMetric + enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute + enableStoragePools := metrics.DefaultEnableStoragePools + defer func() { + gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) + }() + volumeID := req.GetVolumeId() project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { if gce.IsGCENotFoundError(err) { @@ -661,6 +916,33 @@ func (gceCS *GCEControllerServer) validateMultiZoneDisk(volumeID string, disk *g return nil } +func (gceCS *GCEControllerServer) validateMultiZoneProvisioning(req *csi.CreateVolumeRequest, params common.DiskParameters) error { + if !gceCS.multiZoneVolumeHandleConfig.Enable { + return nil + } + if !params.MultiZoneProvisioning { + return nil + } + + // For volume populator, we want to allow multiple RWO disks to be created + // with the same name, so they can be hydrated across multiple zones. + + // We don't support volume cloning from an existing PVC + if useVolumeCloning(req) { + return fmt.Errorf("%q parameter does not support volume cloning", common.ParameterKeyEnableMultiZoneProvisioning) + } + + if readonly, _ := getReadOnlyFromCapabilities(req.GetVolumeCapabilities()); !readonly && req.GetVolumeContentSource() != nil { + return fmt.Errorf("%q parameter does not support specifying volume content source in readwrite mode", common.ParameterKeyEnableMultiZoneProvisioning) + } + + if !slices.Contains(gceCS.multiZoneVolumeHandleConfig.DiskTypes, params.DiskType) { + return fmt.Errorf("%q parameter with unsupported disk type: %v", common.ParameterKeyEnableMultiZoneProvisioning, params.DiskType) + } + + return nil +} + func isMultiZoneVolKey(volumeKey *meta.Key) bool { return volumeKey.Type() == meta.Zonal && volumeKey.Zone == common.MultiZoneValue } @@ -750,6 +1032,9 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con klog.V(4).Infof("ControllerPublishVolume succeeded for disk %v to instance %v, already attached.", volKey, nodeID) return pubVolResp, nil, disk } + if err := gceCS.updateAccessModeIfNecessary(ctx, volKey, disk, readOnly); err != nil { + return nil, common.LoggedError("Failed to update access mode: ", err), disk + } instanceZone, instanceName, err = common.NodeIDToZoneAndName(nodeID) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), disk @@ -891,6 +1176,14 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C return &csi.ControllerUnpublishVolumeResponse{}, nil, diskToUnpublish } +func (gceCS *GCEControllerServer) parameterProcessor() *common.ParameterProcessor { + return &common.ParameterProcessor{
+ DriverName: gceCS.Driver.name, + EnableStoragePools: gceCS.enableStoragePools, + EnableMultiZone: gceCS.multiZoneVolumeHandleConfig.Enable, + } +} + func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { var err error diskTypeForMetric := metrics.DefaultDiskTypeForMetric @@ -939,7 +1232,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context } // Validate the disk parameters match the disk we GET - params, err := common.ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.name, gceCS.Driver.extraVolumeLabels, gceCS.enableStoragePools, gceCS.Driver.extraTags) + params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) } @@ -1908,7 +2201,7 @@ func (gceCS *GCEControllerServer) pickZones(ctx context.Context, top *csi.Topolo existingZones = []string{locationTopReq.srcVolZone} } // If topology is nil, then the Immediate binding mode was used without setting allowedTopologies in the storageclass. - zones, err = getDefaultZonesInRegion(ctx, gceCS, existingZones, numZones) + zones, err = getNumDefaultZonesInRegion(ctx, gceCS, existingZones, numZones) if err != nil { return nil, fmt.Errorf("failed to get default %v zones in region: %w", numZones, err) } @@ -1918,16 +2211,24 @@ func (gceCS *GCEControllerServer) pickZones(ctx context.Context, top *csi.Topolo return zones, nil } -func getDefaultZonesInRegion(ctx context.Context, gceCS *GCEControllerServer, existingZones []string, numZones int) ([]string, error) { +func getDefaultZonesInRegion(ctx context.Context, gceCS *GCEControllerServer, existingZones []string) ([]string, error) { region, err := common.GetRegionFromZones(existingZones) if err != nil { return nil, fmt.Errorf("failed to get region from zones: %w", err) } - needToGet := numZones - len(existingZones) totZones, err := gceCS.CloudProvider.ListZones(ctx, region) if err != nil { return nil, fmt.Errorf("failed to list zones from cloud provider: %w", err) } + return totZones, nil +} + +func getNumDefaultZonesInRegion(ctx context.Context, gceCS *GCEControllerServer, existingZones []string, numZones int) ([]string, error) { + needToGet := numZones - len(existingZones) + totZones, err := getDefaultZonesInRegion(ctx, gceCS, existingZones) + if err != nil { + return nil, err + } remainingZones := sets.NewString(totZones...).Difference(sets.NewString(existingZones...)) l := remainingZones.List() if len(l) < needToGet { @@ -1969,12 +2270,7 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) { return info, nil } -func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params common.DiskParameters) (*csi.CreateVolumeResponse, error) { - volumeId, err := getResourceId(disk.GetSelfLink()) - if err != nil { - return nil, fmt.Errorf("cannot get volume id from %s: %w", disk.GetSelfLink(), err) - } - +func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, volumeId string) *csi.CreateVolumeResponse { tops := []*csi.Topology{} for _, zone := range zones { tops = append(tops, &csi.Topology{ @@ -2024,7 +2320,7 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params co } createResp.Volume.ContentSource = contentSource } - return 
createResp, nil + return createResp } func getResourceId(resourceLink string) (string, error) { @@ -2056,7 +2352,7 @@ func getResourceId(resourceLink string) (string, error) { return strings.Join(elts[3:], "/"), nil } -func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) (*gce.CloudDisk, error) { +func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) (*gce.CloudDisk, error) { project := cloudProvider.GetDefaultProject() region, err := common.GetRegionFromZones(zones) if err != nil { @@ -2069,7 +2365,7 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name fullyQualifiedReplicaZones, cloudProvider.GetReplicaZoneURI(project, replicaZone)) } - err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter) + err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode) if err != nil { return nil, fmt.Errorf("failed to insert regional disk: %w", err) } @@ -2086,13 +2382,13 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name return disk, nil } -func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) (*gce.CloudDisk, error) { +func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) (*gce.CloudDisk, error) { project := cloudProvider.GetDefaultProject() if len(zones) != 1 { return nil, fmt.Errorf("got wrong number of zones for zonal create volume: %v", len(zones)) } diskZone := zones[0] - err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, volumeContentSourceVolumeID, multiWriter) + err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode) if err != nil { return nil, fmt.Errorf("failed to insert zonal disk: %w", err) } diff --git a/pkg/gce-pd-csi-driver/controller_test.go b/pkg/gce-pd-csi-driver/controller_test.go index bee2e1d22..e196a1d3c 100644 --- a/pkg/gce-pd-csi-driver/controller_test.go +++ b/pkg/gce-pd-csi-driver/controller_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math/rand" + "net/http" "reflect" "sort" "strconv" @@ -30,11 +31,13 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" compute "google.golang.org/api/compute/v1" + "google.golang.org/api/googleapi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" 
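+	// net/http and googleapi (added to the imports above) let the multi-zone tests below
+	// fabricate typed GCE API errors such as 429 Too Many Requests and 504 Gateway Timeout
+	// for the InsertDisk fake; klog (added below) is used to log the accessible topology
+	// returned by CreateVolume.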
"k8s.io/client-go/util/flowcontrol" + "k8s.io/klog/v2" "k8s.io/utils/strings/slices" csi "github.com/container-storage-interface/spec/lib/go/csi" @@ -1131,6 +1134,583 @@ func TestCreateVolumeArguments(t *testing.T) { } } +func TestMultiZoneVolumeCreation(t *testing.T) { + testCases := []struct { + name string + req *csi.CreateVolumeRequest + enableStoragePools bool + fallbackZones []string + expZones []string + expErrCode codes.Code + }{ + { + name: "success single ROX multi-zone disk", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + VolumeContentSource: &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: testSnapshotID, + }, + }, + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + }, + }, + expZones: []string{"us-central1-a"}, + }, + { + name: "single ROX multi-zone disk empty topology fallback zones", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + VolumeContentSource: &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: testSnapshotID, + }, + }, + }, + AccessibilityRequirements: &csi.TopologyRequirement{}, + }, + fallbackZones: []string{zone, secondZone}, + expZones: []string{zone, secondZone}, + }, + { + name: "success triple ROX multi-zone disk", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + VolumeContentSource: &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: testSnapshotID, + }, + }, + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-b"}, + }, + { + Segments: 
map[string]string{common.TopologyKeyZone: "us-central1-c"}, + }, + }, + }, + }, + expZones: []string{"us-central1-a", "us-central1-b", "us-central1-c"}, + }, + { + name: "success triple rwo multi-zone disk", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-b"}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-c"}, + }, + }, + }, + }, + expZones: []string{"us-central1-a", "us-central1-b", "us-central1-c"}, + }, + { + name: "err single ROX multi-zone no topology", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + VolumeContentSource: &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: testSnapshotID, + }, + }, + }, + }, + expErrCode: codes.InvalidArgument, + }, + { + name: "err rwo access mode", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + VolumeContentSource: &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: testSnapshotID, + }, + }, + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + }, + }, + expErrCode: codes.InvalidArgument, + }, + { + name: "err no content source", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + 
common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + }, + }, + expErrCode: codes.InvalidArgument, + }, + { + name: "err cloning not supported", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + VolumeContentSource: &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Volume{ + Volume: &csi.VolumeContentSource_VolumeSource{ + VolumeId: testVolumeID, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + }, + }, + expErrCode: codes.InvalidArgument, + }, + } + + // Run test cases + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + // Setup new driver each time so no interference + fcp, err := gce.CreateFakeCloudProvider(project, zone, nil) + if err != nil { + t.Fatalf("Failed to create fake cloud provider: %v", err) + } + // Setup new driver each time so no interference + gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver.cs.multiZoneVolumeHandleConfig.DiskTypes = []string{"hyperdisk-ml"} + gceDriver.cs.multiZoneVolumeHandleConfig.Enable = true + gceDriver.cs.fallbackRequisiteZones = tc.fallbackZones + + if tc.req.VolumeContentSource.GetType() != nil { + snapshotParams, err := common.ExtractAndDefaultSnapshotParameters(nil, gceDriver.name, nil) + if err != nil { + t.Errorf("Got error extracting snapshot parameters: %v", err) + } + if snapshotParams.SnapshotType == common.DiskSnapshotType { + fcp.CreateSnapshot(context.Background(), project, meta.ZonalKey(name, common.MultiZoneValue), name, snapshotParams) + } else { + t.Fatalf("No volume source mentioned in snapshot parameters %v", snapshotParams) + } + } + + // Start Test + resp, err := gceDriver.cs.CreateVolume(context.Background(), tc.req) + if err != nil { + serverError, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from err: %v", serverError) + } + if serverError.Code() != tc.expErrCode { + t.Fatalf("Expected error code: %v, got: %v. 
err : %v", tc.expErrCode, serverError.Code(), err) + } + continue + } + if tc.expErrCode != codes.OK { + t.Fatalf("Expected error: %v, got no error", tc.expErrCode) + } + + topologies := make([]*csi.Topology, 0, len(tc.expZones)) + for _, zone := range tc.expZones { + topologies = append(topologies, &csi.Topology{ + Segments: map[string]string{common.TopologyKeyZone: zone}, + }) + } + + expVol := &csi.Volume{ + CapacityBytes: common.GbToBytes(20), + VolumeId: fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", project, name), + VolumeContext: nil, + AccessibleTopology: topologies, + ContentSource: tc.req.VolumeContentSource, + } + + // Make sure responses match + vol := resp.GetVolume() + if vol == nil { + // If one is nil but not both + t.Fatalf("Expected volume %v, got nil volume", expVol) + } + + klog.Warningf("Got accessible topology: %v", vol.GetAccessibleTopology()) + + sortTopologies := func(t1, t2 *csi.Topology) bool { + return t1.Segments[common.TopologyKeyZone] < t2.Segments[common.TopologyKeyZone] + } + if diff := cmp.Diff(expVol, vol, cmpopts.SortSlices(sortTopologies)); diff != "" { + t.Errorf("Accessible topologies mismatch (-want +got):\n%s", diff) + } + + for _, zone := range tc.expZones { + volumeKey := meta.ZonalKey(name, zone) + disk, err := fcp.GetDisk(context.Background(), project, volumeKey, gce.GCEAPIVersionBeta) + if err != nil { + t.Fatalf("Get Disk failed for created disk with error: %v", err) + } + if disk.GetLabels()[common.MultiZoneLabel] != "true" { + t.Fatalf("Expect %s disk to have %s label, got: %v", volumeKey, common.MultiZoneLabel, disk.GetLabels()) + } + } + } +} + +type FakeCloudProviderInsertDiskErr struct { + *gce.FakeCloudProvider + insertDiskErrors map[string]error +} + +func NewFakeCloudProviderInsertDiskErr(project, zone string) (*FakeCloudProviderInsertDiskErr, error) { + provider, err := gce.CreateFakeCloudProvider(project, zone, nil) + if err != nil { + return nil, err + } + return &FakeCloudProviderInsertDiskErr{ + FakeCloudProvider: provider, + insertDiskErrors: map[string]error{}, + }, nil +} + +func (cloud *FakeCloudProviderInsertDiskErr) AddDiskForErr(volKey *meta.Key, err error) { + cloud.insertDiskErrors[volKey.String()] = err +} + +func (cloud *FakeCloudProviderInsertDiskErr) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error { + if err, ok := cloud.insertDiskErrors[volKey.String()]; ok { + return err + } + + return cloud.FakeCloudProvider.InsertDisk(ctx, project, volKey, params, capBytes, capacityRange, replicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode) +} + +func TestMultiZoneVolumeCreationErrHandling(t *testing.T) { + testCases := []struct { + name string + req *csi.CreateVolumeRequest + insertDiskErrs map[*meta.Key]error + expErrCode codes.Code + wantDisks []*meta.Key + }{ + { + name: "ResourceExhausted errors", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + 
}, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-b"}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-c"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + }, + }, + insertDiskErrs: map[*meta.Key]error{ + meta.ZonalKey(name, "us-central1-b"): &googleapi.Error{Code: http.StatusTooManyRequests, Message: "Resource Exhausted"}, + }, + expErrCode: codes.ResourceExhausted, + wantDisks: []*meta.Key{ + meta.ZonalKey(name, "us-central1-a"), + meta.ZonalKey(name, "us-central1-c"), + }, + }, + { + name: "Unavailable errors", + req: &csi.CreateVolumeRequest{ + Name: "test-name", + CapacityRange: stdCapRange, + VolumeCapabilities: []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + Parameters: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + common.ParameterKeyEnableMultiZoneProvisioning: "true", + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-b"}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-c"}, + }, + }, + Preferred: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: "us-central1-a"}, + }, + }, + }, + }, + insertDiskErrs: map[*meta.Key]error{ + meta.ZonalKey(name, "us-central1-b"): &googleapi.Error{Code: http.StatusGatewayTimeout, Message: "connection reset by peer"}, + meta.ZonalKey(name, "us-central1-c"): &googleapi.Error{Code: http.StatusTooManyRequests, Message: "Resource Exhausted"}, + }, + expErrCode: codes.Unavailable, + wantDisks: []*meta.Key{ + meta.ZonalKey(name, "us-central1-a"), + }, + }, + } + + // Run test cases + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + // Setup new driver each time so no interference + fcp, err := NewFakeCloudProviderInsertDiskErr(project, zone) + if err != nil { + t.Fatalf("Failed to create fake cloud provider: %v", err) + } + // Setup new driver each time so no interference + gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver.cs.multiZoneVolumeHandleConfig.DiskTypes = []string{"hyperdisk-ml"} + gceDriver.cs.multiZoneVolumeHandleConfig.Enable = true + + for volKey, err := range tc.insertDiskErrs { + fcp.AddDiskForErr(volKey, err) + } + + // Start Test + _, err = gceDriver.cs.CreateVolume(context.Background(), tc.req) + + if err == nil { + t.Errorf("Expected error: %v, got no error", tc.expErrCode) + } + + serverError, ok := status.FromError(err) + if !ok { + t.Errorf("Could not get error status code from err: %v", serverError) + } + if serverError.Code() != tc.expErrCode { + t.Errorf("Expected error code: %v, got: %v. 
err : %v", tc.expErrCode, serverError.Code(), err) + } + + for _, volKey := range tc.wantDisks { + disk, err := fcp.GetDisk(context.Background(), project, volKey, gce.GCEAPIVersionV1) + if err != nil { + t.Errorf("Unexpected err fetching disk %v: %v", volKey, err) + } + if disk == nil { + t.Errorf("Expected disk for %v but got nil", volKey) + } + } + } +} + func TestListVolumePagination(t *testing.T) { testCases := []struct { name string @@ -2463,6 +3043,14 @@ func createZonalCloudDisk(name string) *gce.CloudDisk { }) } +func createZonalCloudDiskWithZone(name, zone string) *gce.CloudDisk { + return gce.CloudDiskFromV1(&compute.Disk{ + Name: name, + SelfLink: fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/project/zones/zone/name/%s", name), + Zone: zone, + }) +} + func TestDeleteVolume(t *testing.T) { testCases := []struct { name string @@ -2527,6 +3115,63 @@ func TestDeleteVolume(t *testing.T) { } } +func TestMultiZoneDeleteVolume(t *testing.T) { + testCases := []struct { + name string + seedDisks []*gce.CloudDisk + req *csi.DeleteVolumeRequest + expErr bool + }{ + { + name: "single-zone", + seedDisks: []*gce.CloudDisk{ + createZonalCloudDiskWithZone(name, zone), + }, + req: &csi.DeleteVolumeRequest{ + VolumeId: fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, common.MultiZoneValue, name), + }, + }, + { + name: "multi-zone", + seedDisks: []*gce.CloudDisk{ + createZonalCloudDiskWithZone(name, zone), + createZonalCloudDiskWithZone(name, secondZone), + }, + req: &csi.DeleteVolumeRequest{ + VolumeId: fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, common.MultiZoneValue, name), + }, + }, + } + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + // Setup new driver each time so no interference + fcp, err := gce.CreateFakeCloudProvider(project, zone, tc.seedDisks) + if err != nil { + t.Fatalf("Failed to create fake cloud provider: %v", err) + } + // Setup new driver each time so no interference + gceDriver := initGCEDriverWithCloudProvider(t, fcp) + gceDriver.cs.multiZoneVolumeHandleConfig.DiskTypes = []string{"hyperdisk-ml"} + gceDriver.cs.multiZoneVolumeHandleConfig.Enable = true + _, err = gceDriver.cs.DeleteVolume(context.Background(), tc.req) + if err == nil && tc.expErr { + t.Errorf("Expected error but got none") + } + if err != nil && !tc.expErr { + t.Errorf("Did not expect error but got: %v", err) + } + + disks, _, _ := fcp.ListDisks(context.TODO(), []googleapi.Field{}) + if len(disks) > 0 { + t.Errorf("Expected all disks to be deleted. Got: %v", disks) + } + + if err != nil { + continue + } + } +} + func TestGetRequestCapacity(t *testing.T) { testCases := []struct { name string diff --git a/test/e2e/tests/multi_zone_e2e_test.go b/test/e2e/tests/multi_zone_e2e_test.go index 81ee01c08..fb1ea32dd 100644 --- a/test/e2e/tests/multi_zone_e2e_test.go +++ b/test/e2e/tests/multi_zone_e2e_test.go @@ -16,8 +16,11 @@ package tests import ( "fmt" + "os" "path/filepath" + "strconv" "strings" + "time" csi "github.com/container-storage-interface/spec/lib/go/csi" . 
"github.com/onsi/ginkgo/v2" @@ -25,6 +28,7 @@ import ( compute "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" @@ -40,6 +44,28 @@ type verifyFunc func(*verifyArgs) error type detacherFunc func() +func runMultiZoneTests() bool { + runMultiZoneTestsStr, ok := os.LookupEnv("RUN_MULTI_ZONE_TESTS") + if !ok { + return false + } + + runMultiZoneTests, err := strconv.ParseBool(runMultiZoneTestsStr) + if err != nil { + return false + } + + return runMultiZoneTests +} + +func checkSkipMultiZoneTests() { + // TODO: Remove this once hyperdisk-ml SKU is supported + // If you want to run these tests, set the env variable: RUN_MULTI_ZONE_TESTS=true + if !runMultiZoneTests() { + Skip("Not running multi-zone tests, as RUN_MULTI_ZONE_TESTS is falsy") + } +} + var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { BeforeEach(func() { Expect(len(testContexts)).To(BeNumerically(">", 1)) @@ -67,6 +93,8 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { }) It("Should attach ROX 'multi-zone' PV instances to two separate VMs", func() { + checkSkipMultiZoneTests() + // Create new driver and client Expect(testContexts).NotTo(BeEmpty()) @@ -96,8 +124,8 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { // Create Disk volName := testNamePrefix + string(uuid.NewUUID()) - _, volID0 := createAndValidateZonalDisk(controllerClient, p, zones[0], standardDiskType, volName) - _, volID1 := createAndValidateZonalDisk(controllerClient, p, zones[1], standardDiskType, volName) + _, volID0 := createAndValidateZonalDisk(controllerClient, p, zones[0], "hyperdisk-ml", volName) + _, volID1 := createAndValidateZonalDisk(controllerClient, p, zones[1], "hyperdisk-ml", volName) labelsMap := map[string]string{ common.MultiZoneLabel: "true", @@ -156,6 +184,650 @@ var _ = Describe("GCE PD CSI Driver Multi-Zone", func() { err = controllerClient.ControllerUnpublishVolume(volID, nodeID1) Expect(err).To(BeNil(), "Failed to detach vol2") + }) + + It("Should create RWO 'multi-zone' PV instances from a previously created disk", func() { + checkSkipMultiZoneTests() + + // Create new driver and client + + Expect(testContexts).NotTo(BeEmpty()) + + zoneToContext := map[string]*remote.TestContext{} + zones := []string{} + + for _, tc := range testContexts { + _, z, _ := tc.Instance.GetIdentity() + // Zone hasn't been seen before + if _, ok := zoneToContext[z]; !ok { + zoneToContext[z] = tc + zones = append(zones, z) + } + if len(zoneToContext) == 2 { + break + } + } + + Expect(len(zoneToContext)).To(Equal(2), "Must have instances in 2 zones") + + controllerContext := zoneToContext[zones[0]] + controllerClient := controllerContext.Client + controllerInstance := controllerContext.Instance + + p, _, _ := controllerInstance.GetIdentity() + + // Create Disk + volName := testNamePrefix + string(uuid.NewUUID()) + _, volID0 := createAndValidateZonalDisk(controllerClient, p, zones[0], "hyperdisk-ml", volName) + + labelsMap := map[string]string{ + common.MultiZoneLabel: "true", + } + disk1, err := computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Could not get disk") + disk1Op, err := computeService.Disks.SetLabels(p, zones[0], volName, &compute.ZoneSetLabelsRequest{ + LabelFingerprint: disk1.LabelFingerprint, + Labels: labelsMap, + }).Do() 
+ Expect(err).To(BeNil(), "Could not set disk labels") + _, err = computeService.ZoneOperations.Wait(p, zones[0], disk1Op.Name).Do() + Expect(err).To(BeNil(), "Could not set disk labels") + + defer deleteDisk(controllerClient, p, zones[0], volID0, volName) + + // Create multi-zone Disk + resp, err := controllerClient.CreateVolumeWithCaps(volName, map[string]string{ + common.ParameterKeyEnableMultiZoneProvisioning: "true", + common.ParameterKeyType: "hyperdisk-ml", + }, defaultHdmlSizeGb, + &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: zones[1]}, + }, + }, + }, + []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + nil) + Expect(err).To(BeNil(), "Error creating multi-zone volume") + topology := resp.GetAccessibleTopology() + Expect(len(topology)).To(Equal(2)) + gotZones := []string{topology[0].Segments[common.TopologyKeyZone], topology[1].Segments[common.TopologyKeyZone]} + Expect(gotZones).To(ConsistOf(zones[0], zones[1])) + + volID := fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", p, volName) + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(volID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. Err: %v", err) + _, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. Err: %v", err) + }() + + disk1, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err := computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + // Validate disks are RWO + Expect(disk1.AccessMode).To(Equal("READ_WRITE_SINGLE")) + Expect(disk2.AccessMode).To(Equal("READ_WRITE_SINGLE")) + }) + + It("Should create ROX 'multi-zone' PV from existing snapshot", func() { + checkSkipMultiZoneTests() + Expect(testContexts).NotTo(BeEmpty()) + + zoneToContext := map[string]*remote.TestContext{} + zones := []string{} + + for _, tc := range testContexts { + _, z, _ := tc.Instance.GetIdentity() + // Zone hasn't been seen before + if _, ok := zoneToContext[z]; !ok { + zoneToContext[z] = tc + zones = append(zones, z) + } + if len(zoneToContext) == 2 { + break + } + } + + Expect(len(zoneToContext)).To(Equal(2), "Must have instances in 2 zones") + + controllerContext := zoneToContext[zones[0]] + controllerClient := controllerContext.Client + controllerInstance := controllerContext.Instance + + p, _, _ := controllerInstance.GetIdentity() + + tc0 := zoneToContext[zones[0]] + tc1 := zoneToContext[zones[1]] + + snapshotVolName, snapshotVolID := createAndValidateUniqueZonalDisk(controllerClient, p, zones[0], standardDiskType) + + underSpecifiedID := common.GenerateUnderspecifiedVolumeID(snapshotVolName, true /* isZonal */) + + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(underSpecifiedID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], snapshotVolName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to 
not be found") + }() + + // Attach Disk + err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */) + Expect(err).To(BeNil(), "Failed to go through volume lifecycle") + + // Create Snapshot + snapshotName := testNamePrefix + string(uuid.NewUUID()) + snapshotID, err := controllerClient.CreateSnapshot(snapshotName, snapshotVolID, nil) + Expect(err).To(BeNil(), "CreateSnapshot failed with error: %v", err) + + // Validate Snapshot Created + snapshot, err := computeService.Snapshots.Get(p, snapshotName).Do() + Expect(err).To(BeNil(), "Could not get snapshot from cloud directly") + Expect(snapshot.Name).To(Equal(snapshotName)) + + err = wait.Poll(10*time.Second, 3*time.Minute, func() (bool, error) { + snapshot, err := computeService.Snapshots.Get(p, snapshotName).Do() + Expect(err).To(BeNil(), "Could not get snapshot from cloud directly") + if snapshot.Status == "READY" { + return true, nil + } + return false, nil + }) + Expect(err).To(BeNil(), "Could not wait for snapshot be ready") + + // Create multi-zone Disk + volName := testNamePrefix + string(uuid.NewUUID()) + _, err = controllerClient.CreateVolumeWithCaps(volName, map[string]string{ + common.ParameterKeyEnableMultiZoneProvisioning: "true", + common.ParameterKeyType: "hyperdisk-ml", + }, defaultHdmlSizeGb, + &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: zones[0]}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: zones[1]}, + }, + }, + }, + []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: snapshotID, + }, + }, + }) + Expect(err).To(BeNil(), "CreateVolume failed with error: %v", err) + + volID := fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", p, volName) + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(volID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. Err: %v", err) + _, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. 
Err: %v", err) + }() + + disk1, err := computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err := computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + // Validate disks are ROX + Expect(disk1.AccessMode).To(Equal("READ_ONLY_MANY")) + Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) + + // Attach Disk to node1 and validate contents + err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") + + // Attach Disk to node1 and validate contents + err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") + + disk1, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + // Validate disks have multi-zone labels + Expect(disk1.Labels[common.MultiZoneLabel]).To(Equal("true")) + Expect(disk2.Labels[common.MultiZoneLabel]).To(Equal("true")) + + // Validate disks are ROX + Expect(disk1.AccessMode).To(Equal("READ_ONLY_MANY")) + Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) + }) + + It("Should create ROX 'multi-zone' PV from existing snapshot with no topology", func() { + checkSkipMultiZoneTests() + Expect(testContexts).NotTo(BeEmpty()) + + zoneToContext := map[string]*remote.TestContext{} + zones := []string{} + + for _, tc := range testContexts { + _, z, _ := tc.Instance.GetIdentity() + // Zone hasn't been seen before + if _, ok := zoneToContext[z]; !ok { + zoneToContext[z] = tc + zones = append(zones, z) + } + if len(zoneToContext) == 2 { + break + } + } + + Expect(len(zoneToContext)).To(Equal(2), "Must have instances in 2 zones") + + controllerContext := zoneToContext[zones[0]] + controllerClient := controllerContext.Client + controllerInstance := controllerContext.Instance + + p, _, _ := controllerInstance.GetIdentity() + + tc0 := zoneToContext[zones[0]] + tc1 := zoneToContext[zones[1]] + + snapshotVolName, snapshotVolID := createAndValidateUniqueZonalDisk(controllerClient, p, zones[0], standardDiskType) + + underSpecifiedID := common.GenerateUnderspecifiedVolumeID(snapshotVolName, true /* isZonal */) + + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(underSpecifiedID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], snapshotVolName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found") + }() + + // Attach Disk + err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */) + Expect(err).To(BeNil(), "Failed to go through volume lifecycle") + + // Create Snapshot + snapshotName := testNamePrefix + string(uuid.NewUUID()) + snapshotID, err := controllerClient.CreateSnapshot(snapshotName, snapshotVolID, nil) + Expect(err).To(BeNil(), "CreateSnapshot failed with error: %v", err) + + // Validate Snapshot Created + snapshot, err := computeService.Snapshots.Get(p, snapshotName).Do() + Expect(err).To(BeNil(), "Could not get snapshot from cloud directly") + Expect(snapshot.Name).To(Equal(snapshotName)) + + err = 
wait.Poll(10*time.Second, 3*time.Minute, func() (bool, error) { + snapshot, err := computeService.Snapshots.Get(p, snapshotName).Do() + Expect(err).To(BeNil(), "Could not get snapshot from cloud directly") + if snapshot.Status == "READY" { + return true, nil + } + return false, nil + }) + Expect(err).To(BeNil(), "Could not wait for snapshot be ready") + + // Create multi-zone Disk + volName := testNamePrefix + string(uuid.NewUUID()) + _, err = controllerClient.CreateVolumeWithCaps(volName, map[string]string{ + common.ParameterKeyEnableMultiZoneProvisioning: "true", + common.ParameterKeyType: "hyperdisk-ml", + }, defaultHdmlSizeGb, + &csi.TopologyRequirement{}, + []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: snapshotID, + }, + }, + }) + Expect(err).To(BeNil(), "CreateVolume failed with error: %v", err) + + volID := fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", p, volName) + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(volID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. Err: %v", err) + _, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. Err: %v", err) + }() + + disk1, err := computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err := computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + // Validate disks are ROX + Expect(disk1.AccessMode).To(Equal("READ_ONLY_MANY")) + Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) + + // Attach Disk to node1 and validate contents + err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") + + // Attach Disk to node1 and validate contents + err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") + + disk1, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + // Validate disks have multi-zone labels + Expect(disk1.Labels[common.MultiZoneLabel]).To(Equal("true")) + Expect(disk2.Labels[common.MultiZoneLabel]).To(Equal("true")) + + // Validate disks are ROX + Expect(disk1.AccessMode).To(Equal("READ_ONLY_MANY")) + Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) + }) + + It("Should create ROX 'multi-zone' PV from existing disk image", func() { + checkSkipMultiZoneTests() + Expect(testContexts).NotTo(BeEmpty()) + + zoneToContext := map[string]*remote.TestContext{} + zones := []string{} + + for _, tc := range testContexts { + _, z, _ := tc.Instance.GetIdentity() + // Zone hasn't been seen before + if _, ok := zoneToContext[z]; !ok { + 
zoneToContext[z] = tc + zones = append(zones, z) + } + if len(zoneToContext) == 2 { + break + } + } + + Expect(len(zoneToContext)).To(Equal(2), "Must have instances in 2 zones") + + controllerContext := zoneToContext[zones[0]] + controllerClient := controllerContext.Client + controllerInstance := controllerContext.Instance + + p, _, _ := controllerInstance.GetIdentity() + + tc0 := zoneToContext[zones[0]] + tc1 := zoneToContext[zones[1]] + + snapshotVolName, snapshotVolID := createAndValidateUniqueZonalDisk(controllerClient, p, zones[0], standardDiskType) + + underSpecifiedID := common.GenerateUnderspecifiedVolumeID(snapshotVolName, true /* isZonal */) + + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(underSpecifiedID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], snapshotVolName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found") + }() + + // Attach Disk + err := testAttachWriteReadDetach(underSpecifiedID, snapshotVolName, tc0.Instance, controllerClient, false /* readOnly */) + Expect(err).To(BeNil(), "Failed to go through volume lifecycle") + + // Create Disk Image + imageName := testNamePrefix + string(uuid.NewUUID()) + snapshotParams := map[string]string{ + "snapshot-type": "images", + } + snapshotID, err := controllerClient.CreateSnapshot(imageName, snapshotVolID, snapshotParams) + klog.Infof("Created image snapshot with snapshotID: %s", snapshotID) + Expect(err).To(BeNil(), "CreateSnapshot failed with error: %v", err) + + // Validate Disk Image Created + image, err := computeService.Images.Get(p, imageName).Do() + Expect(err).To(BeNil(), "Could not get disk image from cloud directly") + Expect(image.Name).To(Equal(imageName)) + + err = wait.Poll(10*time.Second, 3*time.Minute, func() (bool, error) { + image, err := computeService.Images.Get(p, imageName).Do() + Expect(err).To(BeNil(), "Could not get disk image from cloud directly") + if image.Status == "READY" { + return true, nil + } + return false, nil + }) + Expect(err).To(BeNil(), "Could not wait for disk image be ready") + + // Create multi-zone Disk + volName := testNamePrefix + string(uuid.NewUUID()) + + _, err = controllerClient.CreateVolumeWithCaps(volName, map[string]string{ + common.ParameterKeyEnableMultiZoneProvisioning: "true", + common.ParameterKeyType: "hyperdisk-ml", + }, defaultHdmlSizeGb, + &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: zones[0]}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: zones[1]}, + }, + }, + }, + []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + }, + }, + &csi.VolumeContentSource{ + Type: &csi.VolumeContentSource_Snapshot{ + Snapshot: &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: snapshotID, + }, + }, + }) + Expect(err).To(BeNil(), "CreateVolume failed with error: %v", err) + + volID := fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", p, volName) + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(volID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. 
Err: %v", err) + _, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. Err: %v", err) + }() + + disk1, err := computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err := computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + // Validate disks have multi-zone labels + Expect(disk1.Labels[common.MultiZoneLabel]).To(Equal("true")) + Expect(disk2.Labels[common.MultiZoneLabel]).To(Equal("true")) + + // Validate disks are ROX + Expect(disk1.AccessMode).To(Equal("READ_ONLY_MANY")) + Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) + + // Attach Disk to node1 + err = testAttachWriteReadDetach(volID, volName, tc0.Instance, tc0.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") + + // Attach Disk to node1 + err = testAttachWriteReadDetach(volID, volName, tc1.Instance, tc1.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") + }) + + It("Should create RWO 'multi-zone' PV that has empty disks", func() { + checkSkipMultiZoneTests() + // Create new driver and client + Expect(testContexts).NotTo(BeEmpty()) + + zoneToContext := map[string]*remote.TestContext{} + zones := []string{} + + for _, tc := range testContexts { + _, z, _ := tc.Instance.GetIdentity() + // Zone hasn't been seen before + if _, ok := zoneToContext[z]; !ok { + zoneToContext[z] = tc + zones = append(zones, z) + } + if len(zoneToContext) == 2 { + break + } + } + + Expect(len(zoneToContext)).To(Equal(2), "Must have instances in 2 zones") + + controllerContext := zoneToContext[zones[0]] + controllerClient := controllerContext.Client + controllerInstance := controllerContext.Instance + + p, _, _ := controllerInstance.GetIdentity() + + // Attach disk to instance in the first zone. + tc0 := zoneToContext[zones[0]] + tc1 := zoneToContext[zones[1]] + + // Create Disk + volName := testNamePrefix + string(uuid.NewUUID()) + _, err := controllerClient.CreateVolumeWithCaps(volName, map[string]string{ + common.ParameterKeyEnableMultiZoneProvisioning: "true", + common.ParameterKeyType: "hyperdisk-ml", + }, defaultHdmlSizeGb, + &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + { + Segments: map[string]string{common.TopologyKeyZone: zones[0]}, + }, + { + Segments: map[string]string{common.TopologyKeyZone: zones[1]}, + }, + }, + }, + []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + }, + nil) + Expect(err).To(BeNil(), "CreateVolume failed with error: %v", err) + + volID := fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", p, volName) + defer func() { + // Delete Disk + err := controllerClient.DeleteVolume(volID) + Expect(err).To(BeNil(), "DeleteVolume failed") + + // Validate Disk Deleted + _, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. Err: %v", err) + _, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(gce.IsGCEError(err, "notFound")).To(BeTrue(), "Expected disk to not be found. 
Err: %v", err) + }() + + disk1, err := computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err := computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + // Validate disks have multi-zone labels + Expect(disk1.Labels[common.MultiZoneLabel]).To(Equal("true")) + Expect(disk2.Labels[common.MultiZoneLabel]).To(Equal("true")) + + // Validate disks are RWO + Expect(disk1.AccessMode).To(Equal("READ_WRITE_SINGLE")) + Expect(disk2.AccessMode).To(Equal("READ_WRITE_SINGLE")) + + // Validate underlying disks can be used + volID0 := fmt.Sprintf("projects/%s/zones/%s/disks/%s", p, zones[0], volName) + volID1 := fmt.Sprintf("projects/%s/zones/%s/disks/%s", p, zones[1], volName) + + err = testAttachWriteReadDetach(volID0, volName, tc0.Instance, tc0.Client, false /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/write/read/detach on vol1") + + err = testAttachWriteReadDetach(volID1, volName, tc1.Instance, tc1.Client, false /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/write/read/detach on vol2") + + // Validate disks can be used in multi-zone mode on both nodes + volIDMultiZone := fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", p, volName) + err = testAttachWriteReadDetach(volIDMultiZone, volName, tc0.Instance, tc0.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol1") + + err = testAttachWriteReadDetach(volIDMultiZone, volName, tc1.Instance, tc1.Client, true /* readonly */) + Expect(err).To(BeNil(), "Failed to attach/read/detach on vol2") + + // Validate disks are ROX now + disk1, err = computeService.Disks.Get(p, zones[0], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[0], volName) + disk2, err = computeService.Disks.Get(p, zones[1], volName).Do() + Expect(err).To(BeNil(), "Failed to get disk %v/%v", zones[1], volName) + + Expect(disk1.AccessMode).To(Equal("READ_ONLY_MANY")) + Expect(disk2.AccessMode).To(Equal("READ_ONLY_MANY")) }) diff --git a/test/e2e/tests/setup_e2e_test.go b/test/e2e/tests/setup_e2e_test.go index 0fd6f1a7e..7fd12a52d 100644 --- a/test/e2e/tests/setup_e2e_test.go +++ b/test/e2e/tests/setup_e2e_test.go @@ -126,6 +126,7 @@ func notEmpty(v string) bool { func getDriverConfig() testutils.DriverConfig { return testutils.DriverConfig{ ExtraFlags: slices.Filter(nil, strings.Split(*extraDriverFlags, ","), notEmpty), + Zones: strings.Split(*zones, ","), } } diff --git a/test/e2e/tests/single_zone_e2e_test.go b/test/e2e/tests/single_zone_e2e_test.go index 980bb0f73..5bb989441 100644 --- a/test/e2e/tests/single_zone_e2e_test.go +++ b/test/e2e/tests/single_zone_e2e_test.go @@ -47,6 +47,7 @@ const ( defaultSizeGb int64 = 5 defaultExtremeSizeGb int64 = 500 defaultHdTSizeGb int64 = 2048 + defaultHdmlSizeGb int64 = 200 defaultRepdSizeGb int64 = 200 defaultMwSizeGb int64 = 200 defaultVolumeLimit int64 = 127 @@ -56,6 +57,7 @@ const ( ssdDiskType = "pd-ssd" extremeDiskType = "pd-extreme" hdtDiskType = "hyperdisk-throughput" + hdmlDiskType = "hyperdisk-ml" provisionedIOPSOnCreate = "12345" provisionedIOPSOnCreateInt = int64(12345) provisionedIOPSOnCreateDefaultInt = int64(100000) @@ -1579,6 +1581,8 @@ func createAndValidateZonalDisk(client *remote.CsiClient, project, zone string, diskSize = defaultExtremeSizeGb case hdtDiskType: diskSize = defaultHdTSizeGb + case hdmlDiskType: + diskSize = defaultHdmlSizeGb } volume, err := client.CreateVolume(volName, 
disk.params, diskSize, &csi.TopologyRequirement{ @@ -1759,6 +1763,14 @@ var typeToDisk = map[string]*disk{ Expect(disk.Type).To(ContainSubstring(ssdDiskType)) }, }, + "hyperdisk-ml": { + params: map[string]string{ + common.ParameterKeyType: "hyperdisk-ml", + }, + validate: func(disk *compute.Disk) { + Expect(disk.Type).To(ContainSubstring("hyperdisk-ml")) + }, + }, } func merge(a, b map[string]string) map[string]string { diff --git a/test/e2e/utils/utils.go b/test/e2e/utils/utils.go index 3d4a2df81..995b114b0 100644 --- a/test/e2e/utils/utils.go +++ b/test/e2e/utils/utils.go @@ -46,6 +46,7 @@ var ( type DriverConfig struct { ComputeEndpoint string ExtraFlags []string + Zones []string } func GCEClientAndDriverSetup(instance *remote.InstanceInfo, driverConfig DriverConfig) (*remote.TestContext, error) { @@ -62,9 +63,10 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, driverConfig DriverC fmt.Sprintf("--extra-labels=%s=%s", DiskLabelKey, DiskLabelValue), "--max-concurrent-format-and-mount=20", // otherwise the serialization times out the e2e test. "--multi-zone-volume-handle-enable", - "--multi-zone-volume-handle-disk-types=pd-standard", + "--multi-zone-volume-handle-disk-types=pd-standard,hyperdisk-ml", "--use-instance-api-to-poll-attachment-disk-types=pd-ssd", "--use-instance-api-to-list-volumes-published-nodes", + fmt.Sprintf("--fallback-requisite-zones=%s", strings.Join(driverConfig.Zones, ",")), } extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint=%s", driverConfig.ComputeEndpoint)) extra_flags = append(extra_flags, driverConfig.ExtraFlags...)
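Note on the volume-handle convention exercised throughout the tests above: a multi-zone volume reuses the zonal handle format with the literal sentinel multi-zone in place of a real zone, while the underlying replica disks keep ordinary per-zone handles that share the same disk name. The short sketch below restates that convention for reference; the helper names are illustrative only and are not functions in the driver or its tests.

package main

import "fmt"

// multiZoneVolumeHandle builds the aggregate handle used by the tests above:
// the zone segment is the literal "multi-zone" sentinel rather than a real zone.
func multiZoneVolumeHandle(project, diskName string) string {
	return fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", project, diskName)
}

// zonalVolumeHandle builds the handle of one underlying replica disk; all
// replicas share the same disk name and differ only in the zone segment.
func zonalVolumeHandle(project, zone, diskName string) string {
	return fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, zone, diskName)
}

func main() {
	// projects/my-project/zones/multi-zone/disks/my-disk
	fmt.Println(multiZoneVolumeHandle("my-project", "my-disk"))
	// projects/my-project/zones/us-central1-a/disks/my-disk
	fmt.Println(zonalVolumeHandle("my-project", "us-central1-a", "my-disk"))
}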