
Commit 3a8728f

✨ kcp: add preflight check for pending version upgrade from topology (#11927)
* kcp: add preflight check for pending version upgrade from topology
* topology: propagate changes to CP even when scaling to prevent deadlocks
* kcp: adjust predicates to reconcile the event of cluster version changes
* review fixes
* fixup
* review fix
* drop IsScaling for kcp in upgradetracker
1 parent 7db012e commit 3a8728f
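
At its core, the commit adds a guard that compares the version desired by Cluster.spec.topology against the version currently set on the KubeadmControlPlane. A minimal sketch of that guard, using the API types that appear in the diffs below (`pendingTopologyUpgrade` is a hypothetical helper for illustration; in the commit the check is inlined and additionally gated on the ClusterTopology feature gate):

```go
package sketch

import (
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
)

// pendingTopologyUpgrade reports whether a version change from
// Cluster.spec.topology has not yet been propagated to the control plane.
// Hypothetical helper; the commit inlines this condition in scale.go and
// remediation.go below.
func pendingTopologyUpgrade(cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) bool {
	return cluster.Spec.Topology != nil && cluster.Spec.Topology.Version != kcp.Spec.Version
}
```

While this condition holds, KCP declines to scale or remediate and surfaces the wait through its conditions, leaving the version rollout to the topology controller.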

File tree

15 files changed (+221, -282)


controlplane/kubeadm/internal/control_plane.go (+2)

```diff
@@ -91,6 +91,8 @@ type PreflightCheckResults struct {
 	ControlPlaneComponentsNotHealthy bool
 	// EtcdClusterNotHealthy reports true if preflight check detected that the etcd cluster is not fully healthy.
 	EtcdClusterNotHealthy bool
+	// TopologyVersionMismatch reports true if preflight check detected that the Cluster's topology version does not match the control plane's version
+	TopologyVersionMismatch bool
 }
 
 // NewControlPlane returns an instantiated ControlPlane.
```

controlplane/kubeadm/internal/controllers/controller.go (+4, -1)

```diff
@@ -122,7 +122,10 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
 		predicates.All(mgr.GetScheme(), predicateLog,
 			predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog),
 			predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
-			predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog),
+			predicates.Any(mgr.GetScheme(), predicateLog,
+				predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog),
+				predicates.ClusterTopologyVersionChanged(mgr.GetScheme(), predicateLog),
+			),
 		),
 	),
 ).
```
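
The predicate change closes a wake-up gap: under the old filter, a Cluster update that only bumps spec.topology.version could be dropped, leaving KCP parked on the new preflight check with nothing to retrigger it. predicates.Any admits an event as soon as any wrapped predicate accepts it; a rough sketch of that semantics for update events (illustrative only, not the implementation in sigs.k8s.io/cluster-api/util/predicates):

```go
package sketch

import (
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// anyOf approximates what predicates.Any does for update events:
// the event is processed when at least one wrapped predicate lets it through.
func anyOf(preds ...predicate.Funcs) predicate.Funcs {
	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			for _, p := range preds {
				if p.Update(e) {
					return true
				}
			}
			return false
		},
	}
}
```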

controlplane/kubeadm/internal/controllers/remediation.go (+23)

```diff
@@ -34,6 +34,7 @@ import (
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
+	"sigs.k8s.io/cluster-api/feature"
 	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
@@ -168,6 +169,28 @@ func (r *KubeadmControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
 	// Before starting remediation, run preflight checks in order to verify it is safe to remediate.
 	// If any of the following checks fails, we'll surface the reason in the MachineOwnerRemediated condition.
 
+	if feature.Gates.Enabled(feature.ClusterTopology) {
+		// Skip remediation when we expect an upgrade to be propagated from the cluster topology.
+		if controlPlane.Cluster.Spec.Topology != nil && controlPlane.Cluster.Spec.Topology.Version != controlPlane.KCP.Spec.Version {
+			message := fmt.Sprintf("KubeadmControlPlane can't remediate while waiting for a version upgrade to %s to be propagated from Cluster.spec.topology", controlPlane.Cluster.Spec.Topology.Version)
+			log.Info(fmt.Sprintf("A control plane machine needs remediation, but %s. Skipping remediation", message))
+			conditions.MarkFalse(machineToBeRemediated,
+				clusterv1.MachineOwnerRemediatedCondition,
+				clusterv1.WaitingForRemediationReason,
+				clusterv1.ConditionSeverityWarning,
+				message)
+
+			v1beta2conditions.Set(machineToBeRemediated, metav1.Condition{
+				Type:    clusterv1.MachineOwnerRemediatedV1Beta2Condition,
+				Status:  metav1.ConditionFalse,
+				Reason:  controlplanev1.KubeadmControlPlaneMachineRemediationDeferredV1Beta2Reason,
+				Message: message,
+			})
+
+			return ctrl.Result{}, nil
+		}
+	}
+
 	// Check if KCP is allowed to remediate considering retry limits:
 	// - Remediation cannot happen because retryPeriod is not yet expired.
 	// - KCP already reached MaxRetries limit.
```
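
Note that the new branch returns ctrl.Result{}, nil without a requeue: going by the commit message, the controller relies on the topology controller propagating the version even while KCP is scaling, and on the ClusterTopologyVersionChanged predicate added above, to retrigger reconciliation once the upgrade lands; that combination is what prevents the deadlock the PR targets.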

controlplane/kubeadm/internal/controllers/remediation_test.go (+55)

```diff
@@ -29,12 +29,14 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/tools/record"
+	utilfeature "k8s.io/component-base/featuregate/testing"
 	utilptr "k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
+	"sigs.k8s.io/cluster-api/feature"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	v1beta2conditions "sigs.k8s.io/cluster-api/util/conditions/v1beta2"
@@ -274,6 +276,59 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
 		g.Expect(ret.IsZero()).To(BeTrue()) // Remediation skipped
 		g.Expect(err).ToNot(HaveOccurred())
 	})
+	t.Run("reconcileUnhealthyMachines return early if there is a pending topology upgrade", func(t *testing.T) {
+		utilfeature.SetFeatureGateDuringTest(t, feature.Gates, feature.ClusterTopology, true)
+
+		g := NewWithT(t)
+
+		m1 := createMachine(ctx, g, ns.Name, "m1-unhealthy-", withMachineHealthCheckFailed(), withWaitBeforeDeleteFinalizer())
+		m2 := createMachine(ctx, g, ns.Name, "m2-healthy-", withHealthyEtcdMember())
+		m3 := createMachine(ctx, g, ns.Name, "m3-healthy-", withHealthyEtcdMember())
+
+		controlPlane := &internal.ControlPlane{
+			KCP: &controlplanev1.KubeadmControlPlane{
+				Spec: controlplanev1.KubeadmControlPlaneSpec{
+					Replicas: utilptr.To[int32](3),
+					Version:  "v1.19.1",
+				},
+				Status: controlplanev1.KubeadmControlPlaneStatus{
+					Initialized: true,
+				},
+			},
+			Cluster: &clusterv1.Cluster{
+				Spec: clusterv1.ClusterSpec{
+					Topology: &clusterv1.Topology{
+						Version: "v1.20.1",
+					},
+				},
+			},
+			Machines: collections.FromMachines(m1, m2, m3),
+		}
+
+		r := &KubeadmControlPlaneReconciler{
+			Client:   env.GetClient(),
+			recorder: record.NewFakeRecorder(32),
+			managementCluster: &fakeManagementCluster{
+				Workload: &fakeWorkloadCluster{
+					EtcdMembersResult: nodes(controlPlane.Machines),
+				},
+			},
+		}
+		controlPlane.InjectTestManagementCluster(r.managementCluster)
+		ret, err := r.reconcileUnhealthyMachines(ctx, controlPlane)
+
+		g.Expect(ret.IsZero()).To(BeTrue()) // Remediation skipped
+		g.Expect(err).ToNot(HaveOccurred())
+
+		assertMachineCondition(ctx, g, m1, clusterv1.MachineOwnerRemediatedCondition, corev1.ConditionFalse, clusterv1.WaitingForRemediationReason, clusterv1.ConditionSeverityWarning, "KubeadmControlPlane can't remediate while waiting for a version upgrade to v1.20.1 to be propagated from Cluster.spec.topology")
+		assertMachineV1beta2Condition(ctx, g, m1, clusterv1.MachineOwnerRemediatedV1Beta2Condition, metav1.ConditionFalse, controlplanev1.KubeadmControlPlaneMachineRemediationDeferredV1Beta2Reason, "KubeadmControlPlane can't remediate while waiting for a version upgrade to v1.20.1 to be propagated from Cluster.spec.topology")
+
+		err = env.Get(ctx, client.ObjectKey{Namespace: m1.Namespace, Name: m1.Name}, m1)
+		g.Expect(err).ToNot(HaveOccurred())
+		g.Expect(m1.ObjectMeta.DeletionTimestamp.IsZero()).To(BeTrue())
+
+		g.Expect(env.Cleanup(ctx, m1, m2, m3)).To(Succeed())
+	})
 	t.Run("Remediation does not happen if MaxRetry is reached", func(t *testing.T) {
 		g := NewWithT(t)
```

controlplane/kubeadm/internal/controllers/scale.go (+11)

```diff
@@ -18,6 +18,7 @@ package controllers
 
 import (
 	"context"
+	"fmt"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -30,6 +31,7 @@ import (
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
+	"sigs.k8s.io/cluster-api/feature"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/version"
@@ -184,6 +186,15 @@ func (r *KubeadmControlPlaneReconciler) preflightChecks(ctx context.Context, con
 		return ctrl.Result{}, nil
 	}
 
+	if feature.Gates.Enabled(feature.ClusterTopology) {
+		// Block when we expect an upgrade to be propagated for topology clusters.
+		if controlPlane.Cluster.Spec.Topology != nil && controlPlane.Cluster.Spec.Topology.Version != controlPlane.KCP.Spec.Version {
+			logger.Info(fmt.Sprintf("Waiting for a version upgrade to %s to be propagated from Cluster.spec.topology", controlPlane.Cluster.Spec.Topology.Version))
+			controlPlane.PreflightCheckResults.TopologyVersionMismatch = true
+			return ctrl.Result{RequeueAfter: preflightFailedRequeueAfter}, nil
+		}
+	}
+
 	// If there are deleting machines, wait for the operation to complete.
 	if controlPlane.HasDeletingMachine() {
 		controlPlane.PreflightCheckResults.HasDeletingMachine = true
```
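
The guard follows the existing preflight pattern: record the outcome on controlPlane.PreflightCheckResults and requeue after preflightFailedRequeueAfter instead of returning an error. A self-contained sketch of the pattern with stand-in types (the requeue interval is an assumed value and topologyPreflight a hypothetical function, both for illustration only):

```go
package main

import (
	"fmt"
	"time"
)

// result stands in for ctrl.Result in this sketch.
type result struct{ RequeueAfter time.Duration }

// The real preflightFailedRequeueAfter is defined by the KCP controller;
// 15s is an assumption made for this sketch.
const preflightFailedRequeueAfter = 15 * time.Second

type preflightCheckResults struct{ TopologyVersionMismatch bool }

// topologyPreflight mirrors the shape of the new branch in preflightChecks:
// flag the mismatch and ask to be requeued rather than erroring out.
func topologyPreflight(topologyVersion, kcpVersion string, out *preflightCheckResults) result {
	if topologyVersion != "" && topologyVersion != kcpVersion {
		out.TopologyVersionMismatch = true
		return result{RequeueAfter: preflightFailedRequeueAfter}
	}
	return result{}
}

func main() {
	var checks preflightCheckResults
	res := topologyPreflight("v1.33.0", "v1.32.0", &checks)
	fmt.Printf("mismatch=%v requeueAfter=%s\n", checks.TopologyVersionMismatch, res.RequeueAfter)
	// mismatch=true requeueAfter=15s
}
```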

controlplane/kubeadm/internal/controllers/scale_test.go (+41, -1)

```diff
@@ -27,13 +27,15 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
+	utilfeature "k8s.io/component-base/featuregate/testing"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
 	controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
+	"sigs.k8s.io/cluster-api/feature"
 	"sigs.k8s.io/cluster-api/util"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
@@ -516,8 +518,10 @@ func TestSelectMachineForScaleDown(t *testing.T) {
 }
 
 func TestPreflightChecks(t *testing.T) {
+	utilfeature.SetFeatureGateDuringTest(t, feature.Gates, feature.ClusterTopology, true)
 	testCases := []struct {
 		name            string
+		cluster         *clusterv1.Cluster
 		kcp             *controlplanev1.KubeadmControlPlane
 		machines        []*clusterv1.Machine
 		expectResult    ctrl.Result
@@ -531,6 +535,33 @@ func TestPreflightChecks(t *testing.T) {
 				HasDeletingMachine:               false,
 				ControlPlaneComponentsNotHealthy: false,
 				EtcdClusterNotHealthy:            false,
+				TopologyVersionMismatch:          false,
+			},
+		},
+		{
+			name: "control plane with a pending upgrade should requeue",
+			cluster: &clusterv1.Cluster{
+				Spec: clusterv1.ClusterSpec{
+					Topology: &clusterv1.Topology{
+						Version: "v1.33.0",
+					},
+				},
+			},
+			kcp: &controlplanev1.KubeadmControlPlane{
+				Spec: controlplanev1.KubeadmControlPlaneSpec{
+					Version: "v1.32.0",
+				},
+			},
+			machines: []*clusterv1.Machine{
+				{},
+			},
+
+			expectResult: ctrl.Result{RequeueAfter: preflightFailedRequeueAfter},
+			expectPreflight: internal.PreflightCheckResults{
+				HasDeletingMachine:               false,
+				ControlPlaneComponentsNotHealthy: false,
+				EtcdClusterNotHealthy:            false,
+				TopologyVersionMismatch:          true,
 			},
 		},
 		{
@@ -548,6 +579,7 @@ func TestPreflightChecks(t *testing.T) {
 				HasDeletingMachine:               true,
 				ControlPlaneComponentsNotHealthy: false,
 				EtcdClusterNotHealthy:            false,
+				TopologyVersionMismatch:          false,
 			},
 		},
 		{
@@ -566,6 +598,7 @@ func TestPreflightChecks(t *testing.T) {
 				HasDeletingMachine:               false,
 				ControlPlaneComponentsNotHealthy: true,
 				EtcdClusterNotHealthy:            true,
+				TopologyVersionMismatch:          false,
 			},
 		},
 		{
@@ -593,6 +626,7 @@ func TestPreflightChecks(t *testing.T) {
 				HasDeletingMachine:               false,
 				ControlPlaneComponentsNotHealthy: true,
 				EtcdClusterNotHealthy:            false,
+				TopologyVersionMismatch:          false,
 			},
 		},
 		{
@@ -620,6 +654,7 @@ func TestPreflightChecks(t *testing.T) {
 				HasDeletingMachine:               false,
 				ControlPlaneComponentsNotHealthy: false,
 				EtcdClusterNotHealthy:            true,
+				TopologyVersionMismatch:          false,
 			},
 		},
 		{
@@ -654,6 +689,7 @@ func TestPreflightChecks(t *testing.T) {
 				HasDeletingMachine:               false,
 				ControlPlaneComponentsNotHealthy: false,
 				EtcdClusterNotHealthy:            false,
+				TopologyVersionMismatch:          false,
 			},
 		},
 	}
@@ -665,8 +701,12 @@ func TestPreflightChecks(t *testing.T) {
 			r := &KubeadmControlPlaneReconciler{
 				recorder: record.NewFakeRecorder(32),
 			}
+			cluster := &clusterv1.Cluster{}
+			if tt.cluster != nil {
+				cluster = tt.cluster
+			}
 			controlPlane := &internal.ControlPlane{
-				Cluster:  &clusterv1.Cluster{},
+				Cluster:  cluster,
 				KCP:      tt.kcp,
 				Machines: collections.FromMachines(tt.machines...),
 			}
```

controlplane/kubeadm/internal/controllers/status.go (+11, -7)

```diff
@@ -162,8 +162,8 @@ func (r *KubeadmControlPlaneReconciler) updateV1Beta2Status(ctx context.Context,
 	setReplicas(ctx, controlPlane.KCP, controlPlane.Machines)
 	setInitializedCondition(ctx, controlPlane.KCP)
 	setRollingOutCondition(ctx, controlPlane.KCP, controlPlane.Machines)
-	setScalingUpCondition(ctx, controlPlane.KCP, controlPlane.Machines, controlPlane.InfraMachineTemplateIsNotFound, controlPlane.PreflightCheckResults)
-	setScalingDownCondition(ctx, controlPlane.KCP, controlPlane.Machines, controlPlane.PreflightCheckResults)
+	setScalingUpCondition(ctx, controlPlane.Cluster, controlPlane.KCP, controlPlane.Machines, controlPlane.InfraMachineTemplateIsNotFound, controlPlane.PreflightCheckResults)
+	setScalingDownCondition(ctx, controlPlane.Cluster, controlPlane.KCP, controlPlane.Machines, controlPlane.PreflightCheckResults)
 	setMachinesReadyCondition(ctx, controlPlane.KCP, controlPlane.Machines)
 	setMachinesUpToDateCondition(ctx, controlPlane.KCP, controlPlane.Machines)
 	setRemediatingCondition(ctx, controlPlane.KCP, controlPlane.MachinesToBeRemediatedByKCP(), controlPlane.UnhealthyMachines())
@@ -265,7 +265,7 @@ func setRollingOutCondition(_ context.Context, kcp *controlplanev1.KubeadmContro
 	})
 }
 
-func setScalingUpCondition(_ context.Context, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, infrastructureObjectNotFound bool, preflightChecks internal.PreflightCheckResults) {
+func setScalingUpCondition(_ context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, infrastructureObjectNotFound bool, preflightChecks internal.PreflightCheckResults) {
 	if kcp.Spec.Replicas == nil {
 		v1beta2conditions.Set(kcp, metav1.Condition{
 			Type: controlplanev1.KubeadmControlPlaneScalingUpV1Beta2Condition,
@@ -300,7 +300,7 @@ func setScalingUpCondition(_ context.Context, kcp *controlplanev1.KubeadmControl
 
 	message := fmt.Sprintf("Scaling up from %d to %d replicas", currentReplicas, desiredReplicas)
 
-	additionalMessages := getPreflightMessages(preflightChecks)
+	additionalMessages := getPreflightMessages(cluster, preflightChecks)
 	if missingReferencesMessage != "" {
 		additionalMessages = append(additionalMessages, fmt.Sprintf("* %s", missingReferencesMessage))
 	}
@@ -317,7 +317,7 @@ func setScalingUpCondition(_ context.Context, kcp *controlplanev1.KubeadmControl
 	})
 }
 
-func setScalingDownCondition(_ context.Context, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, preflightChecks internal.PreflightCheckResults) {
+func setScalingDownCondition(_ context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, preflightChecks internal.PreflightCheckResults) {
 	if kcp.Spec.Replicas == nil {
 		v1beta2conditions.Set(kcp, metav1.Condition{
 			Type: controlplanev1.KubeadmControlPlaneScalingDownV1Beta2Condition,
@@ -345,7 +345,7 @@ func setScalingDownCondition(_ context.Context, kcp *controlplanev1.KubeadmContr
 
 	message := fmt.Sprintf("Scaling down from %d to %d replicas", currentReplicas, desiredReplicas)
 
-	additionalMessages := getPreflightMessages(preflightChecks)
+	additionalMessages := getPreflightMessages(cluster, preflightChecks)
 	if staleMessage := aggregateStaleMachines(machines); staleMessage != "" {
 		additionalMessages = append(additionalMessages, fmt.Sprintf("* %s", staleMessage))
 	}
@@ -804,8 +804,12 @@ func minTime(t1, t2 time.Time) time.Time {
 	return t1
 }
 
-func getPreflightMessages(preflightChecks internal.PreflightCheckResults) []string {
+func getPreflightMessages(cluster *clusterv1.Cluster, preflightChecks internal.PreflightCheckResults) []string {
 	additionalMessages := []string{}
+	if preflightChecks.TopologyVersionMismatch {
+		additionalMessages = append(additionalMessages, fmt.Sprintf("* waiting for a version upgrade to %s to be propagated from Cluster.spec.topology", cluster.Spec.Topology.Version))
+	}
+
 	if preflightChecks.HasDeletingMachine {
 		additionalMessages = append(additionalMessages, "* waiting for a control plane Machine to complete deletion")
 	}
```
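
For operators, the effect shows up in the ScalingUp and ScalingDown condition messages. A runnable approximation of the extended getPreflightMessages with the surrounding types reduced to plain values (the message strings match the diff; the simplified signature is for illustration only):

```go
package main

import "fmt"

type preflightCheckResults struct {
	TopologyVersionMismatch bool
	HasDeletingMachine      bool
}

// getPreflightMessages approximates the updated helper in status.go:
// each failed preflight check contributes one bullet to the condition message.
func getPreflightMessages(topologyVersion string, checks preflightCheckResults) []string {
	msgs := []string{}
	if checks.TopologyVersionMismatch {
		msgs = append(msgs, fmt.Sprintf("* waiting for a version upgrade to %s to be propagated from Cluster.spec.topology", topologyVersion))
	}
	if checks.HasDeletingMachine {
		msgs = append(msgs, "* waiting for a control plane Machine to complete deletion")
	}
	return msgs
}

func main() {
	for _, m := range getPreflightMessages("v1.33.0", preflightCheckResults{TopologyVersionMismatch: true}) {
		fmt.Println(m)
		// * waiting for a version upgrade to v1.33.0 to be propagated from Cluster.spec.topology
	}
}
```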
