Skip to content

Commit 5028a59

Browse files
authored
Merge pull request #2121 from inteon/informer_map_context
✨Use correct context to cancel "list and watch" & wait for all informers to complete
2 parents 8499b67 + 45e7aa7 commit 5028a59

File tree

1 file changed

+48
-28
lines changed

1 file changed

+48
-28
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ type InformersMap struct {
117117
// paramCodec is used by list and watch
118118
paramCodec runtime.ParameterCodec
119119

120-
// stop is the stop channel to stop informers
121-
stop <-chan struct{}
122-
123120
// resync is the base frequency the informers are resynced
124121
// a 10 percent jitter will be added to the resync period between informers
125122
// so that all informers will not send list requests simultaneously.
@@ -128,13 +125,22 @@ type InformersMap struct {
128125
// mu guards access to the map
129126
mu sync.RWMutex
130127

131-
// start is true if the informers have been started
128+
// started is true if the informers have been started
132129
started bool
133130

134131
// startWait is a channel that is closed after the
135132
// informer has been started.
136133
startWait chan struct{}
137134

135+
// waitGroup is the wait group that is used to wait for all informers to stop
136+
waitGroup sync.WaitGroup
137+
138+
// stopped is true if the informers have been stopped
139+
stopped bool
140+
141+
// ctx is the context to stop informers
142+
ctx context.Context
143+
138144
// namespace is the namespace that all ListWatches are restricted to
139145
// default or empty string means all namespaces
140146
namespace string
@@ -157,28 +163,47 @@ func (ip *InformersMap) Start(ctx context.Context) error {
157163
ip.mu.Lock()
158164
defer ip.mu.Unlock()
159165

160-
// Set the stop channel so it can be passed to informers that are added later
161-
ip.stop = ctx.Done()
166+
// Set the context so it can be passed to informers that are added later
167+
ip.ctx = ctx
162168

163169
// Start each informer
164170
for _, i := range ip.informers.Structured {
165-
go i.Informer.Run(ctx.Done())
171+
ip.startInformerLocked(i.Informer)
166172
}
167173
for _, i := range ip.informers.Unstructured {
168-
go i.Informer.Run(ctx.Done())
174+
ip.startInformerLocked(i.Informer)
169175
}
170176
for _, i := range ip.informers.Metadata {
171-
go i.Informer.Run(ctx.Done())
177+
ip.startInformerLocked(i.Informer)
172178
}
173179

174180
// Set started to true so we immediately start any informers added later.
175181
ip.started = true
176182
close(ip.startWait)
177183
}()
178-
<-ctx.Done()
184+
<-ctx.Done() // Block until the context is done
185+
ip.mu.Lock()
186+
ip.stopped = true // Set stopped to true so we don't start any new informers
187+
ip.mu.Unlock()
188+
ip.waitGroup.Wait() // Block until all informers have stopped
179189
return nil
180190
}
181191

192+
func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) {
193+
// Don't start the informer in case we are already waiting for the items in
194+
// the waitGroup to finish, since waitGroups don't support waiting and adding
195+
// at the same time.
196+
if ip.stopped {
197+
return
198+
}
199+
200+
ip.waitGroup.Add(1)
201+
go func() {
202+
defer ip.waitGroup.Done()
203+
informer.Run(ip.ctx.Done())
204+
}()
205+
}
206+
182207
func (ip *InformersMap) waitForStarted(ctx context.Context) bool {
183208
select {
184209
case <-ip.startWait:
@@ -307,20 +332,15 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
307332
}
308333
ip.informersByType(obj)[gvk] = i
309334

310-
// Start the Informer if need by
311-
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
312-
// can you add eventhandlers?
335+
// Start the informer in case the InformersMap has started, otherwise it will be
336+
// started when the InformersMap starts.
313337
if ip.started {
314-
go i.Informer.Run(ip.stop)
338+
ip.startInformerLocked(i.Informer)
315339
}
316340
return i, ip.started, nil
317341
}
318342

319343
func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
320-
// TODO(vincepri): Wire the context in here and don't use TODO().
321-
// Can we use the context from the Get call?
322-
ctx := context.TODO()
323-
324344
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
325345
// groupVersionKind to the Resource API we will use.
326346
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
@@ -351,16 +371,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
351371
return &cache.ListWatch{
352372
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
353373
if namespace != "" {
354-
return resources.Namespace(namespace).List(ctx, opts)
374+
return resources.Namespace(namespace).List(ip.ctx, opts)
355375
}
356-
return resources.List(ctx, opts)
376+
return resources.List(ip.ctx, opts)
357377
},
358378
// Setup the watch function
359379
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
360380
if namespace != "" {
361-
return resources.Namespace(namespace).Watch(ctx, opts)
381+
return resources.Namespace(namespace).Watch(ip.ctx, opts)
362382
}
363-
return resources.Watch(ctx, opts)
383+
return resources.Watch(ip.ctx, opts)
364384
},
365385
}, nil
366386
//
@@ -386,9 +406,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
386406
err error
387407
)
388408
if namespace != "" {
389-
list, err = resources.Namespace(namespace).List(ctx, opts)
409+
list, err = resources.Namespace(namespace).List(ip.ctx, opts)
390410
} else {
391-
list, err = resources.List(ctx, opts)
411+
list, err = resources.List(ip.ctx, opts)
392412
}
393413
if list != nil {
394414
for i := range list.Items {
@@ -400,9 +420,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
400420
// Setup the watch function
401421
WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
402422
if namespace != "" {
403-
watcher, err = resources.Namespace(namespace).Watch(ctx, opts)
423+
watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
404424
} else {
405-
watcher, err = resources.Watch(ctx, opts)
425+
watcher, err = resources.Watch(ip.ctx, opts)
406426
}
407427
if err != nil {
408428
return nil, err
@@ -433,7 +453,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
433453

434454
// Create the resulting object, and execute the request.
435455
res := listObj.DeepCopyObject()
436-
if err := req.Do(ctx).Into(res); err != nil {
456+
if err := req.Do(ip.ctx).Into(res); err != nil {
437457
return nil, err
438458
}
439459
return res, nil
@@ -446,7 +466,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
446466
req.Namespace(namespace)
447467
}
448468
// Call the watch.
449-
return req.Watch(ctx)
469+
return req.Watch(ip.ctx)
450470
},
451471
}, nil
452472
}

0 commit comments

Comments
 (0)