Skip to content

Commit 9bd981b

Browse files
authored
Use server side namespace filter (#429)
* Add filter to pod reconciler * use poolHasSynced * filter using server-side namespace filter * update object filter * update * remove unused scheme and namespace * Move controller manager build function to pkg/epp/server so we can better test it * Update integration test
1 parent 48978f4 commit 9bd981b

8 files changed

+81
-33
lines changed

cmd/epp/main.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,12 @@ import (
3030
"go.uber.org/zap/zapcore"
3131
"google.golang.org/grpc"
3232
healthPb "google.golang.org/grpc/health/grpc_health_v1"
33-
"k8s.io/apimachinery/pkg/runtime"
34-
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
35-
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3633
"k8s.io/client-go/rest"
3734
"k8s.io/component-base/metrics/legacyregistry"
3835
ctrl "sigs.k8s.io/controller-runtime"
3936
"sigs.k8s.io/controller-runtime/pkg/log/zap"
4037
"sigs.k8s.io/controller-runtime/pkg/manager"
4138
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
42-
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
4339
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4440
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
4541
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/vllm"
@@ -97,15 +93,9 @@ var (
9793
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
9894
"then a self-signed certificate is used.")
9995

100-
scheme = runtime.NewScheme()
10196
setupLog = ctrl.Log.WithName("setup")
10297
)
10398

104-
func init() {
105-
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
106-
utilruntime.Must(v1alpha2.AddToScheme(scheme))
107-
}
108-
10999
func main() {
110100
if err := run(); err != nil {
111101
os.Exit(1)
@@ -140,9 +130,9 @@ func run() error {
140130
return err
141131
}
142132

143-
mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
133+
mgr, err := runserver.NewDefaultManager(*poolNamespace, *poolName, cfg)
144134
if err != nil {
145-
setupLog.Error(err, "Failed to create controller manager", "config", cfg)
135+
setupLog.Error(err, "Failed to create controller manager")
146136
return err
147137
}
148138

pkg/epp/controller/inferencemodel_reconciler.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"fmt"
2222

2323
"k8s.io/apimachinery/pkg/api/errors"
24-
"k8s.io/apimachinery/pkg/runtime"
2524
"k8s.io/apimachinery/pkg/types"
2625
"k8s.io/client-go/tools/record"
2726
ctrl "sigs.k8s.io/controller-runtime"
@@ -36,7 +35,6 @@ import (
3635

3736
type InferenceModelReconciler struct {
3837
client.Client
39-
Scheme *runtime.Scheme
4038
Record record.EventRecorder
4139
Datastore datastore.Datastore
4240
PoolNamespacedName types.NamespacedName
@@ -128,5 +126,5 @@ func (c *InferenceModelReconciler) SetupWithManager(ctx context.Context, mgr ctr
128126
}
129127

130128
func (c *InferenceModelReconciler) eventPredicate(infModel *v1alpha2.InferenceModel) bool {
131-
return (infModel.Spec.PoolRef.Name == v1alpha2.ObjectName(c.PoolNamespacedName.Name)) && (infModel.GetNamespace() == c.PoolNamespacedName.Namespace)
129+
return string(infModel.Spec.PoolRef.Name) == c.PoolNamespacedName.Name
132130
}

pkg/epp/controller/inferencemodel_reconciler_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ func TestInferenceModelReconciler(t *testing.T) {
193193
datastore := datastore.NewFakeDatastore(nil, test.modelsInStore, pool)
194194
reconciler := &InferenceModelReconciler{
195195
Client: fakeClient,
196-
Scheme: scheme,
197196
Record: record.NewFakeRecorder(10),
198197
Datastore: datastore,
199198
PoolNamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace},

pkg/epp/controller/inferencepool_reconciler.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@ import (
2121
"reflect"
2222

2323
"k8s.io/apimachinery/pkg/api/errors"
24-
"k8s.io/apimachinery/pkg/runtime"
2524
"k8s.io/apimachinery/pkg/types"
2625
"k8s.io/client-go/tools/record"
2726
ctrl "sigs.k8s.io/controller-runtime"
2827
"sigs.k8s.io/controller-runtime/pkg/client"
2928
"sigs.k8s.io/controller-runtime/pkg/log"
30-
"sigs.k8s.io/controller-runtime/pkg/predicate"
3129
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3230
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3331
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -38,7 +36,6 @@ import (
3836
// will have the proper controller that will create/manage objects on behalf of the server pool.
3937
type InferencePoolReconciler struct {
4038
client.Client
41-
Scheme *runtime.Scheme
4239
Record record.EventRecorder
4340
PoolNamespacedName types.NamespacedName
4441
Datastore datastore.Datastore
@@ -90,8 +87,5 @@ func (c *InferencePoolReconciler) updateDatastore(ctx context.Context, newPool *
9087
func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
9188
return ctrl.NewControllerManagedBy(mgr).
9289
For(&v1alpha2.InferencePool{}).
93-
WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
94-
return (object.GetNamespace() == c.PoolNamespacedName.Namespace) && (object.GetName() == c.PoolNamespacedName.Name)
95-
})).
9690
Complete(c)
9791
}

pkg/epp/controller/pod_reconciler.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/go-logr/logr"
2323
corev1 "k8s.io/api/core/v1"
2424
apierrors "k8s.io/apimachinery/pkg/api/errors"
25-
"k8s.io/apimachinery/pkg/runtime"
2625
"k8s.io/apimachinery/pkg/types"
2726
"k8s.io/client-go/tools/record"
2827
ctrl "sigs.k8s.io/controller-runtime"
@@ -35,19 +34,15 @@ import (
3534
type PodReconciler struct {
3635
client.Client
3736
Datastore datastore.Datastore
38-
Scheme *runtime.Scheme
3937
Record record.EventRecorder
4038
}
4139

4240
func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
4341
logger := log.FromContext(ctx)
44-
inferencePool, err := c.Datastore.PoolGet()
45-
if err != nil {
46-
logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet", "error", err)
42+
if !c.Datastore.PoolHasSynced() {
43+
logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet")
4744
// When the inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue.
4845
return ctrl.Result{}, nil
49-
} else if inferencePool.Namespace != req.Namespace {
50-
return ctrl.Result{}, nil
5146
}
5247

5348
logger.V(logutil.VERBOSE).Info("Pod being reconciled", "name", req.NamespacedName)

pkg/epp/server/controller_manager.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package server
18+
19+
import (
20+
"fmt"
21+
22+
corev1 "k8s.io/api/core/v1"
23+
"k8s.io/apimachinery/pkg/fields"
24+
"k8s.io/apimachinery/pkg/runtime"
25+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
27+
"k8s.io/client-go/rest"
28+
ctrl "sigs.k8s.io/controller-runtime"
29+
"sigs.k8s.io/controller-runtime/pkg/cache"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
31+
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
32+
)
33+
34+
var scheme = runtime.NewScheme()
35+
36+
func init() {
37+
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
38+
utilruntime.Must(v1alpha2.AddToScheme(scheme))
39+
}
40+
41+
// NewDefaultManager creates a new controller manager with default configuration.
42+
func NewDefaultManager(namespace, name string, restConfig *rest.Config) (ctrl.Manager, error) {
43+
manager, err := ctrl.NewManager(restConfig, ctrl.Options{
44+
Scheme: scheme,
45+
Cache: cache.Options{
46+
ByObject: map[client.Object]cache.ByObject{
47+
&corev1.Pod{}: {
48+
Namespaces: map[string]cache.Config{
49+
namespace: {},
50+
},
51+
},
52+
&v1alpha2.InferencePool{}: {
53+
Namespaces: map[string]cache.Config{
54+
namespace: {
55+
FieldSelector: fields.SelectorFromSet(fields.Set{
56+
"metadata.name": name,
57+
}),
58+
},
59+
},
60+
},
61+
&v1alpha2.InferenceModel{}: {
62+
Namespaces: map[string]cache.Config{
63+
namespace: {},
64+
},
65+
},
66+
},
67+
},
68+
})
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to create controller manager: %v", err)
71+
}
72+
return manager, nil
73+
}

pkg/epp/server/runserver.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man
8989
// Create the controllers and register them with the manager
9090
if err := (&controller.InferencePoolReconciler{
9191
Datastore: r.Datastore,
92-
Scheme: mgr.GetScheme(),
9392
Client: mgr.GetClient(),
9493
PoolNamespacedName: types.NamespacedName{
9594
Name: r.PoolName,
@@ -102,7 +101,6 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man
102101

103102
if err := (&controller.InferenceModelReconciler{
104103
Datastore: r.Datastore,
105-
Scheme: mgr.GetScheme(),
106104
Client: mgr.GetClient(),
107105
PoolNamespacedName: types.NamespacedName{
108106
Name: r.PoolName,
@@ -115,7 +113,6 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man
115113

116114
if err := (&controller.PodReconciler{
117115
Datastore: r.Datastore,
118-
Scheme: mgr.GetScheme(),
119116
Client: mgr.GetClient(),
120117
Record: mgr.GetEventRecorderFor("pod"),
121118
}).SetupWithManager(mgr); err != nil {

test/integration/hermetic_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import (
5858
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
5959
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
6060
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
61+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
6162
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
6263
extprocutils "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/test"
6364
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -491,7 +492,8 @@ func BeforeSuit(t *testing.T) func() {
491492

492493
// Init runtime.
493494
ctrl.SetLogger(logger)
494-
mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
495+
496+
mgr, err := server.NewDefaultManager("default", "vllm-llama2-7b-pool", cfg)
495497
if err != nil {
496498
logutil.Fatal(logger, err, "Failed to create controller manager")
497499
}

0 commit comments

Comments
 (0)