satisfy volume cloning topology requirements when choosing zone for CreateVolume #1150

Merged: 1 commit, Mar 17, 2023
133 changes: 127 additions & 6 deletions pkg/gce-pd-csi-driver/controller.go
@@ -20,6 +20,7 @@ import (
"math/rand"
"regexp"
"sort"
"strings"
"time"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
@@ -32,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"

"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -98,6 +100,14 @@ type workItem struct {
	unpublishReq *csi.ControllerUnpublishVolumeRequest
}

// locationRequirements are additional location topology requirements that must be respected when creating a volume.
type locationRequirements struct {
	srcVolRegion         string
	srcVolZone           string
	srcReplicationType   string
	cloneReplicationType string
}

var _ csi.ControllerServer = &GCEControllerServer{}

const (
Expand Down Expand Up @@ -142,6 +152,44 @@ func isDiskReady(disk *gce.CloudDisk) (bool, error) {
	return false, nil
}

// cloningLocationRequirements returns additional location requirements to be applied to the given create volume request's topology.
// If the CreateVolumeRequest uses volume cloning, the returned requirements comply with the volume cloning limitations:
// https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/volume-cloning#limitations.
func cloningLocationRequirements(req *csi.CreateVolumeRequest, cloneReplicationType string) (*locationRequirements, error) {
	if !useVolumeCloning(req) {
		return nil, nil
	}
	// If we are using volume cloning, this will be set.
	volSrc := req.VolumeContentSource.GetVolume()
	volSrcVolID := volSrc.GetVolumeId()

	_, sourceVolKey, err := common.VolumeIDToKey(volSrcVolID)
	if err != nil {
		return nil, fmt.Errorf("volume ID is invalid: %w", err)
	}

	isZonalSrcVol := sourceVolKey.Type() == meta.Zonal
	if isZonalSrcVol {
		region, err := common.GetRegionFromZones([]string{sourceVolKey.Zone})
		if err != nil {
			return nil, fmt.Errorf("failed to get region from zones: %w", err)
		}
		sourceVolKey.Region = region
	}

	srcReplicationType := replicationTypeNone
	if !isZonalSrcVol {
		srcReplicationType = replicationTypeRegionalPD
	}

	return &locationRequirements{srcVolZone: sourceVolKey.Zone, srcVolRegion: sourceVolKey.Region, srcReplicationType: srcReplicationType, cloneReplicationType: cloneReplicationType}, nil
}

// useVolumeCloning returns true if the volume should be created by cloning an existing volume.
func useVolumeCloning(req *csi.CreateVolumeRequest) bool {
	return req.VolumeContentSource != nil && req.VolumeContentSource.GetVolume() != nil
}
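For orientation, here is a minimal in-package sketch (not part of this diff) of what cloningLocationRequirements produces for a zonal source volume. The project, zone, and disk names are hypothetical, and the volume ID is assumed to follow the zonal format parsed by common.VolumeIDToKey.

// sketchCloningLocationRequirements is an illustrative sketch, not part of this change.
func sketchCloningLocationRequirements() (*locationRequirements, error) {
	req := &csi.CreateVolumeRequest{
		Name: "clone-of-source-disk", // hypothetical clone name
		VolumeContentSource: &csi.VolumeContentSource{
			Type: &csi.VolumeContentSource_Volume{
				Volume: &csi.VolumeContentSource_VolumeSource{
					// Assumed zonal volume ID format: projects/{project}/zones/{zone}/disks/{disk}
					VolumeId: "projects/my-project/zones/us-central1-a/disks/source-disk",
				},
			},
		},
	}
	// useVolumeCloning(req) is true, so CreateVolume derives location requirements.
	// For a zonal source and a zonal clone (replication-type "none"), the expected result is
	// srcVolZone "us-central1-a", srcVolRegion "us-central1",
	// srcReplicationType == replicationTypeNone, cloneReplicationType == replicationTypeNone.
	return cloningLocationRequirements(req, replicationTypeNone)
}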

func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	var err error
	// Validate arguments
@@ -177,12 +225,21 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
	if multiWriter {
		gceAPIVersion = gce.GCEAPIVersionBeta
	}

	var locationTopReq *locationRequirements
	if useVolumeCloning(req) {
		locationTopReq, err = cloningLocationRequirements(req, params.ReplicationType)
		if err != nil {
			return nil, status.Errorf(codes.InvalidArgument, "failed to get location requirements: %v", err.Error())
		}
	}

	// Determine the zone or zones+region of the disk
	var zones []string
	var volKey *meta.Key
	switch params.ReplicationType {
	case replicationTypeNone:
		zones, err = pickZones(ctx, gceCS, req.GetAccessibilityRequirements(), 1)
		zones, err = pickZones(ctx, gceCS, req.GetAccessibilityRequirements(), 1, locationTopReq)
		if err != nil {
			return nil, status.Errorf(codes.InvalidArgument, "CreateVolume failed to pick zones for disk: %v", err.Error())
		}
@@ -192,7 +249,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
		volKey = meta.ZonalKey(name, zones[0])

	case replicationTypeRegionalPD:
		zones, err = pickZones(ctx, gceCS, req.GetAccessibilityRequirements(), 2)
		zones, err = pickZones(ctx, gceCS, req.GetAccessibilityRequirements(), 2, locationTopReq)
		if err != nil {
			return nil, status.Errorf(codes.InvalidArgument, "CreateVolume failed to pick zones for disk: %v", err.Error())
		}
@@ -1358,7 +1415,29 @@ func diskIsAttachedAndCompatible(deviceName string, instance *compute.Instance,
	return false, nil
}

func pickZonesFromTopology(top *csi.TopologyRequirement, numZones int) ([]string, error) {
// pickZonesInRegion will remove any zones that are not in the given region.
func pickZonesInRegion(region string, zones []string) []string {
	refinedZones := []string{}
	for _, zone := range zones {
		if strings.Contains(zone, region) {
			refinedZones = append(refinedZones, zone)
		}
	}
	return refinedZones
}

func prependZone(zone string, zones []string) []string {
	newZones := []string{zone}
	for i := 0; i < len(zones); i++ {
		// Do not add a zone if it is equal to the zone that is already prepended to newZones.
		if zones[i] != zone {
			newZones = append(newZones, zones[i])
		}
	}
	return newZones
}
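A small sketch (not part of this change) showing how the two helpers above compose; the zone names are hypothetical:

// zonesSketch illustrates pickZonesInRegion and prependZone with hypothetical zones.
func zonesSketch() []string {
	zones := []string{"us-central1-a", "us-central1-b", "us-east1-b"}
	// Keep only zones in the source disk's region: ["us-central1-a", "us-central1-b"].
	inRegion := pickZonesInRegion("us-central1", zones)
	// Put the source disk's zone first without duplicating it: ["us-central1-b", "us-central1-a"].
	return prependZone("us-central1-b", inRegion)
}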

func pickZonesFromTopology(top *csi.TopologyRequirement, numZones int, locationTopReq *locationRequirements) ([]string, error) {
	reqZones, err := getZonesFromTopology(top.GetRequisite())
	if err != nil {
		return nil, fmt.Errorf("could not get zones from requisite topology: %w", err)
@@ -1368,6 +1447,39 @@ func pickZonesFromTopology(top *csi.TopologyRequirement, numZones int) ([]string
return nil, fmt.Errorf("could not get zones from preferred topology: %w", err)
}

	if locationTopReq != nil {
		srcVolZone := locationTopReq.srcVolZone
		switch locationTopReq.cloneReplicationType {
		// For zonal -> zonal cloning, the source disk zone must match the destination disk zone.
		case replicationTypeNone:
			// If the source volume zone is not in the topology requirement, we return an error.
			if !slices.Contains(prefZones, srcVolZone) && !slices.Contains(reqZones, srcVolZone) {
				volumeCloningReq := fmt.Sprintf("clone zone must match source disk zone: %s", srcVolZone)
				return nil, fmt.Errorf("failed to find zone from topology %v: %s", top, volumeCloningReq)
			}
			return []string{srcVolZone}, nil
		// For zonal or regional -> regional disk cloning, the source disk region must match the destination disk region.
		case replicationTypeRegionalPD:
			srcVolRegion := locationTopReq.srcVolRegion
			prefZones = pickZonesInRegion(srcVolRegion, prefZones)
			reqZones = pickZonesInRegion(srcVolRegion, reqZones)

			if len(prefZones) == 0 && len(reqZones) == 0 {
				volumeCloningReq := fmt.Sprintf("clone zone must reside in source disk region %s", srcVolRegion)
				return nil, fmt.Errorf("failed to find zone from topology %v: %s", top, volumeCloningReq)
			}

			// For zonal -> regional disk cloning, one of the replicated zones must match the source zone.
			if locationTopReq.srcReplicationType == replicationTypeNone {
				if !slices.Contains(prefZones, srcVolZone) && !slices.Contains(reqZones, srcVolZone) {
					volumeCloningReq := fmt.Sprintf("one of the replica zones of the clone must match the source disk zone: %s", srcVolZone)
					return nil, fmt.Errorf("failed to find zone from topology %v: %s", top, volumeCloningReq)
				}
				prefZones = prependZone(srcVolZone, prefZones)
			}
		}
	}
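	// Worked example (hypothetical values, not part of this change): cloning a zonal source disk
	// in us-central1-a into a regional PD with requisite zones
	// [us-central1-a us-central1-b us-east1-b] and no preferred zones. The regional-PD case above
	// first drops us-east1-b (wrong region), then, because the source is zonal, prepends
	// us-central1-a to prefZones, so one replica of the clone lands in the source disk's zone and
	// the second replica is chosen from the remaining us-central1 zones below.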

	if numZones <= len(prefZones) {
		return prefZones[0:numZones], nil
	} else {
Expand Down Expand Up @@ -1426,16 +1538,25 @@ func getZoneFromSegment(seg map[string]string) (string, error) {
	return zone, nil
}

func pickZones(ctx context.Context, gceCS *GCEControllerServer, top *csi.TopologyRequirement, numZones int) ([]string, error) {
func pickZones(ctx context.Context, gceCS *GCEControllerServer, top *csi.TopologyRequirement, numZones int, locationTopReq *locationRequirements) ([]string, error) {
	var zones []string
	var err error
	if top != nil {
		zones, err = pickZonesFromTopology(top, numZones)
		zones, err = pickZonesFromTopology(top, numZones, locationTopReq)
		if err != nil {
			return nil, fmt.Errorf("failed to pick zones from topology: %w", err)
		}
	} else {
		zones, err = getDefaultZonesInRegion(ctx, gceCS, []string{gceCS.CloudProvider.GetDefaultZone()}, numZones)
		existingZones := []string{gceCS.CloudProvider.GetDefaultZone()}
		// We set existingZones to the source volume zone so that for zonal -> zonal cloning, the clone is provisioned
		// in the same zone as the source volume, and for zonal -> regional, one of the replicated zones will always
		// be the zone of the source volume. For regional -> regional cloning, the srcVolZone will not be set, so we
		// just use the default zone.
		if locationTopReq != nil && locationTopReq.srcReplicationType == replicationTypeNone {
			existingZones = []string{locationTopReq.srcVolZone}
		}
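		// Illustrative example (hypothetical values, not part of this change): cloning a zonal
		// source disk in us-central1-a with no topology requirement (Immediate binding, no
		// allowedTopologies) sets existingZones to ["us-central1-a"], so the default zones
		// chosen below always include the source disk's zone.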
		// If topology is nil, then the Immediate binding mode was used without setting allowedTopologies in the storageclass.
		zones, err = getDefaultZonesInRegion(ctx, gceCS, existingZones, numZones)
		if err != nil {
			return nil, fmt.Errorf("failed to get default %v zones in region: %w", numZones, err)
		}