Skip to content

Commit 0f7927c

Browse files
authored
Merge pull request #3179 from alvaroaleman/lowpdefault
[release-0.20] 🌱 Handlers: Default to LowPriorityWhenUnchanged without a wrapper
2 parents 3156ace + 9951869 commit 0f7927c

File tree

6 files changed

+387
-168
lines changed

6 files changed

+387
-168
lines changed

pkg/builder/controller.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches(
163163
) *TypedBuilder[request] {
164164
input := WatchesInput[request]{
165165
obj: object,
166-
handler: handler.WithLowPriorityWhenUnchanged(eventHandler),
166+
handler: eventHandler,
167167
}
168168
for _, opt := range opts {
169169
opt.ApplyToWatches(&input)
@@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error {
317317
}
318318

319319
var hdler handler.TypedEventHandler[client.Object, request]
320-
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})))
320+
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
321321
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
322322
allPredicates = append(allPredicates, blder.forInput.predicates...)
323323
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
@@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error {
341341
}
342342

343343
var hdler handler.TypedEventHandler[client.Object, request]
344-
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner(
344+
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
345345
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
346346
blder.forInput.object,
347347
opts...,
348-
))))
348+
)))
349349
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
350350
allPredicates = append(allPredicates, own.predicates...)
351351
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)

pkg/handler/enqueue.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5252
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5353
return
5454
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
55+
56+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5657
Name: evt.Object.GetName(),
5758
Namespace: evt.Object.GetNamespace(),
58-
}})
59+
}}
60+
61+
addToQueueCreate(q, evt, item)
5962
}
6063

6164
// Update implements EventHandler.
6265
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6366
switch {
6467
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
68+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6669
Name: evt.ObjectNew.GetName(),
6770
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
71+
}}
72+
73+
addToQueueUpdate(q, evt, item)
6974
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
75+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7176
Name: evt.ObjectOld.GetName(),
7277
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
78+
}}
79+
80+
addToQueueUpdate(q, evt, item)
7481
default:
7582
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7683
}

pkg/handler/enqueue_mapped.go

+36-9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"k8s.io/client-go/util/workqueue"
2323
"sigs.k8s.io/controller-runtime/pkg/client"
24+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/event"
2526
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2627
)
@@ -63,15 +64,17 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
6364
// TypedEnqueueRequestsFromMapFunc is experimental and subject to future change.
6465
func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] {
6566
return &enqueueRequestsFromMapFunc[object, request]{
66-
toRequests: fn,
67+
toRequests: fn,
68+
objectImplementsClientObject: implementsClientObject[object](),
6769
}
6870
}
6971

7072
var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Request]{}
7173

7274
type enqueueRequestsFromMapFunc[object any, request comparable] struct {
7375
// Mapper transforms the argument into a slice of keys to be reconciled
74-
toRequests TypedMapFunc[object, request]
76+
toRequests TypedMapFunc[object, request]
77+
objectImplementsClientObject bool
7578
}
7679

7780
// Create implements EventHandler.
@@ -81,7 +84,15 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
8184
q workqueue.TypedRateLimitingInterface[request],
8285
) {
8386
reqs := map[request]empty{}
84-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
87+
88+
var lowPriority bool
89+
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) {
90+
clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)}
91+
if isObjectUnchanged(clientObjectEvent) {
92+
lowPriority = true
93+
}
94+
}
95+
e.mapAndEnqueue(ctx, q, evt.Object, reqs, lowPriority)
8596
}
8697

8798
// Update implements EventHandler.
@@ -90,9 +101,13 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
90101
evt event.TypedUpdateEvent[object],
91102
q workqueue.TypedRateLimitingInterface[request],
92103
) {
104+
var lowPriority bool
105+
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) {
106+
lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion()
107+
}
93108
reqs := map[request]empty{}
94-
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
95-
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
109+
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs, lowPriority)
110+
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs, lowPriority)
96111
}
97112

98113
// Delete implements EventHandler.
@@ -102,7 +117,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Delete(
102117
q workqueue.TypedRateLimitingInterface[request],
103118
) {
104119
reqs := map[request]empty{}
105-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
120+
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
106121
}
107122

108123
// Generic implements EventHandler.
@@ -112,14 +127,26 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Generic(
112127
q workqueue.TypedRateLimitingInterface[request],
113128
) {
114129
reqs := map[request]empty{}
115-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
130+
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
116131
}
117132

118-
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
133+
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
134+
ctx context.Context,
135+
q workqueue.TypedRateLimitingInterface[request],
136+
o object,
137+
reqs map[request]empty,
138+
lowPriority bool,
139+
) {
119140
for _, req := range e.toRequests(ctx, o) {
120141
_, ok := reqs[req]
121142
if !ok {
122-
q.Add(req)
143+
if lowPriority {
144+
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
145+
Priority: LowPriority,
146+
}, req)
147+
} else {
148+
q.Add(req)
149+
}
123150
reqs[req] = empty{}
124151
}
125152
}

pkg/handler/enqueue_owner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, m
7272
for _, opt := range opts {
7373
opt(e)
7474
}
75-
return e
75+
return WithLowPriorityWhenUnchanged(e)
7676
}
7777

7878
// OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true.

pkg/handler/eventhandler.go

+94-38
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package handler
1818

1919
import (
2020
"context"
21+
"reflect"
2122
"time"
2223

2324
"k8s.io/client-go/util/workqueue"
@@ -108,10 +109,46 @@ type TypedFuncs[object any, request comparable] struct {
108109
GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
109110
}
110111

112+
var typeForClientObject = reflect.TypeFor[client.Object]()
113+
114+
func implementsClientObject[object any]() bool {
115+
return reflect.TypeFor[object]().Implements(typeForClientObject)
116+
}
117+
118+
func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool {
119+
_, ok := q.(priorityqueue.PriorityQueue[request])
120+
return ok
121+
}
122+
111123
// Create implements EventHandler.
112124
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
113125
if h.CreateFunc != nil {
114-
h.CreateFunc(ctx, e, q)
126+
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.Object) {
127+
h.CreateFunc(ctx, e, q)
128+
return
129+
}
130+
wq := workqueueWithCustomAddFunc[request]{
131+
TypedRateLimitingInterface: q,
132+
// We already know that we have a priority queue, that event.Object implements
133+
// client.Object and that its not nil
134+
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
135+
// We construct a new event typed to client.Object because isObjectUnchanged
136+
// is a generic and hence has to know at compile time the type of the event
137+
// it gets. We only figure that out at runtime though, but we know for sure
138+
// that it implements client.Object at this point so we can hardcode the event
139+
// type to that.
140+
evt := event.CreateEvent{Object: any(e.Object).(client.Object)}
141+
var priority int
142+
if isObjectUnchanged(evt) {
143+
priority = LowPriority
144+
}
145+
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
146+
priorityqueue.AddOpts{Priority: priority},
147+
item,
148+
)
149+
},
150+
}
151+
h.CreateFunc(ctx, e, wq)
115152
}
116153
}
117154

@@ -125,7 +162,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
125162
// Update implements EventHandler.
126163
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
127164
if h.UpdateFunc != nil {
128-
h.UpdateFunc(ctx, e, q)
165+
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) {
166+
h.UpdateFunc(ctx, e, q)
167+
return
168+
}
169+
170+
wq := workqueueWithCustomAddFunc[request]{
171+
TypedRateLimitingInterface: q,
172+
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
173+
// client.Object and that they are not nil
174+
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
175+
var priority int
176+
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
177+
priority = LowPriority
178+
}
179+
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
180+
priorityqueue.AddOpts{Priority: priority},
181+
item,
182+
)
183+
},
184+
}
185+
h.UpdateFunc(ctx, e, wq)
129186
}
130187
}
131188

@@ -142,43 +199,10 @@ const LowPriority = -100
142199
// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
143200
// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
144201
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] {
202+
// TypedFuncs already implements this so just wrap
145203
return TypedFuncs[object, request]{
146-
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
147-
// Due to how the handlers are factored, we have to wrap the workqueue to be able
148-
// to inject custom behavior.
149-
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
150-
TypedRateLimitingInterface: trli,
151-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
152-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
153-
if !isPriorityQueue {
154-
q.Add(item)
155-
return
156-
}
157-
var priority int
158-
if isObjectUnchanged(tce) {
159-
priority = LowPriority
160-
}
161-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
162-
},
163-
})
164-
},
165-
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
166-
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
167-
TypedRateLimitingInterface: trli,
168-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
170-
if !isPriorityQueue {
171-
q.Add(item)
172-
return
173-
}
174-
var priority int
175-
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176-
priority = LowPriority
177-
}
178-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
179-
},
180-
})
181-
},
204+
CreateFunc: u.Create,
205+
UpdateFunc: u.Update,
182206
DeleteFunc: u.Delete,
183207
GenericFunc: u.Generic,
184208
}
@@ -199,3 +223,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199223
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200224
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201225
}
226+
227+
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
228+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
229+
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
230+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
231+
if !isPriorityQueue {
232+
q.Add(item)
233+
return
234+
}
235+
236+
var priority int
237+
if isObjectUnchanged(evt) {
238+
priority = LowPriority
239+
}
240+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
241+
}
242+
243+
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
244+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
245+
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
246+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
247+
if !isPriorityQueue {
248+
q.Add(item)
249+
return
250+
}
251+
252+
var priority int
253+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
254+
priority = LowPriority
255+
}
256+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
257+
}

0 commit comments

Comments
 (0)