Skip to content

Commit 48fe9c7

Browse files
committed
Add FieldSelectorByResource option to cache
All instance for a same resources are being cached by controller-runtime, for some use cases this consumes a lot of memory and CPU. This change add a option to the cache so resources can be selected by field and label. Signed-off-by: Quique Llorente <[email protected]>
1 parent 10ae090 commit 48fe9c7

File tree

5 files changed

+153
-11
lines changed

5 files changed

+153
-11
lines changed

pkg/cache/cache.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ type Informer interface {
8686
HasSynced() bool
8787
}
8888

89+
// SelectorsByGroupResource associate a GroupResource to a field/label selector
90+
type SelectorsByGroupResource internal.SelectorsByGroupResource
91+
8992
// Options are the optional arguments for creating a new InformersMap object
9093
type Options struct {
9194
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
@@ -103,6 +106,13 @@ type Options struct {
103106
// Namespace restricts the cache's ListWatch to the desired namespace
104107
// Default watches all namespaces
105108
Namespace string
109+
110+
// FieldSelectorByResource restricts the cache's ListWatch to the desired
111+
// fields per resource, the map's value must implement Selector [1] using
112+
// for example a Set [2]
113+
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
114+
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
115+
SelectorByResource SelectorsByGroupResource
106116
}
107117

108118
var defaultResyncTime = 10 * time.Hour
@@ -113,10 +123,20 @@ func New(config *rest.Config, opts Options) (Cache, error) {
113123
if err != nil {
114124
return nil, err
115125
}
116-
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
126+
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, internal.SelectorsByGroupResource(opts.SelectorByResource))
117127
return &informerCache{InformersMap: im}, nil
118128
}
119129

130+
// BuilderWithSelectors returns a Cache constructor that will filter
131+
// contents using fieldSelectorByResource
132+
// WARNING: filtered out resources are not returned.
133+
func BuilderWithSelectors(selectors SelectorsByGroupResource) NewCacheFunc {
134+
return func(config *rest.Config, opts Options) (Cache, error) {
135+
opts.SelectorByResource = selectors
136+
return New(config, opts)
137+
}
138+
}
139+
120140
func defaultOpts(config *rest.Config, opts Options) (Options, error) {
121141
// Use the default Kubernetes Scheme if unset
122142
if opts.Scheme == nil {

pkg/cache/cache_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/apimachinery/pkg/api/errors"
2727
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"k8s.io/apimachinery/pkg/fields"
2930
"k8s.io/apimachinery/pkg/runtime/schema"
3031
kscheme "k8s.io/client-go/kubernetes/scheme"
3132
"k8s.io/client-go/rest"
@@ -789,7 +790,81 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
789790
Eventually(out).Should(Receive(Equal(pod)))
790791
close(done)
791792
})
793+
It("should be able to filter informers at list watch level by field", func() {
794+
By("creating the cache")
795+
builder := cache.BuilderWithSelectors(
796+
cache.SelectorsByGroupResource{
797+
{Group: "", Resource: "pods"}: {
798+
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
799+
},
800+
},
801+
)
802+
informer, err := builder(cfg, cache.Options{})
803+
Expect(err).NotTo(HaveOccurred())
804+
805+
By("running the cache and waiting for it to sync")
806+
go func() {
807+
defer GinkgoRecover()
808+
Expect(informer.Start(informerCacheCtx)).To(Succeed())
809+
}()
810+
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())
811+
812+
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
813+
sii, err := informer.GetInformerForKind(context.TODO(), gvk)
814+
Expect(err).NotTo(HaveOccurred())
815+
Expect(sii).NotTo(BeNil())
816+
Expect(sii.HasSynced()).To(BeTrue())
792817

818+
By("adding an event handler listening for object creation which sends the object to a channel")
819+
out := make(chan interface{})
820+
addFunc := func(obj interface{}) {
821+
out <- obj
822+
}
823+
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
824+
825+
By("adding a pair of objects")
826+
cl, err := client.New(cfg, client.Options{})
827+
Expect(err).NotTo(HaveOccurred())
828+
podFoo := &kcorev1.Pod{
829+
ObjectMeta: kmetav1.ObjectMeta{
830+
Name: "foo",
831+
Namespace: "default",
832+
},
833+
Spec: kcorev1.PodSpec{
834+
Containers: []kcorev1.Container{
835+
{
836+
Name: "nginx",
837+
Image: "nginx",
838+
},
839+
},
840+
},
841+
}
842+
Expect(cl.Create(context.Background(), podFoo)).To(Succeed())
843+
defer deletePod(podFoo)
844+
845+
podBar := &kcorev1.Pod{
846+
ObjectMeta: kmetav1.ObjectMeta{
847+
Name: "bar",
848+
Namespace: "default",
849+
},
850+
Spec: kcorev1.PodSpec{
851+
Containers: []kcorev1.Container{
852+
{
853+
Name: "nginx",
854+
Image: "nginx",
855+
},
856+
},
857+
},
858+
}
859+
Expect(cl.Create(context.Background(), podBar)).To(Succeed())
860+
defer deletePod(podBar)
861+
862+
By("verifying the filter out object is not received on the channel")
863+
var obtainedObj interface{}
864+
Expect(out).Should(Receive(&obtainedObj), "should receive something")
865+
Expect(obtainedObj).Should(Equal(podFoo), "should receive the pod 'foo'")
866+
Consistently(out).ShouldNot(Receive(), "should not receive anything else")
867+
})
793868
It("should be able to index an object field then retrieve objects by that field", func() {
794869
By("creating the cache")
795870
informer, err := cache.New(cfg, cache.Options{})

pkg/cache/internal/deleg_map.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config,
4949
scheme *runtime.Scheme,
5050
mapper meta.RESTMapper,
5151
resync time.Duration,
52-
namespace string) *InformersMap {
52+
namespace string,
53+
selectors SelectorsByGroupResource,
54+
) *InformersMap {
5355

5456
return &InformersMap{
55-
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
56-
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
57-
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
57+
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
58+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
59+
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),
5860

5961
Scheme: scheme,
6062
}
@@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
105107
}
106108

107109
// newStructuredInformersMap creates a new InformersMap for structured objects.
108-
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
109-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
110+
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
111+
namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
112+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
110113
}
111114

112115
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
113-
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
114-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
116+
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
117+
namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
118+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
115119
}
116120

117121
// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
118-
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
119-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
122+
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
123+
namespace string, selectors SelectorsByGroupResource) *specificInformersMap {
124+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
120125
}

pkg/cache/internal/informers_map.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func newSpecificInformersMap(config *rest.Config,
4848
mapper meta.RESTMapper,
4949
resync time.Duration,
5050
namespace string,
51+
selectors SelectorsByGroupResource,
5152
createListWatcher createListWatcherFunc) *specificInformersMap {
5253
ip := &specificInformersMap{
5354
config: config,
@@ -60,6 +61,7 @@ func newSpecificInformersMap(config *rest.Config,
6061
startWait: make(chan struct{}),
6162
createListWatcher: createListWatcher,
6263
namespace: namespace,
64+
selectors: selectors,
6365
}
6466
return ip
6567
}
@@ -120,6 +122,8 @@ type specificInformersMap struct {
120122
// namespace is the namespace that all ListWatches are restricted to
121123
// default or empty string means all namespaces
122124
namespace string
125+
126+
selectors SelectorsByGroupResource
123127
}
124128

125129
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
@@ -256,13 +260,15 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
256260
// Create a new ListWatch for the obj
257261
return &cache.ListWatch{
258262
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
263+
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
259264
res := listObj.DeepCopyObject()
260265
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
261266
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
262267
return res, err
263268
},
264269
// Setup the watch function
265270
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
271+
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
266272
// Watch needs to be set to true separately
267273
opts.Watch = true
268274
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
@@ -289,13 +295,15 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
289295
// Create a new ListWatch for the obj
290296
return &cache.ListWatch{
291297
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
298+
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
292299
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
293300
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
294301
}
295302
return dynamicClient.Resource(mapping.Resource).List(ctx, opts)
296303
},
297304
// Setup the watch function
298305
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
306+
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
299307
// Watch needs to be set to true separately
300308
opts.Watch = true
301309
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
@@ -327,13 +335,15 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
327335
// create the relevant listwatch
328336
return &cache.ListWatch{
329337
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
338+
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
330339
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
331340
return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
332341
}
333342
return client.Resource(mapping.Resource).List(ctx, opts)
334343
},
335344
// Setup the watch function
336345
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
346+
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
337347
// Watch needs to be set to true separately
338348
opts.Watch = true
339349
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {

pkg/cache/internal/selector.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package internal
2+
3+
import (
4+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
5+
"k8s.io/apimachinery/pkg/fields"
6+
"k8s.io/apimachinery/pkg/labels"
7+
"k8s.io/apimachinery/pkg/runtime/schema"
8+
)
9+
10+
// SelectorsByGroupResource associate a GroupResource to a field/label selector
11+
type SelectorsByGroupResource map[schema.GroupResource]Selector
12+
13+
// Selector specify the label/field selector to fill in ListOptions
14+
type Selector struct {
15+
Label labels.Selector
16+
Field fields.Selector
17+
}
18+
19+
// FillInListOpts fill in ListOptions LabelSelector and FieldSelector if needed
20+
func (s Selector) FillInListOpts(listOpts *metav1.ListOptions) {
21+
if s.Label != nil {
22+
listOpts.LabelSelector = s.Label.String()
23+
}
24+
if s.Field != nil {
25+
listOpts.FieldSelector = s.Field.String()
26+
}
27+
}
28+
29+
// FindByGR use the GVR group and resource to find the field/label selector
30+
func (s SelectorsByGroupResource) FindByGR(gvr schema.GroupVersionResource) Selector {
31+
return s[schema.GroupResource{Group: gvr.Group, Resource: gvr.Resource}]
32+
}

0 commit comments

Comments
 (0)