
Add --fallback-requisite-zones command line argument #1532

Merged

8 changes: 7 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
@@ -21,6 +21,7 @@ import (
 	"math/rand"
 	"os"
 	"runtime"
+	"strings"
 	"time"

 	"k8s.io/klog/v2"
@@ -66,6 +67,8 @@ var (
 	maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "If set then format and mount operations are serialized on each node. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
 	formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")

+	fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
+
 	version string
 )

@@ -128,6 +131,9 @@ func handle() {
 	// Initialize identity server
 	identityServer := driver.NewIdentityServer(gceDriver)

+	// Initialize requisite zones
+	fallbackRequisiteZones := strings.Split(*fallbackRequisiteZonesFlag, ",")
+
 	// Initialize requirements for the controller service
 	var controllerServer *driver.GCEControllerServer
 	if *runControllerService {
@@ -137,7 +143,7 @@
 		}
 		initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
 		maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
-		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration)
+		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones)
 	} else if *cloudConfigFilePath != "" {
 		klog.Warningf("controller service is disabled but cloud config given - it has no effect")
 	}
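One behavior worth noting in the hunk above: strings.Split returns a one-element slice containing the empty string when its input is empty, so leaving --fallback-requisite-zones unset yields []string{""} rather than an empty list, and the consuming code presumably tolerates or filters the empty entry. A standalone sketch (not from this PR) of the parse behavior:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Flag set: three usable zone entries.
        set := strings.Split("us-central1-a,us-central1-b,us-central1-c", ",")
        fmt.Println(set, len(set)) // [us-central1-a us-central1-b us-central1-c] 3

        // Flag unset: one empty-string entry, not an empty slice.
        unset := strings.Split("", ",")
        fmt.Printf("%q %d\n", unset, len(unset)) // [""] 1
    }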
67 changes: 44 additions & 23 deletions pkg/gce-pd-csi-driver/controller.go
@@ -91,6 +91,15 @@ type GCEControllerServer struct {
 	// publish/unpublish call will clear the backoff condition for a node and
 	// disk.
 	errorBackoff *csiErrorBackoff
+
+	// Requisite zones to fall back to when provisioning a disk.
+	// If there is an insufficient number of zones available in the union
+	// of preferred/requisite topology, this list is used instead of
+	// the passed-in requisite topology.
+	// The main use case for this field is to support Regional Persistent Disk
+	// provisioning in GKE Autopilot, where a GKE cluster may
+	// be scaled down to a single zone.
+	fallbackRequisiteZones []string
 }

 type csiErrorBackoffId string
@@ -272,7 +281,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
 	var volKey *meta.Key
 	switch params.ReplicationType {
 	case replicationTypeNone:
-		zones, err = pickZones(ctx, gceCS, req.GetAccessibilityRequirements(), 1, locationTopReq)
+		zones, err = gceCS.pickZones(ctx, req.GetAccessibilityRequirements(), 1, locationTopReq)
 		if err != nil {
 			return nil, status.Errorf(codes.InvalidArgument, "CreateVolume failed to pick zones for disk: %v", err.Error())
 		}
@@ -282,7 +291,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
 		volKey = meta.ZonalKey(name, zones[0])

 	case replicationTypeRegionalPD:
-		zones, err = pickZones(ctx, gceCS, req.GetAccessibilityRequirements(), 2, locationTopReq)
+		zones, err = gceCS.pickZones(ctx, req.GetAccessibilityRequirements(), 2, locationTopReq)
 		if err != nil {
 			return nil, status.Errorf(codes.InvalidArgument, "CreateVolume failed to pick zones for disk: %v", err.Error())
 		}
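For context on what reaches these switch arms: the accessibility requirements come from the CSI CreateVolumeRequest, and a regional PD (the numZones == 2 case) whose requirement names only one zone is exactly the situation the fallback list addresses. A hand-built requirement for that scenario, as a sketch; the zone values are illustrative and the zone segment key is expected to be the driver's topology.gke.io/zone:

    package main

    import (
        "fmt"

        "github.com/container-storage-interface/spec/lib/go/csi"
    )

    func main() {
        // Single-zone topology: requisite and preferred both name one zone,
        // so a regional PD (which needs two zones) must consult the
        // fallback-requisite-zones list to find a second.
        req := &csi.TopologyRequirement{
            Requisite: []*csi.Topology{
                {Segments: map[string]string{"topology.gke.io/zone": "us-central1-a"}},
            },
            Preferred: []*csi.Topology{
                {Segments: map[string]string{"topology.gke.io/zone": "us-central1-a"}},
            },
        }
        fmt.Println(req.GetRequisite(), req.GetPreferred())
    }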
@@ -1551,7 +1560,7 @@ func prependZone(zone string, zones []string) []string {
 	return newZones
 }

-func pickZonesFromTopology(top *csi.TopologyRequirement, numZones int, locationTopReq *locationRequirements) ([]string, error) {
+func pickZonesFromTopology(top *csi.TopologyRequirement, numZones int, locationTopReq *locationRequirements, fallbackRequisiteZones []string) ([]string, error) {
 	reqZones, err := getZonesFromTopology(top.GetRequisite())
 	if err != nil {
 		return nil, fmt.Errorf("could not get zones from requisite topology: %w", err)
@@ -1596,27 +1605,39 @@

 	if numZones <= len(prefZones) {
 		return prefZones[0:numZones], nil
-	} else {
-		zones := sets.String{}
-		// Add all preferred zones into zones
-		zones.Insert(prefZones...)
-		remainingNumZones := numZones - len(prefZones)
-		// Take all of the remaining zones from requisite zones
-		reqSet := sets.NewString(reqZones...)
-		prefSet := sets.NewString(prefZones...)
-		remainingZones := reqSet.Difference(prefSet)
-
-		if remainingZones.Len() < remainingNumZones {
+	}
+
+	remainingNumZones := numZones - len(prefZones)
+	// Take all of the remaining zones from requisite zones
+	reqSet := sets.NewString(reqZones...)
+	prefSet := sets.NewString(prefZones...)
+	remainingZones := reqSet.Difference(prefSet)
+
+	if remainingZones.Len() < remainingNumZones {
+		fallbackSet := sets.NewString(fallbackRequisiteZones...)
+		remainingFallbackZones := fallbackSet.Difference(prefSet)
+		if remainingFallbackZones.Len() >= remainingNumZones {
+			remainingZones = remainingFallbackZones
+		} else {
 			return nil, fmt.Errorf("need %v zones from topology, only got %v unique zones", numZones, reqSet.Union(prefSet).Len())
 		}
-		// Add the remaining number of zones into the set
-		nSlice, err := pickRandAndConsecutive(remainingZones.List(), remainingNumZones)
-		if err != nil {
-			return nil, err
-		}
-		zones.Insert(nSlice...)
-		return zones.List(), nil
 	}
+
+	allZones := prefSet.Union(remainingZones).List()
+	sort.Strings(allZones)
+	var shiftIndex int
+	if len(prefZones) == 0 {
+		// Randomly shift the requisite zones, since there is no preferred start.
+		shiftIndex = rand.Intn(len(allZones))
+	} else {
+		shiftIndex = slices.Index(allZones, prefZones[0])
+	}
+	shiftedZones := append(allZones[shiftIndex:], allZones[:shiftIndex]...)
+	sortedShiftedReqZones := slices.Filter(nil, shiftedZones, func(v string) bool { return !prefSet.Has(v) })
+	zones := make([]string, 0, numZones)
+	zones = append(zones, prefZones...)
+	zones = append(zones, sortedShiftedReqZones...)
+	return zones[:numZones], nil
 }

 func getZonesFromTopology(topList []*csi.Topology) ([]string, error) {
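Since the control flow above is spread across a diff, here is a minimal, dependency-free sketch of the selection logic it implements. The function names, map-based sets, and simplified ordering are mine, not the driver's; the real code uses k8s.io string sets and a deterministic shift around the first preferred zone:

    package main

    import (
        "fmt"
        "sort"
    )

    // pickWithFallback mirrors pickZonesFromTopology in miniature: take all
    // preferred zones first, top up from the requisite zones, and only if
    // those run short consult the fallback list.
    func pickWithFallback(preferred, requisite, fallback []string, numZones int) ([]string, error) {
        if numZones <= len(preferred) {
            return preferred[:numZones], nil
        }
        prefSet := make(map[string]bool)
        for _, z := range preferred {
            prefSet[z] = true
        }
        remaining := minus(requisite, prefSet)
        if len(remaining) < numZones-len(preferred) {
            fb := minus(fallback, prefSet)
            if len(fb) < numZones-len(preferred) {
                return nil, fmt.Errorf("need %d zones, only %d available", numZones, len(preferred)+len(remaining))
            }
            remaining = fb // fall back to the configured zone list
        }
        sort.Strings(remaining)
        zones := append(append([]string{}, preferred...), remaining...)
        return zones[:numZones], nil
    }

    // minus returns the members of list not present in set, deduplicated.
    func minus(list []string, set map[string]bool) []string {
        seen := make(map[string]bool)
        out := []string{}
        for _, z := range list {
            if !set[z] && !seen[z] {
                seen[z] = true
                out = append(out, z)
            }
        }
        return out
    }

    func main() {
        // Autopilot-style scenario: the cluster has been scaled down to one
        // zone, but a regional PD needs two.
        zones, err := pickWithFallback(
            []string{"us-central1-a"},                  // preferred (from topology)
            []string{"us-central1-a"},                  // requisite (cluster's only zone)
            []string{"us-central1-a", "us-central1-b"}, // --fallback-requisite-zones
            2,
        )
        fmt.Println(zones, err) // [us-central1-a us-central1-b] <nil>
    }

In this scenario the requisite topology alone cannot supply a second zone, and the configured fallback list fills the gap, which is the behavior the new flag exists to provide.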
@@ -1652,11 +1673,11 @@ func getZoneFromSegment(seg map[string]string) (string, error) {
 	return zone, nil
 }

-func pickZones(ctx context.Context, gceCS *GCEControllerServer, top *csi.TopologyRequirement, numZones int, locationTopReq *locationRequirements) ([]string, error) {
+func (gceCS *GCEControllerServer) pickZones(ctx context.Context, top *csi.TopologyRequirement, numZones int, locationTopReq *locationRequirements) ([]string, error) {
 	var zones []string
 	var err error
 	if top != nil {
-		zones, err = pickZonesFromTopology(top, numZones, locationTopReq)
+		zones, err = pickZonesFromTopology(top, numZones, locationTopReq, gceCS.fallbackRequisiteZones)
 		if err != nil {
 			return nil, fmt.Errorf("failed to pick zones from topology: %w", err)
 		}