Skip to content

Commit 63b1b3e

Browse files
author
Mengqi Yu
committed
HA components
1 parent b9f30df commit 63b1b3e

File tree

3 files changed

+102
-8
lines changed

3 files changed

+102
-8
lines changed

pkg/manager/internal.go

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,11 @@ type controllerManager struct {
5050
// to scheme.scheme.
5151
scheme *runtime.Scheme
5252

53-
// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
53+
// runnables is the set of Controllers and (or) webhook server that the controllerManager injects deps into and Starts.
5454
runnables []Runnable
55+
// haRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
56+
// These Runnables are in HA mode (not blocked by lead election)
57+
haRunnables []Runnable
5558

5659
cache cache.Cache
5760

@@ -104,7 +107,7 @@ type controllerManager struct {
104107
}
105108

106109
// Add sets dependencies on i, and adds it to the list of runnables to start.
107-
func (cm *controllerManager) Add(r Runnable) error {
110+
func (cm *controllerManager) add(r Runnable, leaderElection bool) error {
108111
cm.mu.Lock()
109112
defer cm.mu.Unlock()
110113

@@ -114,7 +117,11 @@ func (cm *controllerManager) Add(r Runnable) error {
114117
}
115118

116119
// Add the runnable to the list
117-
cm.runnables = append(cm.runnables, r)
120+
if leaderElection {
121+
cm.runnables = append(cm.runnables, r)
122+
} else {
123+
cm.haRunnables = append(cm.haRunnables, r)
124+
}
118125
if cm.started {
119126
// If already started, start the controller
120127
go func() {
@@ -125,6 +132,16 @@ func (cm *controllerManager) Add(r Runnable) error {
125132
return nil
126133
}
127134

135+
// Add sets dependencies on i, and adds it to the list of runnables to start.
136+
func (cm *controllerManager) Add(r Runnable) error {
137+
return cm.add(r, true)
138+
}
139+
140+
// AddHA sets dependencies on i, and adds it to the list of runnables to start in HA mode.
141+
func (cm *controllerManager) AddHA(r Runnable) error {
142+
return cm.add(r, false)
143+
}
144+
128145
func (cm *controllerManager) SetFields(i interface{}) error {
129146
if _, err := inject.ConfigInto(cm.config, i); err != nil {
130147
return err
@@ -235,13 +252,15 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
235252
go cm.serveMetrics(cm.internalStop)
236253
}
237254

255+
go cm.start()
256+
238257
if cm.resourceLock != nil {
239258
err := cm.startLeaderElection()
240259
if err != nil {
241260
return err
242261
}
243262
} else {
244-
go cm.start()
263+
go cm.startLeaderElected()
245264
}
246265

247266
select {
@@ -272,6 +291,24 @@ func (cm *controllerManager) start() {
272291
// TODO(community): Check the return value and write a test
273292
cm.cache.WaitForCacheSync(cm.internalStop)
274293

294+
// Start the HA runnables after the cache has synced
295+
for _, c := range cm.haRunnables {
296+
// Controllers block, but we want to return an error if any have an error starting.
297+
// Write any Start errors to a channel so we can return them
298+
ctrl := c
299+
go func() {
300+
cm.errChan <- ctrl.Start(cm.internalStop)
301+
}()
302+
}
303+
304+
cm.started = true
305+
}
306+
307+
func (cm *controllerManager) startLeaderElected() {
308+
// Wait for the caches to sync.
309+
// TODO(community): Check the return value and write a test
310+
cm.cache.WaitForCacheSync(cm.internalStop)
311+
275312
// Start the runnables after the cache has synced
276313
for _, c := range cm.runnables {
277314
// Controllers block, but we want to return an error if any have an error starting.
@@ -295,7 +332,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
295332
RetryPeriod: 2 * time.Second,
296333
Callbacks: leaderelection.LeaderCallbacks{
297334
OnStartedLeading: func(_ context.Context) {
298-
cm.start()
335+
cm.startLeaderElected()
299336
},
300337
OnStoppedLeading: func() {
301338
// Most implementations of leader election log.Fatal() here.

pkg/manager/manager.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,18 @@ import (
4242
// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
4343
// A Manager is required to create Controllers.
4444
type Manager interface {
45-
// Add will set reqeusted dependencies on the component, and cause the component to be
45+
// Add will set requested dependencies on the component, and cause the component to be
4646
// started when Start is called. Add will inject any dependencies for which the argument
47-
// implements the inject interface - e.g. inject.Client
47+
// implements the inject interface - e.g. inject.Client. Runnable added by calling `Add`
48+
// will be managed by leader election if the leader election is enabled.
4849
Add(Runnable) error
4950

51+
// AddHA will set reqeusted dependencies on the component, and cause the component to be
52+
// started when Start is called. Add will inject any dependencies for which the argument
53+
// implements the inject interface - e.g. inject.Client. Runnable added by calling `AddHA`
54+
// are NOT managed by leader election and all replicas will be running.
55+
AddHA(Runnable) error
56+
5057
// SetFields will set any dependencies on an object for which the object has implemented the inject
5158
// interface - e.g. inject.Client.
5259
SetFields(interface{}) error

pkg/manager/manager_test.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,28 @@ var _ = Describe("manger.Manager", func() {
218218
return nil
219219
}))
220220

221+
c3 := make(chan struct{})
222+
m.AddHA(RunnableFunc(func(s <-chan struct{}) error {
223+
defer close(c3)
224+
defer GinkgoRecover()
225+
return nil
226+
}))
227+
228+
c4 := make(chan struct{})
229+
m.Add(RunnableFunc(func(s <-chan struct{}) error {
230+
defer close(c4)
231+
defer GinkgoRecover()
232+
return nil
233+
}))
234+
221235
go func() {
222236
defer GinkgoRecover()
223237
Expect(m.Start(stop)).NotTo(HaveOccurred())
224238
}()
225239
<-c1
226240
<-c2
241+
<-c3
242+
<-c4
227243

228244
close(done)
229245
})
@@ -251,7 +267,7 @@ var _ = Describe("manger.Manager", func() {
251267
close(done)
252268
})
253269

254-
It("should return an error if any Components fail to Start", func(done Done) {
270+
It("should return an error if any Components that managed by leader election fail to Start", func(done Done) {
255271
m, err := New(cfg, options)
256272
Expect(err).NotTo(HaveOccurred())
257273
c1 := make(chan struct{})
@@ -284,6 +300,40 @@ var _ = Describe("manger.Manager", func() {
284300
<-c2
285301
<-c3
286302
})
303+
304+
It("should return an error if any Components fail to Start", func(done Done) {
305+
m, err := New(cfg, options)
306+
Expect(err).NotTo(HaveOccurred())
307+
c1 := make(chan struct{})
308+
m.AddHA(RunnableFunc(func(s <-chan struct{}) error {
309+
defer GinkgoRecover()
310+
defer close(c1)
311+
return nil
312+
}))
313+
314+
c2 := make(chan struct{})
315+
m.AddHA(RunnableFunc(func(s <-chan struct{}) error {
316+
defer GinkgoRecover()
317+
defer close(c2)
318+
return fmt.Errorf("expected error")
319+
}))
320+
321+
c3 := make(chan struct{})
322+
m.Add(RunnableFunc(func(s <-chan struct{}) error {
323+
defer GinkgoRecover()
324+
defer close(c3)
325+
return nil
326+
}))
327+
328+
go func() {
329+
defer GinkgoRecover()
330+
Expect(m.Start(stop)).NotTo(HaveOccurred())
331+
close(done)
332+
}()
333+
<-c1
334+
<-c2
335+
<-c3
336+
})
287337
}
288338

289339
Context("with defaults", func() {

0 commit comments

Comments
 (0)