-
Notifications
You must be signed in to change notification settings - Fork 352
/
Copy pathtopology.go
610 lines (552 loc) · 20.1 KB
/
topology.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"hash/fnv"
"math/rand"
"slices"
"strconv"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/rpc"
"github.com/kubernetes-csi/external-provisioner/v5/pkg/features"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
)
// topologySegment is a single topology key/value pair.
type topologySegment struct {
	// Key is the topology key.
	Key string
	// Value is the value associated with Key.
	Value string
}
// topologyTerm represents a single term whose topology key/value pairs are AND'd together.
//
// A term must be sorted (see sort) after construction: compare() matches
// segments position by position and subset() walks both terms in order, so
// neither gives meaningful results on unsorted terms.
type topologyTerm []topologySegment
// GenerateVolumeNodeAffinity converts the accessible topologies of a volume
// into a VolumeNodeAffinity.
//
// Every topology with at least one segment becomes one NodeSelectorTerm (terms
// are OR'd). Every segment of that topology becomes an "In" requirement with a
// single value (requirements within a term are AND'd). Nil entries and entries
// without segments are skipped. Requirements are emitted in sorted key order so
// that the same input always yields the same output.
//
// It returns nil when accessibleTopology is empty.
func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNodeAffinity {
	if len(accessibleTopology) == 0 {
		return nil
	}

	var terms []v1.NodeSelectorTerm
	for _, topology := range accessibleTopology {
		// GetSegments is nil-safe, so a nil entry is skipped instead of panicking.
		segments := topology.GetSegments()
		if len(segments) == 0 {
			continue
		}

		// Map iteration order is random; sort the keys for a stable result.
		keys := make([]string, 0, len(segments))
		for k := range segments {
			keys = append(keys, k)
		}
		slices.Sort(keys)

		expressions := make([]v1.NodeSelectorRequirement, 0, len(keys))
		for _, k := range keys {
			expressions = append(expressions, v1.NodeSelectorRequirement{
				Key:      k,
				Operator: v1.NodeSelectorOpIn,
				Values:   []string{segments[k]},
			})
		}
		terms = append(terms, v1.NodeSelectorTerm{
			MatchExpressions: expressions,
		})
	}

	return &v1.VolumeNodeAffinity{
		Required: &v1.NodeSelector{
			NodeSelectorTerms: terms,
		},
	}
}
// VolumeIsAccessible reports whether the given volume node affinity is
// satisfied by the node topology that a CSI driver reported in
// GetNodeInfoResponse.
//
// The topology segments are used as the labels of a synthetic Node, which is
// then matched against affinity.Required. When there is nothing to compare
// against (nil topology, nil affinity or no required selector) the volume is
// treated as accessible.
func VolumeIsAccessible(affinity *v1.VolumeNodeAffinity, nodeTopology *csi.Topology) (bool, error) {
	// No topology information -> all volumes accessible.
	if nodeTopology == nil {
		return true, nil
	}
	if affinity == nil || affinity.Required == nil {
		return true, nil
	}

	segmentLabels := make(labels.Set, len(nodeTopology.Segments))
	for key, value := range nodeTopology.Segments {
		segmentLabels[key] = value
	}

	var node v1.Node
	node.Labels = segmentLabels
	return corev1helpers.MatchNodeSelectorTerms(&node, affinity.Required)
}
// SupportsTopology reports whether topology is supported by both sides: the
// plugin must advertise the VOLUME_ACCESSIBILITY_CONSTRAINTS capability and the
// Topology feature gate must be enabled in the external provisioner.
func SupportsTopology(pluginCapabilities rpc.PluginCapabilitySet) bool {
	if !pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] {
		return false
	}
	return utilfeature.DefaultFeatureGate.Enabled(features.Topology)
}
// GenerateAccessibilityRequirements returns the CSI TopologyRequirement
// to pass into the CSI CreateVolume request.
//
// This function is called if the topology feature is enabled
// in the external-provisioner and the CSI driver implements the
// CSI accessibility capability. It is disabled by default.
//
// If enabled, we require that the K8s API server is on at least
// K8s 1.17 and that the K8s Nodes are on at least K8s 1.15 in
// accordance with the 2 version skew between control plane and
// nodes.
//
// There are two main cases to consider:
//
// 1) selectedNode is not set (immediate binding):
//
//	In this case, we list all CSINode objects to find a Node that
//	the driver has registered topology keys with.
//
//	Once we get the list of CSINode objects, we find one that has
//	topology keys registered. If none are found, then we assume
//	that the driver has not started on any node yet, and we error
//	and retry.
//
//	If at least one CSINode object is found with topology keys,
//	then we continue and use that for assembling the topology
//	requirement. The available topologies will be limited to the
//	Nodes that the driver has registered with.
//
// 2) selectedNode is set (delayed binding):
//
//	We will get the topology from the CSINode object for the selectedNode
//	and error if we can't (and retry).
//
// Parameters, as used by this function body:
//   - kubeClient is not referenced in this function.
//   - driverName selects the driver entry in CSINode objects whose topology
//     keys are used.
//   - pvcName is only used to derive the rotation offset of the preferred
//     terms when no node was selected.
//   - allowedTopologies, when non-empty, are flattened into the requisite
//     terms, unless strictTopology already fixed them to the selected node.
//   - selectedNode is nil for immediate binding.
//   - strictTopology, with a selected node, makes that node's topology the only
//     requisite (and preferred) term; the node must then also satisfy
//     allowedTopologies if any are given.
//   - immediateTopology decides what happens when there is neither a selected
//     node nor allowedTopologies: if false, (nil, nil) is returned; otherwise
//     the topologies of the cluster are aggregated.
//   - csiNodeLister and nodeLister are the caches CSINode and Node objects are
//     read from.
//
// A (nil, nil) result means that no topology requirement should be specified.
func GenerateAccessibilityRequirements(
kubeClient kubernetes.Interface,
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node,
strictTopology bool,
immediateTopology bool,
csiNodeLister storagelistersv1.CSINodeLister,
nodeLister corelisters.NodeLister) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}
var (
selectedCSINode *storagev1.CSINode
selectedTopology topologyTerm
requisiteTerms []topologyTerm
err error
)
// 1. Get CSINode for the selected node
if selectedNode != nil {
selectedCSINode, err = getSelectedCSINode(csiNodeLister, selectedNode)
if err != nil {
return nil, err
}
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
if len(topologyKeys) == 0 {
// The scheduler selected a node with no topology information.
// This can happen if:
//
// * the node driver is not deployed on all nodes.
// * the node driver is being restarted and has not re-registered yet. This should be
// temporary and a retry should eventually succeed.
//
// Returning an error in provisioning will cause the scheduler to retry and potentially
// (but not guaranteed) pick a different node.
return nil, fmt.Errorf("no topology key found on CSINode %s", selectedCSINode.Name)
}
var isMissingKey bool
// Read the values of the driver's topology keys from the labels of the selected node.
selectedTopology, isMissingKey = getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}
if strictTopology {
// Make sure that selected node topology is in allowed topologies list
if len(allowedTopologies) != 0 {
allowedTopologiesFlatten := flatten(allowedTopologies)
found := false
for _, t := range allowedTopologiesFlatten {
// An allowed term matches when all of its segments appear in the node's topology.
if t.subset(selectedTopology) {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("selected node '%q' topology '%v' is not in allowed topologies: %v", selectedNode.Name, selectedTopology, allowedTopologiesFlatten)
}
}
// Only pass topology of selected node.
requisiteTerms = append(requisiteTerms, selectedTopology)
}
}
// 2. Generate CSI Requisite Terms
// Skipped when strict topology already produced the single requisite term above.
if len(requisiteTerms) == 0 {
if len(allowedTopologies) != 0 {
// Distribute out one of the OR layers in allowedTopologies
requisiteTerms = flatten(allowedTopologies)
} else {
if selectedNode == nil && !immediateTopology {
// Don't specify any topology requirements because neither the PVC nor
// the storage class have limitations and the CSI driver is not interested
// in being told where it runs (perhaps it already knows, for example).
return nil, nil
}
// Aggregate existing topologies in nodes across the entire cluster.
requisiteTerms, err = aggregateTopologies(driverName, selectedCSINode, csiNodeLister, nodeLister)
if err != nil {
return nil, err
}
if len(requisiteTerms) == 0 {
// We may reach here if the driver has not registered on any nodes.
// We should wait for at least one driver to start so that we can
// provision in a supported topology.
return nil, fmt.Errorf("no available topology found")
}
}
}
// It might be possible to reach here if allowedTopologies had empty entries.
// We fallback to the "topology disabled" behavior.
if len(requisiteTerms) == 0 {
return nil, nil
}
// Sort first: CompactFunc only removes adjacent equal elements, so exact
// duplicates are dropped only once equal terms sit next to each other.
slices.SortFunc(requisiteTerms, topologyTerm.compare)
requisiteTerms = slices.CompactFunc(requisiteTerms, slices.Equal)
// TODO (verult) reduce subset duplicate terms (advanced reduction)
requirement.Requisite = toCSITopology(requisiteTerms)
// 3. Generate CSI Preferred Terms
var preferredTerms []topologyTerm
if selectedCSINode == nil {
// Immediate binding, we fallback to statefulset spreading hash for backwards compatibility.
// Ensure even spreading of StatefulSet volumes by sorting
// requisiteTerms and shifting the sorted terms based on hash of pvcName and replica index suffix
hash, index := getPVCNameHashAndIndexOffset(pvcName)
// requisiteTerms is non-empty here (checked above), so the modulo is well defined.
i := (hash + index) % uint32(len(requisiteTerms))
// Rotate: the preferred list starts at offset i and wraps around.
preferredTerms = append(requisiteTerms[i:], requisiteTerms[:i]...)
} else {
// Delayed binding, use topology from that node to populate preferredTerms
if strictTopology {
// In case of strict topology, preferred = requisite
preferredTerms = requisiteTerms
} else {
for i, t := range requisiteTerms {
if t.subset(selectedTopology) {
// Rotate so that the first term matching the selected node comes first.
preferredTerms = append(requisiteTerms[i:], requisiteTerms[:i]...)
break
}
}
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
// constraint.
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite: %v", selectedTopology, selectedNode.Name, requisiteTerms)
}
}
}
requirement.Preferred = toCSITopology(preferredTerms)
return requirement, nil
}
// getSelectedCSINode returns the CSINode object that has the same name as the
// given selectedNode.
//
// It returns an error when the lister fails or yields a nil object. The lister
// error is wrapped with %w so that callers can still inspect the cause.
func getSelectedCSINode(
	csiNodeLister storagelistersv1.CSINodeLister,
	selectedNode *v1.Node) (*storagev1.CSINode, error) {

	selectedCSINode, err := csiNodeLister.Get(selectedNode.Name)
	if err != nil {
		// We don't want to fallback and provision in the wrong topology if there's some temporary
		// error with the API server.
		return nil, fmt.Errorf("error getting CSINode for selected node %q: %w", selectedNode.Name, err)
	}
	if selectedCSINode == nil {
		return nil, fmt.Errorf("CSINode for selected node %q not found", selectedNode.Name)
	}
	return selectedCSINode, nil
}
// aggregateTopologies returns all the supported topology values in the cluster that
// match the driver's topology keys.
//
// The topology keys come from selectedCSINode when it is set (delayed binding),
// otherwise from a randomly chosen CSINode on which the driver registered a
// non-empty key list (immediate binding). All Nodes carrying every one of those
// keys are then listed and one term per node is returned; the result may
// contain duplicates.
//
// It returns (nil, nil) when, for immediate binding, no CSINode has topology
// keys for the driver, and an error when listing fails, when the selected
// CSINode has no topology keys, or when no node carries the keys.
func aggregateTopologies(
	driverName string,
	selectedCSINode *storagev1.CSINode,
	csiNodeLister storagelistersv1.CSINodeLister,
	nodeLister corelisters.NodeLister) ([]topologyTerm, error) {

	// 1. Determine topologyKeys to use for aggregation
	var topologyKeys []string
	if selectedCSINode == nil {
		// Immediate binding
		csiNodes, err := csiNodeLister.List(labels.Everything())
		if err != nil {
			// Require CSINode beta feature on K8s apiserver to be enabled.
			// We don't want to fallback and provision in the wrong topology if there's some temporary
			// error with the API server.
			return nil, fmt.Errorf("error listing CSINodes: %w", err)
		}
		rand.Shuffle(len(csiNodes), func(i, j int) {
			csiNodes[i], csiNodes[j] = csiNodes[j], csiNodes[i]
		})

		// Pick the first node with topology keys. The check is on length, not
		// on nil, so that an empty but non-nil key list does not end the search.
		for _, csiNode := range csiNodes {
			topologyKeys = getTopologyKeys(csiNode, driverName)
			if len(topologyKeys) > 0 {
				break
			}
		}

		if len(topologyKeys) == 0 {
			// The driver supports topology but no nodes have registered any topology keys.
			// This is possible if nodes have not been upgraded to use the beta CSINode feature.
			klog.Warningf("No topology keys found on any node")
			return nil, nil
		}
	} else {
		// Delayed binding; use topology key from selected node
		topologyKeys = getTopologyKeys(selectedCSINode, driverName)
		if len(topologyKeys) == 0 {
			// The scheduler selected a node with no topology information.
			// This can happen if:
			//
			// * the node driver is not deployed on all nodes.
			// * the node driver is being restarted and has not re-registered yet. This should be
			//   temporary and a retry should eventually succeed.
			//
			// Returning an error in provisioning will cause the scheduler to retry and potentially
			// (but not guaranteed) pick a different node.
			return nil, fmt.Errorf("no topology key found on CSINode %s", selectedCSINode.Name)
		}

		// Even though selectedNode is set, we still need to aggregate topology values across
		// all nodes in order to find additional topologies for the volume types that can span
		// multiple topology values.
		//
		// TODO (#221): allow drivers to limit the number of topology values that are returned
		// If the driver specifies 1, then we can optimize here to only return the selected node's
		// topology instead of aggregating across all Nodes.
	}

	// 2. Find all nodes with the topology keys and extract the topology values
	selector, err := buildTopologyKeySelector(topologyKeys)
	if err != nil {
		return nil, err
	}
	nodes, err := nodeLister.List(selector)
	if err != nil {
		return nil, fmt.Errorf("error listing nodes: %w", err)
	}

	terms := make([]topologyTerm, 0, len(nodes))
	for _, node := range nodes {
		term, isMissingKey := getTopologyFromNode(node, topologyKeys)
		if isMissingKey {
			// The selector only matches nodes that carry every topology key, so
			// this is not expected. Skip the node instead of emitting a nil term.
			continue
		}
		terms = append(terms, term)
	}
	if len(terms) == 0 {
		// This means that a CSINode was found with topologyKeys, but we couldn't find
		// the topology labels on any nodes.
		return nil, fmt.Errorf("topologyKeys %v were not found on any nodes", topologyKeys)
	}
	return terms, nil
}
// flatten converts AllowedTopologies into a flat list of sorted topologyTerms.
//
// AllowedTopologies is an OR of TopologySelectorTerms; a TopologySelectorTerm
// is an AND of TopologySelectorLabelRequirements; each requirement holds one
// key and an OR of values. The CSI Requisite field, in contrast, is an OR of
// segments, each segment being an AND of key/value pairs. One OR layer
// therefore has to go: the OR over values is distributed over the AND one
// level up.
//
// For example, the TopologySelectorTerm
//
//	{
//	  "zone": { "zone1", "zone2" },
//	  "rack": { "rackA", "rackB" },
//	}
//
// stands for
//
//	(zone1 OR zone2) AND (rackA OR rackB)
//
// which, after distribution, is
//
//	(zone1 AND rackA) OR (zone2 AND rackA) OR (zone1 AND rackB) OR (zone2 AND rackB)
//
// and is returned as
//
//	[
//	  { "zone": "zone1", "rack": "rackA" },
//	  { "zone": "zone2", "rack": "rackA" },
//	  { "zone": "zone1", "rack": "rackB" },
//	  { "zone": "zone2", "rack": "rackB" },
//	]
//
// The expansions of all TopologySelectorTerms are concatenated (OR'd) and each
// resulting term is sorted.
//
// NOTE: an expression without values leaves the partial expansion of its
// selector term empty, so later expressions of that term start from scratch.
func flatten(allowedTopologies []v1.TopologySelectorTerm) []topologyTerm {
	var result []topologyTerm

	for i := range allowedTopologies { // OR
		var expanded []topologyTerm

		for _, expr := range allowedTopologies[i].MatchLabelExpressions { // AND
			var next []topologyTerm

			for _, value := range expr.Values { // OR
				segment := topologySegment{Key: expr.Key, Value: value}

				if len(expanded) == 0 {
					// Nothing to distribute over yet: the segment is a term on its own.
					next = append(next, topologyTerm{segment})
					continue
				}

				// Distribute the OR over the AND: extend a copy of every
				// term built so far with the new segment.
				for _, base := range expanded {
					next = append(next, append(slices.Clone(base), segment))
				}
			}

			expanded = next
		}

		// Concatenate all OR'd terms.
		result = append(result, expanded...)
	}

	for i := range result {
		result[i].sort()
	}
	return result
}
// getTopologyKeys returns the topology keys that the driver named driverName
// registered on csiNode, or nil when the CSINode has no entry for that driver.
func getTopologyKeys(csiNode *storagev1.CSINode, driverName string) []string {
	drivers := csiNode.Spec.Drivers
	for i := range drivers {
		if drivers[i].Name != driverName {
			continue
		}
		return drivers[i].TopologyKeys
	}
	return nil
}
// getTopologyFromNode builds a sorted topologyTerm from the labels of node,
// taking one segment per entry of topologyKeys.
//
// If the node lacks a label for any of the keys, it returns (nil, true).
func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTerm, isMissingKey bool) {
	segments := make(topologyTerm, 0, len(topologyKeys))
	for i := range topologyKeys {
		value, found := node.Labels[topologyKeys[i]]
		if !found {
			return nil, true
		}
		segments = append(segments, topologySegment{Key: topologyKeys[i], Value: value})
	}
	segments.sort()
	return segments, false
}
// buildTopologyKeySelector returns a label selector that matches objects on
// which every key in topologyKeys exists, whatever its value.
//
// A conversion failure is returned wrapped with %w so that callers can still
// inspect the cause.
func buildTopologyKeySelector(topologyKeys []string) (labels.Selector, error) {
	expr := make([]metav1.LabelSelectorRequirement, 0, len(topologyKeys))
	for _, key := range topologyKeys {
		expr = append(expr, metav1.LabelSelectorRequirement{
			Key:      key,
			Operator: metav1.LabelSelectorOpExists,
		})
	}

	labelSelector := metav1.LabelSelector{
		MatchExpressions: expr,
	}

	selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
	if err != nil {
		return nil, fmt.Errorf("error parsing topology keys selector: %w", err)
	}
	return selector, nil
}
// sort orders the segments of t in place, by key and then by value.
func (t topologyTerm) sort() {
	bySegment := func(a, b topologySegment) int {
		if a.Key != b.Key {
			return strings.Compare(a.Key, b.Key)
		}
		// Should not happen currently. We may support multi-value in the future?
		return strings.Compare(a.Value, b.Value)
	}
	slices.SortFunc(t, bySegment)
}
// compare orders two terms: a term with fewer segments sorts first; terms of
// equal length are compared segment by segment, by key and then by value.
// It returns a negative number, zero or a positive number when t sorts before,
// equal to or after other. Both terms are expected to be sorted.
func (t topologyTerm) compare(other topologyTerm) int {
	if diff := len(t) - len(other); diff != 0 {
		return diff
	}
	for i := range t {
		mine, theirs := t[i], other[i]
		if c := strings.Compare(mine.Key, theirs.Key); c != 0 {
			return c
		}
		if c := strings.Compare(mine.Value, theirs.Value); c != 0 {
			return c
		}
	}
	return 0
}
// subset reports whether every segment of t is present in other. An empty t is
// a subset of anything. Both terms are expected to be sorted, because the
// segments of t are matched in order during a single pass over other.
func (t topologyTerm) subset(other topologyTerm) bool {
	remaining := t
	if len(remaining) == 0 {
		return true
	}
	for _, candidate := range other {
		wanted := remaining[0]
		if wanted.Key != candidate.Key {
			continue
		}
		if wanted.Value != candidate.Value {
			return false
		}
		remaining = remaining[1:]
		if len(remaining) == 0 {
			// All segments in t have been checked and are present in other.
			return true
		}
	}
	return false
}
// toCSITopology converts every topologyTerm into a csi.Topology whose Segments
// map holds the term's key/value pairs. The order of terms is kept.
func toCSITopology(terms []topologyTerm) []*csi.Topology {
	topologies := make([]*csi.Topology, len(terms))
	for i, term := range terms {
		segments := make(map[string]string, len(term))
		for _, segment := range term {
			segments[segment.Key] = segment.Value
		}
		topologies[i] = &csi.Topology{Segments: segments}
	}
	return topologies
}
// getPVCNameHashAndIndexOffset derives a hash and an index offset from a PVC
// name.
//
// The logic is identical to getPVCNameHashAndIndexOffset in
// pkg/volume/util/util.go in-tree
// [https://github.com/kubernetes/kubernetes/blob/master/pkg/volume/util/util.go].
//
// Heuristic to make sure that volumes in a StatefulSet are spread across zones:
// StatefulSet PVCs are (currently) named ClaimName-StatefulSetName-Id, where Id
// is an integer index. When the part after the last dash parses as an unsigned
// 32-bit integer, it becomes index and only the rest of the name is hashed.
// If a StatefulSet pod has multiple claims they need to be in the same zone,
// otherwise the pod will be unable to mount both volumes and will be
// unschedulable; so of that rest only the portion after its own last dash (the
// "StatefulSetName" part) is hashed. Names that look like `Name-Id` are
// round-robined the same way. Any other name is hashed as a whole with index 0.
//
// An empty pvcName yields a random hash and index 0.
func getPVCNameHashAndIndexOffset(pvcName string) (hash uint32, index uint32) {
	if pvcName == "" {
		// We should always be called with a name; this shouldn't happen
		return rand.Uint32(), index
	}

	base := pvcName
	if cut := strings.LastIndexByte(pvcName, '-'); cut >= 0 {
		if id, err := strconv.ParseUint(pvcName[cut+1:], 10, 32); err == nil {
			// Offset by the statefulset ID, so we round-robin across zones
			index = uint32(id)
			// We still hash the volume name, but only the prefix
			base = pvcName[:cut]

			// StatefulSetName (and ClaimName) might themselves both have dashes.
			// We actually just take the portion after the final - of
			// ClaimName-StatefulSetName. For our purposes it doesn't much
			// matter (just suboptimal spreading).
			if inner := strings.LastIndexByte(base, '-'); inner >= 0 {
				base = base[inner+1:]
			}
		}
	}

	// We hash the (base) volume name, so we don't bias towards the first N zones
	hasher := fnv.New32()
	hasher.Write([]byte(base))
	return hasher.Sum32(), index
}