Skip to content

Commit 2099375

Browse files
Jefftree authored and k8s-publishing-bot committed
CLE (Coordinated Leader Election) controller and client changes
Kubernetes-commit: c47ff1e1a9aec44f262674eb6cdbabf80512d981
1 parent 8a2bbd0 commit 2099375

File tree

6 files changed

+596
-13
lines changed

6 files changed

+596
-13
lines changed

tools/leaderelection/leaderelection.go

+88-3
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ type LeaderElectionConfig struct {
159159

160160
// Name is the name of the resource lock for debugging
161161
Name string
162+
163+
// Coordinated will use the Coordinated Leader Election feature
164+
Coordinated bool
162165
}
163166

164167
// LeaderCallbacks are callbacks that are triggered during certain
@@ -249,7 +252,11 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
249252
desc := le.config.Lock.Describe()
250253
klog.Infof("attempting to acquire leader lease %v...", desc)
251254
wait.JitterUntil(func() {
252-
succeeded = le.tryAcquireOrRenew(ctx)
255+
if !le.config.Coordinated {
256+
succeeded = le.tryAcquireOrRenew(ctx)
257+
} else {
258+
succeeded = le.tryCoordinatedRenew(ctx)
259+
}
253260
le.maybeReportTransition()
254261
if !succeeded {
255262
klog.V(4).Infof("failed to acquire lease %v", desc)
@@ -272,7 +279,11 @@ func (le *LeaderElector) renew(ctx context.Context) {
272279
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
273280
defer timeoutCancel()
274281
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
275-
return le.tryAcquireOrRenew(timeoutCtx), nil
282+
if !le.config.Coordinated {
283+
return le.tryAcquireOrRenew(timeoutCtx), nil
284+
} else {
285+
return le.tryCoordinatedRenew(timeoutCtx), nil
286+
}
276287
}, timeoutCtx.Done())
277288

278289
le.maybeReportTransition()
@@ -282,7 +293,6 @@ func (le *LeaderElector) renew(ctx context.Context) {
282293
return
283294
}
284295
le.metrics.leaderOff(le.config.Name)
285-
klog.Infof("failed to renew lease %v: %v", desc, err)
286296
cancel()
287297
}, le.config.RetryPeriod, ctx.Done())
288298

@@ -315,6 +325,81 @@ func (le *LeaderElector) release() bool {
315325
return true
316326
}
317327

328+
// tryCoordinatedRenew checks if it acquired a lease and tries to renew the
329+
// lease if it has already been acquired. Returns true on success else returns
330+
// false.
331+
func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
332+
now := metav1.NewTime(le.clock.Now())
333+
leaderElectionRecord := rl.LeaderElectionRecord{
334+
HolderIdentity: le.config.Lock.Identity(),
335+
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
336+
RenewTime: now,
337+
AcquireTime: now,
338+
}
339+
340+
// 1. obtain the electionRecord
341+
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
342+
if err != nil {
343+
if !errors.IsNotFound(err) {
344+
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
345+
return false
346+
}
347+
klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
348+
return false
349+
}
350+
351+
// 2. Record obtained, check the Identity & Time
352+
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
353+
le.setObservedRecord(oldLeaderElectionRecord)
354+
355+
le.observedRawRecord = oldLeaderElectionRawRecord
356+
}
357+
hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time)
358+
359+
if hasExpired {
360+
klog.Infof("lock has expired: %v", le.config.Lock.Describe())
361+
return false
362+
}
363+
364+
if !le.IsLeader() {
365+
klog.V(4).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe())
366+
return false
367+
}
368+
369+
// 2b. If the lease has been marked as "end of term", don't renew it
370+
if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" {
371+
klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe())
372+
// TODO: Instead of letting lease expire, the holder may deleted it directly
373+
// This will not be compatible with all controllers, so it needs to be opt-in behavior..
374+
// We must ensure all code guarded by this lease has successfully completed
375+
// prior to releasing or there may be two processes
376+
// simultaneously acting on the critical path.
377+
// Usually once this returns false, the process is terminated..
378+
// xref: OnStoppedLeading
379+
return false
380+
}
381+
382+
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
383+
// here. Let's correct it before updating.
384+
if le.IsLeader() {
385+
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
386+
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
387+
leaderElectionRecord.Strategy = oldLeaderElectionRecord.Strategy
388+
le.metrics.slowpathExercised(le.config.Name)
389+
} else {
390+
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
391+
}
392+
393+
// update the lock itself
394+
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
395+
klog.Errorf("Failed to update lock: %v", err)
396+
return false
397+
}
398+
399+
le.setObservedRecord(&leaderElectionRecord)
400+
return true
401+
}
402+
318403
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
319404
// else it tries to renew the lease if it has already been acquired. Returns true
320405
// on success else returns false.

tools/leaderelection/leaderelection_test.go

+142-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/google/go-cmp/cmp"
28+
"github.com/stretchr/testify/assert"
2829
coordinationv1 "k8s.io/api/coordination/v1"
2930
corev1 "k8s.io/api/core/v1"
3031
"k8s.io/apimachinery/pkg/api/equality"
@@ -37,8 +38,6 @@ import (
3738
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
3839
"k8s.io/client-go/tools/record"
3940
"k8s.io/utils/clock"
40-
41-
"github.com/stretchr/testify/assert"
4241
)
4342

4443
func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) {
@@ -353,6 +352,147 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
353352
}
354353
}
355354

355+
func TestTryCoordinatedRenew(t *testing.T) {
356+
objectType := "leases"
357+
clock := clock.RealClock{}
358+
future := clock.Now().Add(1000 * time.Hour)
359+
360+
tests := []struct {
361+
name string
362+
observedRecord rl.LeaderElectionRecord
363+
observedTime time.Time
364+
retryAfter time.Duration
365+
reactors []Reactor
366+
expectedEvents []string
367+
368+
expectSuccess bool
369+
transitionLeader bool
370+
outHolder string
371+
}{
372+
{
373+
name: "don't acquire from led, acked object",
374+
reactors: []Reactor{
375+
{
376+
verb: "get",
377+
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
378+
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
379+
},
380+
},
381+
},
382+
observedTime: future,
383+
384+
expectSuccess: false,
385+
outHolder: "bing",
386+
},
387+
{
388+
name: "renew already acquired object",
389+
reactors: []Reactor{
390+
{
391+
verb: "get",
392+
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
393+
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
394+
},
395+
},
396+
{
397+
verb: "update",
398+
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
399+
return true, action.(fakeclient.CreateAction).GetObject(), nil
400+
},
401+
},
402+
},
403+
observedTime: future,
404+
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
405+
406+
expectSuccess: true,
407+
outHolder: "baz",
408+
},
409+
}
410+
411+
for i := range tests {
412+
test := &tests[i]
413+
t.Run(test.name, func(t *testing.T) {
414+
// OnNewLeader is called async so we have to wait for it.
415+
var wg sync.WaitGroup
416+
wg.Add(1)
417+
var reportedLeader string
418+
var lock rl.Interface
419+
420+
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
421+
recorder := record.NewFakeRecorder(100)
422+
resourceLockConfig := rl.ResourceLockConfig{
423+
Identity: "baz",
424+
EventRecorder: recorder,
425+
}
426+
c := &fake.Clientset{}
427+
for _, reactor := range test.reactors {
428+
c.AddReactor(reactor.verb, objectType, reactor.reaction)
429+
}
430+
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
431+
t.Errorf("unreachable action. testclient called too many times: %+v", action)
432+
return true, nil, fmt.Errorf("unreachable action")
433+
})
434+
435+
lock = &rl.LeaseLock{
436+
LeaseMeta: objectMeta,
437+
LockConfig: resourceLockConfig,
438+
Client: c.CoordinationV1(),
439+
}
440+
lec := LeaderElectionConfig{
441+
Lock: lock,
442+
LeaseDuration: 10 * time.Second,
443+
Callbacks: LeaderCallbacks{
444+
OnNewLeader: func(l string) {
445+
defer wg.Done()
446+
reportedLeader = l
447+
},
448+
},
449+
Coordinated: true,
450+
}
451+
observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
452+
le := &LeaderElector{
453+
config: lec,
454+
observedRecord: test.observedRecord,
455+
observedRawRecord: observedRawRecord,
456+
observedTime: test.observedTime,
457+
clock: clock,
458+
metrics: globalMetricsFactory.newLeaderMetrics(),
459+
}
460+
if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
461+
if test.retryAfter != 0 {
462+
time.Sleep(test.retryAfter)
463+
if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
464+
t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
465+
}
466+
} else {
467+
t.Errorf("unexpected result of gryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
468+
}
469+
}
470+
471+
le.observedRecord.AcquireTime = metav1.Time{}
472+
le.observedRecord.RenewTime = metav1.Time{}
473+
if le.observedRecord.HolderIdentity != test.outHolder {
474+
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
475+
}
476+
if len(test.reactors) != len(c.Actions()) {
477+
t.Errorf("wrong number of api interactions")
478+
}
479+
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
480+
t.Errorf("leader should have transitioned but did not")
481+
}
482+
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
483+
t.Errorf("leader should not have transitioned but did")
484+
}
485+
486+
le.maybeReportTransition()
487+
wg.Wait()
488+
if reportedLeader != test.outHolder {
489+
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
490+
}
491+
assertEqualEvents(t, test.expectedEvents, recorder.Events)
492+
})
493+
}
494+
}
495+
356496
// Will test leader election using lease as the resource
357497
func TestTryAcquireOrRenewLeases(t *testing.T) {
358498
testTryAcquireOrRenew(t, "leases")

0 commit comments

Comments
 (0)