Skip to content

Commit 8da30d8

Browse files
committed
Add support for a multi-zone volumeHandle
1 parent e75b9a3 commit 8da30d8

File tree

10 files changed

+215
-83
lines changed

10 files changed

+215
-83
lines changed

pkg/common/constants.go

+8
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,12 @@ const (
2424
VolumeAttributePartition = "partition"
2525

2626
UnspecifiedValue = "UNSPECIFIED"
27+
28+
// Keyword indicating a 'multi-zone' volumeHandle. Replaces the {zone} component of the volumeHandle,
29+
// eg: projects/{project}/zones/multi-zone/disks/{name} vs.
30+
// projects/{project}/zones/{zone}/disks/{name}
31+
MultiZoneValue = "multi-zone"
32+
33+
// Label that is set on a disk when it is used by a 'multi-zone' VolumeHandle
34+
MultiZoneLabel = "goog-gke-multi-zone"
2735
)

pkg/common/utils.go

+12
Original file line numberDiff line numberDiff line change
@@ -478,3 +478,15 @@ func UnorderedSlicesEqual(slice1 []string, slice2 []string) bool {
478478
}
479479
return true
480480
}
481+
482+
func VolumeIdAsMultiZone(volumeId string) (string, error) {
483+
splitId := strings.Split(volumeId, "/")
484+
if len(splitId) != volIDTotalElements {
485+
return "", fmt.Errorf("failed to get id components. Expected projects/{project}/zones/{zone}/disks/{name}. Got: %s", volumeId)
486+
}
487+
if splitId[volIDToplogyKey] != "zones" {
488+
return "", fmt.Errorf("expected id to be zonal. Got: %s", volumeId)
489+
}
490+
splitId[volIDToplogyValue] = MultiZoneValue
491+
return strings.Join(splitId, "/"), nil
492+
}

pkg/gce-cloud-provider/compute/fake-gce.go

+4
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (cloud *FakeCloudProvider) GetDefaultZone() string {
9292
return cloud.zone
9393
}
9494

95+
func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKeyForZone(ctx context.Context, project string, volumeKey *meta.Key, instanceZone string) (string, *meta.Key, error) {
96+
return cloud.RepairUnderspecifiedVolumeKey(ctx, project, volumeKey)
97+
}
98+
9599
func (cloud *FakeCloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error) {
96100
if project == common.UnspecifiedValue {
97101
project = cloud.project

pkg/gce-cloud-provider/compute/gce-compute.go

+12
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type GCECompute interface {
9191
GetDefaultZone() string
9292
// Disk Methods
9393
GetDisk(ctx context.Context, project string, volumeKey *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error)
94+
RepairUnderspecifiedVolumeKeyForZone(ctx context.Context, project string, volumeKey *meta.Key, instanceZone string) (string, *meta.Key, error)
9495
RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error)
9596
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error
9697
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) error
@@ -171,6 +172,17 @@ func (cloud *CloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, s
171172
return items, "", nil
172173
}
173174

175+
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
176+
// by the volume key and return a volume key with a correct zone
177+
func (cloud *CloudProvider) RepairUnderspecifiedVolumeKeyForZone(ctx context.Context, project string, volumeKey *meta.Key, instanceZone string) (string, *meta.Key, error) {
178+
if volumeKey.Type() == meta.Zonal && volumeKey.Zone == common.MultiZoneValue {
179+
// Multi-zone support
180+
// Use the node's zone.
181+
volumeKey.Zone = instanceZone
182+
}
183+
return cloud.RepairUnderspecifiedVolumeKey(ctx, project, volumeKey)
184+
}
185+
174186
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
175187
// by the volume key and return a volume key with a correct zone
176188
func (cloud *CloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error) {

pkg/gce-pd-csi-driver/controller.go

+90-28
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ type GCEControllerServer struct {
4747
CloudProvider gce.GCECompute
4848
Metrics metrics.MetricsManager
4949

50-
disks []*compute.Disk
51-
seen map[string]int
50+
volumeEntries []*csi.ListVolumesResponse_Entry
51+
volumeEntriesSeen map[string]int
5252

5353
snapshots []*csi.ListSnapshotsResponse_Entry
5454
snapshotTokens map[string]int
@@ -603,14 +603,24 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
603603
PublishContext: nil,
604604
}
605605

606-
project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
606+
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
if err != nil {
	return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()), nil
}

// Record whether the handle is "multi-zone" BEFORE repairing the key:
// RepairUnderspecifiedVolumeKeyForZone overwrites volKey.Zone in place with
// the node's zone, so the check cannot be made reliably afterwards.
volumeIsMultiZone := volKey.Type() == meta.Zonal && volKey.Zone == common.MultiZoneValue

// Only allow read-only attachment for "multi-zone" volumes.
// There is no underlying error at this point, so none is formatted into the
// message (calling err.Error() on a nil error would panic).
if volumeIsMultiZone && !readOnly {
	return nil, status.Errorf(codes.InvalidArgument, "'multi-zone' volume %v only supports READ_ONLY", volumeID), nil
}

project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKeyForZone(ctx, project, volKey, instanceZone)
if err != nil {
	if gce.IsGCENotFoundError(err) {
		return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume could not find volume with ID %v: %v", volumeID, err.Error()), nil
	}
	return nil, common.LoggedError("ControllerPublishVolume error repairing underspecified volume key: ", err), nil
}
623+
614624
// Acquires the lock for the volume on that node only, because we need to support the ability
615625
// to publish the same volume onto different nodes concurrently
616626
lockingVolumeID := fmt.Sprintf("%s/%s", nodeID, volumeID)
@@ -625,10 +635,6 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
625635
}
626636
return nil, common.LoggedError("Failed to getDisk: ", err), disk
627637
}
628-
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
629-
if err != nil {
630-
return nil, status.Errorf(codes.NotFound, "could not split nodeID: %v", err.Error()), disk
631-
}
632638
instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName)
633639
if err != nil {
634640
if gce.IsGCENotFoundError(err) {
@@ -739,7 +745,13 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
739745

740746
volumeID := req.GetVolumeId()
741747
nodeID := req.GetNodeId()
742-
project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
748+
749+
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
750+
if err != nil {
751+
return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), nil
752+
}
753+
754+
project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKeyForZone(ctx, project, volKey, instanceZone)
743755
if err != nil {
744756
if gce.IsGCENotFoundError(err) {
745757
klog.Warningf("Treating volume %v as unpublished because it could not be found", volumeID)
@@ -756,10 +768,6 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
756768
}
757769
defer gceCS.volumeLocks.Release(lockingVolumeID)
758770
diskToUnpublish, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
759-
instanceZone, instanceName, err := common.NodeIDToZoneAndName(nodeID)
760-
if err != nil {
761-
return nil, status.Errorf(codes.InvalidArgument, "could not split nodeID: %v", err.Error()), diskToUnpublish
762-
}
763771
instance, err := gceCS.CloudProvider.GetInstanceOrError(ctx, instanceZone, instanceName)
764772
if err != nil {
765773
if gce.IsGCENotFoundError(err) {
@@ -810,6 +818,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
810818
if err != nil {
811819
return nil, status.Errorf(codes.InvalidArgument, "Volume ID is invalid: %v", err.Error())
812820
}
821+
813822
project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
814823
if err != nil {
815824
if gce.IsGCENotFoundError(err) {
@@ -879,8 +888,9 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
879888
"ListVolumes got max entries request %v. GCE only supports values >0", req.MaxEntries)
880889
}
881890

882-
offset := 0
891+
offsetLow := 0
883892
var ok bool
893+
var volumeEntries []*csi.ListVolumesResponse_Entry
884894
if req.StartingToken == "" {
885895
diskList, _, err := gceCS.CloudProvider.ListDisks(ctx)
886896
if err != nil {
@@ -889,10 +899,14 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
889899
}
890900
return nil, common.LoggedError("Failed to list disk: ", err)
891901
}
892-
gceCS.disks = diskList
893-
gceCS.seen = map[string]int{}
902+
volumeEntries = disksToVolumeEntries(diskList)
903+
}
904+
905+
if req.StartingToken == "" {
906+
gceCS.volumeEntries = volumeEntries
907+
gceCS.volumeEntriesSeen = map[string]int{}
894908
} else {
895-
offset, ok = gceCS.seen[req.StartingToken]
909+
offsetLow, ok = gceCS.volumeEntriesSeen[req.StartingToken]
896910
if !ok {
897911
return nil, status.Errorf(codes.Aborted, "ListVolumes error with invalid startingToken: %s", req.StartingToken)
898912
}
@@ -903,9 +917,50 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
903917
maxEntries = maxListVolumesResponseEntries
904918
}
905919

920+
nextToken := ""
921+
offsetHigh := offsetLow + maxEntries
922+
if offsetHigh < len(gceCS.volumeEntries) {
923+
nextToken = string(uuid.NewUUID())
924+
gceCS.volumeEntriesSeen[nextToken] = offsetHigh
925+
} else {
926+
offsetHigh = len(gceCS.volumeEntries)
927+
}
928+
929+
return &csi.ListVolumesResponse{
930+
Entries: gceCS.volumeEntries[offsetLow:offsetHigh],
931+
NextToken: nextToken,
932+
}, nil
933+
}
934+
935+
// isMultiZoneDisk returns the multi-zone volumeId of a disk if it is
936+
// "multi-zone", otherwise returns an empty string
937+
// The second parameter indiciates if it is a "multi-zone" disk
938+
func isMultiZoneDisk(diskRsrc string, diskLabels map[string]string) (string, bool) {
939+
isMultiZoneDisk := false
940+
for l := range diskLabels {
941+
if l == common.MultiZoneLabel {
942+
isMultiZoneDisk = true
943+
}
944+
}
945+
if !isMultiZoneDisk {
946+
return "", false
947+
}
948+
949+
multiZoneVolumeId, err := common.VolumeIdAsMultiZone(diskRsrc)
950+
if err != nil {
951+
klog.Warningf("Error converting multi-zone volume handle for disk %s, skipped: %v", diskRsrc, err)
952+
return "", false
953+
}
954+
return multiZoneVolumeId, true
955+
}
956+
957+
// disksToVolumeEntries converts a list of disks to a list of CSI ListVolumesResponse entries.
958+
// It appends "multi-zone" volumeHandles at the end. These are volumeHandles which
959+
// map to multiple volumeHandles in different zones
960+
func disksToVolumeEntries(disks []*compute.Disk) []*csi.ListVolumesResponse_Entry {
961+
multiZoneNodesByVolumeId := map[string][]string{}
906962
entries := []*csi.ListVolumesResponse_Entry{}
907-
for i := 0; i+offset < len(gceCS.disks) && i < maxEntries; i++ {
908-
d := gceCS.disks[i+offset]
963+
for _, d := range disks {
909964
diskRsrc, err := getResourceId(d.SelfLink)
910965
if err != nil {
911966
klog.Warningf("Bad ListVolumes disk resource %s, skipped: %v (%+v)", d.SelfLink, err, d)
@@ -920,6 +975,13 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
920975
users = append(users, rsrc)
921976
}
922977
}
978+
if multiZoneVolumeId, isMultiZone := isMultiZoneDisk(diskRsrc, d.Labels); isMultiZone {
979+
_, ok := multiZoneNodesByVolumeId[multiZoneVolumeId]
980+
if !ok {
981+
multiZoneNodesByVolumeId[multiZoneVolumeId] = []string{}
982+
}
983+
multiZoneNodesByVolumeId[multiZoneVolumeId] = append(multiZoneNodesByVolumeId[multiZoneVolumeId], users...)
984+
}
923985
entries = append(entries, &csi.ListVolumesResponse_Entry{
924986
Volume: &csi.Volume{
925987
VolumeId: diskRsrc,
@@ -929,17 +991,17 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
929991
},
930992
})
931993
}
932-
933-
nextToken := ""
934-
if len(entries)+offset < len(gceCS.disks) {
935-
nextToken = string(uuid.NewUUID())
936-
gceCS.seen[nextToken] = len(entries) + offset
994+
for volumeId, nodeIds := range multiZoneNodesByVolumeId {
995+
entries = append(entries, &csi.ListVolumesResponse_Entry{
996+
Volume: &csi.Volume{
997+
VolumeId: volumeId,
998+
},
999+
Status: &csi.ListVolumesResponse_VolumeStatus{
1000+
PublishedNodeIds: nodeIds,
1001+
},
1002+
})
9371003
}
938-
939-
return &csi.ListVolumesResponse{
940-
Entries: entries,
941-
NextToken: nextToken,
942-
}, nil
1004+
return entries
9431005
}
9441006

9451007
func (gceCS *GCEControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {

pkg/gce-pd-csi-driver/controller_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ var (
6767

6868
testVolumeID = fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, zone, name)
6969
underspecifiedVolumeID = fmt.Sprintf("projects/UNSPECIFIED/zones/UNSPECIFIED/disks/%s", name)
70+
multiZoneVolumeID = fmt.Sprintf("projects/%s/zones/multi-zone/disks/%s", project, name)
7071

7172
region, _ = common.GetRegionFromZones([]string{zone})
7273
testRegionalID = fmt.Sprintf("projects/%s/regions/%s/disks/%s", project, region, name)
@@ -3781,10 +3782,10 @@ func backoffDriver(t *testing.T, config *backoffDriverConfig) *GCEDriver {
37813782

37823783
driver := GetGCEDriver()
37833784
driver.cs = &GCEControllerServer{
3784-
Driver: driver,
3785-
seen: map[string]int{},
3786-
volumeLocks: common.NewVolumeLocks(),
3787-
errorBackoff: newFakeCSIErrorBackoff(config.clock),
3785+
Driver: driver,
3786+
volumeEntriesSeen: map[string]int{},
3787+
volumeLocks: common.NewVolumeLocks(),
3788+
errorBackoff: newFakeCSIErrorBackoff(config.clock),
37883789
}
37893790

37903791
driver.cs.CloudProvider = fcp

pkg/gce-pd-csi-driver/gce-pd-driver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, err
156156
return &GCEControllerServer{
157157
Driver: gceDriver,
158158
CloudProvider: cloudProvider,
159-
seen: map[string]int{},
159+
volumeEntriesSeen: map[string]int{},
160160
volumeLocks: common.NewVolumeLocks(),
161161
errorBackoff: newCsiErrorBackoff(errorBackoffInitialDuration, errorBackoffMaxDuration),
162162
fallbackRequisiteZones: fallbackRequisiteZones,

0 commit comments

Comments
 (0)