Skip to content

Commit fe0f6dd

Browse files
authored
Merge pull request #424 from mengqiy/nonleaderelection
support HA (non leader election) components
2 parents f39791e + ea5a354 commit fe0f6dd

File tree

5 files changed

+72
-29
lines changed

5 files changed

+72
-29
lines changed

examples/crd/pkg/doc.go renamed to examples/crd/pkg/groupversion_info.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,25 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17+
// +kubebuilder:object:generate=true
18+
// +groupName=chaosapps.metamagical.io
1719
package pkg
1820

1921
import (
22+
"k8s.io/apimachinery/pkg/runtime/schema"
2023
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
24+
"sigs.k8s.io/controller-runtime/pkg/scheme"
2125
)
2226

23-
var log = logf.Log.WithName("chaospod-resource")
27+
var (
28+
log = logf.Log.WithName("chaospod-resource")
29+
30+
// SchemeGroupVersion is group version used to register these objects
31+
SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"}
32+
33+
// SchemeBuilder is used to add go types to the GroupVersionKind scheme
34+
SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}
35+
36+
// AddToScheme is required by pkg/client/...
37+
AddToScheme = SchemeBuilder.AddToScheme
38+
)

examples/crd/pkg/resource.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
corev1 "k8s.io/api/core/v1"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
"k8s.io/apimachinery/pkg/runtime"
26-
"k8s.io/apimachinery/pkg/runtime/schema"
27-
"sigs.k8s.io/controller-runtime/pkg/scheme"
2826
"sigs.k8s.io/controller-runtime/pkg/webhook"
2927
)
3028

@@ -41,12 +39,11 @@ type ChaosPodStatus struct {
4139
LastRun metav1.Time `json:"lastRun,omitempty"`
4240
}
4341

44-
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
42+
// +kubebuilder:object:root=true
4543

4644
// ChaosPod is the Schema for the randomjobs API
4745
// +kubebuilder:printcolumn:name="next stop",type="string",JSONPath=".spec.nextStop",format="date"
4846
// +kubebuilder:printcolumn:name="last run",type="string",JSONPath=".status.lastRun",format="date"
49-
// +k8s:openapi-gen=true
5047
type ChaosPod struct {
5148
metav1.TypeMeta `json:",inline"`
5249
metav1.ObjectMeta `json:"metadata,omitempty"`
@@ -55,7 +52,7 @@ type ChaosPod struct {
5552
Status ChaosPodStatus `json:"status,omitempty"`
5653
}
5754

58-
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
55+
// +kubebuilder:object:root=true
5956

6057
// ChaosPodList contains a list of ChaosPod
6158
type ChaosPodList struct {
@@ -106,14 +103,3 @@ func (c *ChaosPod) Default() {
106103
func init() {
107104
SchemeBuilder.Register(&ChaosPod{}, &ChaosPodList{})
108105
}
109-
110-
var (
111-
// SchemeGroupVersion is group version used to register these objects
112-
SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"}
113-
114-
// SchemeBuilder is used to add go types to the GroupVersionKind scheme
115-
SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}
116-
117-
// AddToScheme is required by pkg/client/...
118-
AddToScheme = SchemeBuilder.AddToScheme
119-
)

pkg/manager/internal.go

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,12 @@ type controllerManager struct {
5858
// to scheme.scheme.
5959
scheme *runtime.Scheme
6060

61-
// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
62-
runnables []Runnable
61+
// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
62+
// These Runnables are managed by lead election.
63+
leaderElectionRunnables []Runnable
64+
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
65+
// These Runnables will not be blocked by lead election.
66+
nonLeaderElectionRunnables []Runnable
6367

6468
cache cache.Cache
6569

@@ -121,7 +125,7 @@ type controllerManager struct {
121125
retryPeriod time.Duration
122126
}
123127

124-
// Add sets dependencies on i, and adds it to the list of runnables to start.
128+
// Add sets dependencies on i, and adds it to the list of Runnables to start.
125129
func (cm *controllerManager) Add(r Runnable) error {
126130
cm.mu.Lock()
127131
defer cm.mu.Unlock()
@@ -131,8 +135,13 @@ func (cm *controllerManager) Add(r Runnable) error {
131135
return err
132136
}
133137

134-
// Add the runnable to the list
135-
cm.runnables = append(cm.runnables, r)
138+
// Add the runnable to the leader election or the non-leaderelection list
139+
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
140+
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
141+
} else {
142+
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
143+
}
144+
136145
if cm.started {
137146
// If already started, start the controller
138147
go func() {
@@ -254,13 +263,15 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
254263
go cm.serveMetrics(cm.internalStop)
255264
}
256265

266+
go cm.startNonLeaderElectionRunnables()
267+
257268
if cm.resourceLock != nil {
258269
err := cm.startLeaderElection()
259270
if err != nil {
260271
return err
261272
}
262273
} else {
263-
go cm.start()
274+
go cm.startLeaderElectionRunnables()
264275
}
265276

266277
select {
@@ -273,7 +284,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
273284
}
274285
}
275286

276-
func (cm *controllerManager) start() {
287+
func (cm *controllerManager) startNonLeaderElectionRunnables() {
277288
cm.mu.Lock()
278289
defer cm.mu.Unlock()
279290

@@ -291,8 +302,26 @@ func (cm *controllerManager) start() {
291302
// TODO(community): Check the return value and write a test
292303
cm.cache.WaitForCacheSync(cm.internalStop)
293304

294-
// Start the runnables after the cache has synced
295-
for _, c := range cm.runnables {
305+
// Start the non-leaderelection Runnables after the cache has synced
306+
for _, c := range cm.nonLeaderElectionRunnables {
307+
// Controllers block, but we want to return an error if any have an error starting.
308+
// Write any Start errors to a channel so we can return them
309+
ctrl := c
310+
go func() {
311+
cm.errChan <- ctrl.Start(cm.internalStop)
312+
}()
313+
}
314+
315+
cm.started = true
316+
}
317+
318+
func (cm *controllerManager) startLeaderElectionRunnables() {
319+
// Wait for the caches to sync.
320+
// TODO(community): Check the return value and write a test
321+
cm.cache.WaitForCacheSync(cm.internalStop)
322+
323+
// Start the leader election Runnables after the cache has synced
324+
for _, c := range cm.leaderElectionRunnables {
296325
// Controllers block, but we want to return an error if any have an error starting.
297326
// Write any Start errors to a channel so we can return them
298327
ctrl := c
@@ -312,7 +341,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
312341
RetryPeriod: cm.retryPeriod,
313342
Callbacks: leaderelection.LeaderCallbacks{
314343
OnStartedLeading: func(_ context.Context) {
315-
cm.start()
344+
cm.startLeaderElectionRunnables()
316345
},
317346
OnStoppedLeading: func() {
318347
// Most implementations of leader election log.Fatal() here.

pkg/manager/manager.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ import (
4242
// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
4343
// A Manager is required to create Controllers.
4444
type Manager interface {
45-
// Add will set reqeusted dependencies on the component, and cause the component to be
45+
// Add will set requested dependencies on the component, and cause the component to be
4646
// started when Start is called. Add will inject any dependencies for which the argument
47-
// implements the inject interface - e.g. inject.Client
47+
// implements the inject interface - e.g. inject.Client.
48+
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
49+
// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
4850
Add(Runnable) error
4951

5052
// SetFields will set any dependencies on an object for which the object has implemented the inject
@@ -183,6 +185,13 @@ func (r RunnableFunc) Start(s <-chan struct{}) error {
183185
return r(s)
184186
}
185187

188+
// LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode.
189+
type LeaderElectionRunnable interface {
190+
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
191+
// e.g. controllers need to be run in leader election mode, while webhook server doesn't.
192+
NeedLeaderElection() bool
193+
}
194+
186195
// New returns a new Manager for creating Controllers.
187196
func New(config *rest.Config, options Options) (Manager, error) {
188197
// Initialize a rest.config if none was specified

pkg/manager/manager_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,10 @@ var _ = Describe("manger.Manager", func() {
284284
<-c2
285285
<-c3
286286
})
287+
288+
It("should return an error if any non-leaderelection Components fail to Start", func() {
289+
// TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429
290+
})
287291
}
288292

289293
Context("with defaults", func() {

0 commit comments

Comments
 (0)