Skip to content

Commit a7df8ba

Browse files
committed
✨ Take context when getting informer
Passing the context down to the `ip.InformersMap.Get()` makes it possible to handle scenarios where the cache takes too long to sync or will never sync.
1 parent 0374b8c commit a7df8ba

File tree

4 files changed

+33
-11
lines changed

4 files changed

+33
-11
lines changed

pkg/cache/cache_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,20 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
261261
err := informerCache.Get(context.Background(), svcKey, svc)
262262
Expect(err).To(HaveOccurred())
263263
})
264+
265+
It("should return an error when context is cancelled", func() {
266+
By("creating a context and cancelling it")
267+
ctx, cancel := context.WithCancel(context.Background())
268+
cancel()
269+
270+
By("listing pods in test-namespace-1 with a cancelled context")
271+
listObj := &kcorev1.PodList{}
272+
err := informerCache.List(ctx, listObj, client.InNamespace(testNamespaceOne))
273+
274+
By("verifying that an error is returned")
275+
Expect(err).To(HaveOccurred())
276+
Expect(errors.IsTimeout(err)).To(BeTrue())
277+
})
264278
})
265279
Context("with unstructured objects", func() {
266280
It("should be able to list objects that haven't been watched previously", func() {

pkg/cache/informer_cache.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
5757
return err
5858
}
5959

60-
started, cache, err := ip.InformersMap.Get(gvk, out)
60+
started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
6161
if err != nil {
6262
return err
6363
}
@@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
7676
return err
7777
}
7878

79-
started, cache, err := ip.InformersMap.Get(*gvk, cacheTypeObj)
79+
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
8080
if err != nil {
8181
return err
8282
}
@@ -128,7 +128,6 @@ func (ip *informerCache) objectTypeForListObject(list runtime.Object) (*schema.G
128128
}
129129

130130
return &gvk, cacheTypeObj, nil
131-
132131
}
133132

134133
// GetInformerForKind returns the informer for the GroupVersionKind
@@ -138,7 +137,10 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Inform
138137
if err != nil {
139138
return nil, err
140139
}
141-
_, i, err := ip.InformersMap.Get(gvk, obj)
140+
141+
// TODO(djzager): before a context can be passed down, the Informers interface
142+
// must be updated to accept a context when getting an informer
143+
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
142144
if err != nil {
143145
return nil, err
144146
}
@@ -151,7 +153,10 @@ func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
151153
if err != nil {
152154
return nil, err
153155
}
154-
_, i, err := ip.InformersMap.Get(gvk, obj)
156+
157+
// TODO(djzager): before a context can be passed down, the Informers interface
158+
// must be updated to accept a context when getting an informer
159+
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
155160
if err != nil {
156161
return nil, err
157162
}

pkg/cache/internal/deleg_map.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package internal
1818

1919
import (
20+
"context"
2021
"time"
2122

2223
"k8s.io/apimachinery/pkg/api/meta"
@@ -79,16 +80,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
7980

8081
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
8182
// the Informer from the map.
82-
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
83+
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
8384
_, isUnstructured := obj.(*unstructured.Unstructured)
8485
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
8586
isUnstructured = isUnstructured || isUnstructuredList
8687

8788
if isUnstructured {
88-
return m.unstructured.Get(gvk, obj)
89+
return m.unstructured.Get(ctx, gvk, obj)
8990
}
9091

91-
return m.structured.Get(gvk, obj)
92+
return m.structured.Get(ctx, gvk, obj)
9293
}
9394

9495
// newStructuredInformersMap creates a new InformersMap for structured objects.

pkg/cache/internal/informers_map.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ limitations under the License.
1717
package internal
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"math/rand"
2223
"sync"
2324
"time"
2425

26+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2527
"k8s.io/apimachinery/pkg/api/meta"
2628
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2729
"k8s.io/apimachinery/pkg/runtime"
@@ -163,7 +165,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
163165

164166
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
165167
// the Informer from the map.
166-
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
168+
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
167169
// Return the informer if it is found
168170
i, started, ok := func() (*MapEntry, bool, bool) {
169171
ip.mu.RLock()
@@ -181,8 +183,8 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
181183

182184
if started && !i.Informer.HasSynced() {
183185
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
184-
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
185-
return started, nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
186+
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
187+
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
186188
}
187189
}
188190

0 commit comments

Comments
 (0)