Skip to content

Commit e0b187f

Browse files
authored
Merge pull request #830 from djzager/ctx-informers
⚠ update Informers interface to accept context
2 parents 71edbc4 + b6d18c7 commit e0b187f

File tree

9 files changed

+90
-32
lines changed

9 files changed

+90
-32
lines changed

pkg/cache/cache.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package cache
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"time"
2223

@@ -51,11 +52,11 @@ type Cache interface {
5152
type Informers interface {
5253
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
5354
// API kind and resource.
54-
GetInformer(obj runtime.Object) (Informer, error)
55+
GetInformer(ctx context.Context, obj runtime.Object) (Informer, error)
5556

5657
// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
5758
// of the underlying object.
58-
GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error)
59+
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
5960

6061
// Start runs all the informers known to this cache until the given channel is closed.
6162
// It blocks.

pkg/cache/cache_test.go

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
496496
},
497497
},
498498
}
499-
sii, err := informerCache.GetInformer(pod)
499+
sii, err := informerCache.GetInformer(context.TODO(), pod)
500500
Expect(err).NotTo(HaveOccurred())
501501
Expect(sii).NotTo(BeNil())
502502
Expect(sii.HasSynced()).To(BeTrue())
@@ -522,7 +522,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
522522
It("should be able to get an informer by group/version/kind", func(done Done) {
523523
By("getting an shared index informer for gvk = core/v1/pod")
524524
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
525-
sii, err := informerCache.GetInformerForKind(gvk)
525+
sii, err := informerCache.GetInformerForKind(context.TODO(), gvk)
526526
Expect(err).NotTo(HaveOccurred())
527527
Expect(sii).NotTo(BeNil())
528528
Expect(sii.HasSynced()).To(BeTrue())
@@ -569,7 +569,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
569569
indexFunc := func(obj runtime.Object) []string {
570570
return []string{string(obj.(*kcorev1.Pod).Spec.RestartPolicy)}
571571
}
572-
Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed())
572+
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())
573573

574574
By("running the cache and waiting for it to sync")
575575
go func() {
@@ -588,6 +588,45 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
588588
actual := listObj.Items[0]
589589
Expect(actual.Name).To(Equal("test-pod-3"))
590590
})
591+
592+
It("should allow for get informer to be cancelled", func() {
593+
By("creating a context and cancelling it")
594+
ctx, cancel := context.WithCancel(context.Background())
595+
cancel()
596+
597+
By("getting a shared index informer for a pod with a cancelled context")
598+
pod := &kcorev1.Pod{
599+
ObjectMeta: kmetav1.ObjectMeta{
600+
Name: "informer-obj",
601+
Namespace: "default",
602+
},
603+
Spec: kcorev1.PodSpec{
604+
Containers: []kcorev1.Container{
605+
{
606+
Name: "nginx",
607+
Image: "nginx",
608+
},
609+
},
610+
},
611+
}
612+
sii, err := informerCache.GetInformer(ctx, pod)
613+
Expect(err).To(HaveOccurred())
614+
Expect(sii).To(BeNil())
615+
Expect(errors.IsTimeout(err)).To(BeTrue())
616+
})
617+
618+
It("should allow getting an informer by group/version/kind to be cancelled", func() {
619+
By("creating a context and cancelling it")
620+
ctx, cancel := context.WithCancel(context.Background())
621+
cancel()
622+
623+
By("getting an shared index informer for gvk = core/v1/pod with a cancelled context")
624+
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
625+
sii, err := informerCache.GetInformerForKind(ctx, gvk)
626+
Expect(err).To(HaveOccurred())
627+
Expect(sii).To(BeNil())
628+
Expect(errors.IsTimeout(err)).To(BeTrue())
629+
})
591630
})
592631
Context("with unstructured objects", func() {
593632
It("should be able to get informer for the object", func(done Done) {
@@ -612,7 +651,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
612651
Version: "v1",
613652
Kind: "Pod",
614653
})
615-
sii, err := informerCache.GetInformer(pod)
654+
sii, err := informerCache.GetInformer(context.TODO(), pod)
616655
Expect(err).NotTo(HaveOccurred())
617656
Expect(sii).NotTo(BeNil())
618657
Expect(sii.HasSynced()).To(BeTrue())
@@ -658,7 +697,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
658697
}
659698
return []string{fmt.Sprintf("%v", m["restartPolicy"])}
660699
}
661-
Expect(informer.IndexField(pod, "spec.restartPolicy", indexFunc)).To(Succeed())
700+
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())
662701

663702
By("running the cache and waiting for it to sync")
664703
go func() {
@@ -684,6 +723,26 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
684723
actual := listObj.Items[0]
685724
Expect(actual.GetName()).To(Equal("test-pod-3"))
686725
}, 3)
726+
727+
It("should allow for get informer to be cancelled", func() {
728+
By("creating a context and cancelling it")
729+
ctx, cancel := context.WithCancel(context.Background())
730+
cancel()
731+
732+
By("getting a shared index informer for a pod with a cancelled context")
733+
pod := &unstructured.Unstructured{}
734+
pod.SetName("informer-obj2")
735+
pod.SetNamespace("default")
736+
pod.SetGroupVersionKind(schema.GroupVersionKind{
737+
Group: "",
738+
Version: "v1",
739+
Kind: "Pod",
740+
})
741+
sii, err := informerCache.GetInformer(ctx, pod)
742+
Expect(err).To(HaveOccurred())
743+
Expect(sii).To(BeNil())
744+
Expect(errors.IsTimeout(err)).To(BeTrue())
745+
})
687746
})
688747
})
689748
})

pkg/cache/informer_cache.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,32 +131,28 @@ func (ip *informerCache) objectTypeForListObject(list runtime.Object) (*schema.G
131131
}
132132

133133
// GetInformerForKind returns the informer for the GroupVersionKind
134-
func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
134+
func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
135135
// Map the gvk to an object
136136
obj, err := ip.Scheme.New(gvk)
137137
if err != nil {
138138
return nil, err
139139
}
140140

141-
// TODO(djzager): before a context can be passed down, the Informers interface
142-
// must be updated to accept a context when getting an informer
143-
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
141+
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
144142
if err != nil {
145143
return nil, err
146144
}
147145
return i.Informer, err
148146
}
149147

150148
// GetInformer returns the informer for the obj
151-
func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
149+
func (ip *informerCache) GetInformer(ctx context.Context, obj runtime.Object) (Informer, error) {
152150
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
153151
if err != nil {
154152
return nil, err
155153
}
156154

157-
// TODO(djzager): before a context can be passed down, the Informers interface
158-
// must be updated to accept a context when getting an informer
159-
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
155+
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
160156
if err != nil {
161157
return nil, err
162158
}
@@ -174,8 +170,8 @@ func (ip *informerCache) NeedLeaderElection() bool {
174170
// to List. For one-to-one compatibility with "normal" field selectors, only return one value.
175171
// The values may be anything. They will automatically be prefixed with the namespace of the
176172
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
177-
func (ip *informerCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
178-
informer, err := ip.GetInformer(obj)
173+
func (ip *informerCache) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
174+
informer, err := ip.GetInformer(ctx, obj)
179175
if err != nil {
180176
return err
181177
}

pkg/cache/informertest/fake_cache.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type FakeInformers struct {
3939
}
4040

4141
// GetInformerForKind implements Informers
42-
func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) {
42+
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
4343
if c.Scheme == nil {
4444
c.Scheme = scheme.Scheme
4545
}
@@ -51,7 +51,7 @@ func (c *FakeInformers) GetInformerForKind(gvk schema.GroupVersionKind) (cache.I
5151
}
5252

5353
// FakeInformerForKind implements Informers
54-
func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*controllertest.FakeInformer, error) {
54+
func (c *FakeInformers) FakeInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (*controllertest.FakeInformer, error) {
5555
if c.Scheme == nil {
5656
c.Scheme = scheme.Scheme
5757
}
@@ -67,7 +67,7 @@ func (c *FakeInformers) FakeInformerForKind(gvk schema.GroupVersionKind) (*contr
6767
}
6868

6969
// GetInformer implements Informers
70-
func (c *FakeInformers) GetInformer(obj runtime.Object) (cache.Informer, error) {
70+
func (c *FakeInformers) GetInformer(ctx context.Context, obj runtime.Object) (cache.Informer, error) {
7171
if c.Scheme == nil {
7272
c.Scheme = scheme.Scheme
7373
}
@@ -126,7 +126,7 @@ func (c *FakeInformers) Start(stopCh <-chan struct{}) error {
126126
}
127127

128128
// IndexField implements Cache
129-
func (c *FakeInformers) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
129+
func (c *FakeInformers) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
130130
return nil
131131
}
132132

pkg/cache/multi_namespace_cache.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ type multiNamespaceCache struct {
7070
var _ Cache = &multiNamespaceCache{}
7171

7272
// Methods for multiNamespaceCache to conform to the Informers interface
73-
func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error) {
73+
func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj runtime.Object) (Informer, error) {
7474
informers := map[string]Informer{}
7575
for ns, cache := range c.namespaceToCache {
76-
informer, err := cache.GetInformer(obj)
76+
informer, err := cache.GetInformer(ctx, obj)
7777
if err != nil {
7878
return nil, err
7979
}
@@ -82,10 +82,10 @@ func (c *multiNamespaceCache) GetInformer(obj runtime.Object) (Informer, error)
8282
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
8383
}
8484

85-
func (c *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (Informer, error) {
85+
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
8686
informers := map[string]Informer{}
8787
for ns, cache := range c.namespaceToCache {
88-
informer, err := cache.GetInformerForKind(gvk)
88+
informer, err := cache.GetInformerForKind(ctx, gvk)
8989
if err != nil {
9090
return nil, err
9191
}
@@ -117,9 +117,9 @@ func (c *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool {
117117
return synced
118118
}
119119

120-
func (c *multiNamespaceCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
120+
func (c *multiNamespaceCache) IndexField(ctx context.Context, obj runtime.Object, field string, extractValue client.IndexerFunc) error {
121121
for _, cache := range c.namespaceToCache {
122-
if err := cache.IndexField(obj, field, extractValue); err != nil {
122+
if err := cache.IndexField(ctx, obj, field, extractValue); err != nil {
123123
return err
124124
}
125125
}

pkg/client/example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func ExampleClient_deleteAllOf() {
216216
// This example shows how to set up and consume a field selector over a pod's volumes' secretName field.
217217
func ExampleFieldIndexer_secretName() {
218218
// someIndexer is a FieldIndexer over a Cache
219-
_ = someIndexer.IndexField(&corev1.Pod{}, "spec.volumes.secret.secretName", func(o runtime.Object) []string {
219+
_ = someIndexer.IndexField(context.TODO(), &corev1.Pod{}, "spec.volumes.secret.secretName", func(o runtime.Object) []string {
220220
var res []string
221221
for _, vol := range o.(*corev1.Pod).Spec.Volumes {
222222
if vol.Secret == nil {

pkg/client/interfaces.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ type FieldIndexer interface {
122122
// and "equality" in the field selector means that at least one key matches the value.
123123
// The FieldIndexer will automatically take care of indexing over namespace
124124
// and supporting efficient all-namespace queries.
125-
IndexField(obj runtime.Object, field string, extractValue IndexerFunc) error
125+
IndexField(ctx context.Context, obj runtime.Object, field string, extractValue IndexerFunc) error
126126
}
127127

128128
// IgnoreNotFound returns nil on NotFound errors.

pkg/internal/controller/controller_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package controller
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"sync"
2223
"time"
@@ -106,9 +107,9 @@ var _ = Describe("controller", func() {
106107

107108
c, err := cache.New(cfg, cache.Options{})
108109
Expect(err).NotTo(HaveOccurred())
109-
_, err = c.GetInformer(&appsv1.Deployment{})
110+
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
110111
Expect(err).NotTo(HaveOccurred())
111-
_, err = c.GetInformer(&appsv1.ReplicaSet{})
112+
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
112113
Expect(err).NotTo(HaveOccurred())
113114
ctrl.Cache = c
114115
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }

pkg/source/source.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package source
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"sync"
2223

@@ -98,7 +99,7 @@ func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimiting
9899
}
99100

100101
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
101-
i, err := ks.cache.GetInformer(ks.Type)
102+
i, err := ks.cache.GetInformer(context.TODO(), ks.Type)
102103
if err != nil {
103104
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
104105
log.Error(err, "if kind is a CRD, it should be installed before calling Start",

0 commit comments

Comments
 (0)