Skip to content

Commit 87aeaac

Browse files
committed
configurable filter chains
Signed-off-by: Kuromesi <[email protected]>
1 parent 38cddf0 commit 87aeaac

File tree

9 files changed

+668
-41
lines changed

9 files changed

+668
-41
lines changed

pkg/ext-proc/backend/datastore.go

+14
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,24 @@ type K8sDatastore struct {
2929
inferencePool *v1alpha1.InferencePool
3030
InferenceModels *sync.Map
3131
pods *sync.Map
32+
33+
filterConfigMap *corev1.ConfigMap
3234
}
3335

3436
type K8sDatastoreOption func(*K8sDatastore)
3537

38+
func (ds *K8sDatastore) GetFilterConfigMap() *corev1.ConfigMap {
39+
ds.poolMu.RLock()
40+
defer ds.poolMu.RUnlock()
41+
return ds.filterConfigMap
42+
}
43+
44+
func WithFilterConfigMap(filterConfigMap *corev1.ConfigMap) K8sDatastoreOption {
45+
return func(store *K8sDatastore) {
46+
store.filterConfigMap = filterConfigMap
47+
}
48+
}
49+
3650
// WithPods can be used in tests to override the pods.
3751
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
3852
return func(store *K8sDatastore) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
corev1 "k8s.io/api/core/v1"
7+
"k8s.io/klog/v2"
8+
ctrl "sigs.k8s.io/controller-runtime"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
"sigs.k8s.io/controller-runtime/pkg/predicate"
11+
)
12+
13+
type FilterConfigReconciler struct {
14+
client.Client
15+
Datastore *K8sDatastore
16+
}
17+
18+
func (c *FilterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
19+
cm := &corev1.ConfigMap{}
20+
if err := c.Get(ctx, req.NamespacedName, cm); err != nil {
21+
if client.IgnoreNotFound(err) != nil {
22+
klog.Errorf("unable to get ConfigMap, err: %v", err)
23+
return ctrl.Result{}, err
24+
}
25+
c.Datastore.poolMu.Lock()
26+
defer c.Datastore.poolMu.Unlock()
27+
klog.V(1).Info("filter config deleted, reset filter config")
28+
c.Datastore.filterConfigMap = nil
29+
return ctrl.Result{}, nil
30+
}
31+
32+
c.Datastore.poolMu.Lock()
33+
defer c.Datastore.poolMu.Unlock()
34+
35+
if cm.DeletionTimestamp != nil {
36+
klog.V(1).Info("filter config deleting, reset filter config")
37+
c.Datastore.filterConfigMap = nil
38+
return ctrl.Result{}, nil
39+
}
40+
41+
klog.V(1).Infof("update filter config to: %++v", cm.Data)
42+
c.Datastore.filterConfigMap = cm.DeepCopy()
43+
return ctrl.Result{}, nil
44+
}
45+
46+
func (c *FilterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
47+
return ctrl.NewControllerManagedBy(mgr).
48+
For(&corev1.ConfigMap{}).
49+
WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
50+
return object.GetName() == "filter-config" && object.GetNamespace() == "default"
51+
})).
52+
Complete(c)
53+
}

pkg/ext-proc/main.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ var (
6060
"refreshMetricsInterval",
6161
50*time.Millisecond,
6262
"interval to refresh metrics")
63+
enableFilterConfiguration = flag.Bool(
64+
"enableFilterConfiguration",
65+
false,
66+
"Whether to enable configuring filters in `default/filter-config` configmap, ONLY FOR DEV NOW.",
67+
)
6368

6469
scheme = runtime.NewScheme()
6570
)
@@ -146,6 +151,17 @@ func main() {
146151
klog.Error(err, "Error setting up EndpointSliceReconciler")
147152
}
148153

154+
var orchestrator *scheduling.FilterOrchestratorImpl
155+
if *enableFilterConfiguration {
156+
if err := (&backend.FilterConfigReconciler{
157+
Datastore: datastore,
158+
Client: mgr.GetClient(),
159+
}).SetupWithManager(mgr); err != nil {
160+
klog.Error(err, "Error setting up FilterConfigReconciler")
161+
}
162+
orchestrator = scheduling.NewFilterOrchestrator(datastore)
163+
}
164+
149165
errChan := make(chan error)
150166
go func() {
151167
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
@@ -164,7 +180,8 @@ func main() {
164180
s,
165181
handlers.NewServer(
166182
pp,
167-
scheduling.NewScheduler(pp),
183+
// when orchestrator is nil, default filter will be returned
184+
scheduling.NewScheduler(pp, scheduling.WithOrchestrator(orchestrator)),
168185
*targetPodHeader,
169186
datastore))
170187
healthPb.RegisterHealthServer(s, &healthServer{})

pkg/ext-proc/scheduling/filter.go

+22-13
Original file line numberDiff line numberDiff line change
@@ -4,43 +4,46 @@ import (
44
"errors"
55
"math"
66

7+
"google.golang.org/grpc/codes"
8+
"google.golang.org/grpc/status"
79
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
10+
811
klog "k8s.io/klog/v2"
912
)
1013

11-
type Filter interface {
14+
type FilterChain interface {
1215
Name() string
1316
Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
1417
}
1518

16-
// filter applies current filterFunc, and then recursively applies next filters depending success or
19+
// filterChainImpl applies current filterFunc, and then recursively applies next filters depending success or
1720
// failure of the current filterFunc.
1821
// It can be used to construct a flow chart algorithm.
19-
type filter struct {
22+
type filterChainImpl struct {
2023
name string
21-
filter filterFunc
24+
filter filter
2225
// nextOnSuccess filter will be applied after successfully applying the current filter.
2326
// The filtered results will be passed to the next filter.
24-
nextOnSuccess *filter
27+
nextOnSuccess *filterChainImpl
2528
// nextOnFailure filter will be applied if current filter fails.
2629
// The original input will be passed to the next filter.
27-
nextOnFailure *filter
30+
nextOnFailure *filterChainImpl
2831
// nextOnSuccessOrFailure is a convenience field to configure the next filter regardless of the
2932
// success or failure of the current filter.
3033
// NOTE: When using nextOnSuccessOrFailure, both nextOnSuccess and nextOnFailure SHOULD be nil.
3134
// However if that's not the case, nextOnSuccess and nextOnFailure will be used, instead of
3235
// nextOnSuccessOrFailure, in the success and failure scenarios, respectively.
33-
nextOnSuccessOrFailure *filter
36+
nextOnSuccessOrFailure *filterChainImpl
3437
}
3538

36-
func (f *filter) Name() string {
39+
func (f *filterChainImpl) Name() string {
3740
if f == nil {
3841
return "nil"
3942
}
4043
return f.name
4144
}
4245

43-
func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
46+
func (f *filterChainImpl) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
4447
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))
4548

4649
filtered, err := f.filter(req, pods)
@@ -71,11 +74,11 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
7174
}
7275
}
7376

74-
// filterFunc filters a set of input pods to a subset.
75-
type filterFunc func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
77+
// filter filters a set of input pods to a subset.
78+
type filter func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
7679

77-
// toFilterFunc is a helper function to convert a per pod filter func to the FilterFunc.
78-
func toFilterFunc(pp podPredicate) filterFunc {
80+
// toFilter is a helper function to convert a per pod filter func to the FilterFunc.
81+
func toFilter(pp podPredicate) filter {
7982
return func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
8083
filtered := []*backend.PodMetrics{}
8184
for _, pod := range pods {
@@ -152,6 +155,12 @@ func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*bac
152155
return filtered, nil
153156
}
154157

158+
func dropRequestFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
159+
klog.Infof("Dropping request %v", req)
160+
return []*backend.PodMetrics{}, status.Errorf(
161+
codes.ResourceExhausted, "dropping request due to limited backend resources")
162+
}
163+
155164
// podPredicate is a filter function to check whether a pod is desired.
156165
type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool
157166

pkg/ext-proc/scheduling/filter_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ func TestFilter(t *testing.T) {
1515
input []*backend.PodMetrics
1616
output []*backend.PodMetrics
1717
err bool
18-
filter *filter
18+
filter *filterChainImpl
1919
}{
2020
{
2121
name: "simple filter without successor, failure",
22-
filter: &filter{filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
22+
filter: &filterChainImpl{filter: func(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
2323
return nil, errors.New("filter error")
2424
}},
2525
err: true,
@@ -216,7 +216,7 @@ func TestFilter(t *testing.T) {
216216
func TestFilterFunc(t *testing.T) {
217217
tests := []struct {
218218
name string
219-
f filterFunc
219+
f filter
220220
req *LLMRequest
221221
input []*backend.PodMetrics
222222
output []*backend.PodMetrics
@@ -302,7 +302,7 @@ func TestFilterFunc(t *testing.T) {
302302
},
303303
{
304304
name: "noQueueAndLessThanKVCacheThresholdPredicate",
305-
f: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(0, 0.8)),
305+
f: toFilter(noQueueAndLessThanKVCacheThresholdPredicate(0, 0.8)),
306306
input: []*backend.PodMetrics{
307307
{
308308
// This pod should be returned.
@@ -337,7 +337,7 @@ func TestFilterFunc(t *testing.T) {
337337
},
338338
{
339339
name: "low LoRA cost",
340-
f: toFilterFunc(lowLoRACostPredicate),
340+
f: toFilter(lowLoRACostPredicate),
341341
req: &LLMRequest{
342342
Model: "model",
343343
ResolvedTargetModel: "model",

pkg/ext-proc/scheduling/filtergen.go

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package scheduling
2+
3+
const (
4+
FilterCriticalRequestName = "critical_request"
5+
FilterLeastQueuingName = "least_queuing"
6+
FilterLowCostLoraName = "low_cost_lora"
7+
FilterLowLatencyName = "low_latency"
8+
FilterAffinityLoraName = "affinity_lora"
9+
FilterSheddableRequestName = "sheddable_request"
10+
FilterLeastKvCacheName = "least_kv_cache"
11+
FilterDropRequestName = "drop_request"
12+
FilterCanAcceptNewLoraName = "can_accept_new_lora"
13+
)
14+
15+
const (
16+
TopKByWaitingQueueSize = "waiting_queue_size"
17+
TopKByKVCacheUsagePercent = "kv_cache_usage_percent"
18+
)
19+
20+
var filterMap = map[string]FilterGen{
21+
FilterLowLatencyName: FilterLowLatency,
22+
FilterCriticalRequestName: FilterCriticalRequest,
23+
FilterLeastQueuingName: FilterLeastQueuing,
24+
FilterCanAcceptNewLoraName: FilterCanAcceptNewLora,
25+
FilterSheddableRequestName: FilterSheddableRequest,
26+
FilterDropRequestName: FilterDropRequest,
27+
FilterAffinityLoraName: FilterAffinityLora,
28+
FilterLowCostLoraName: FilterLowCostLora,
29+
FilterLeastKvCacheName: FilterLeastKvCache,
30+
}
31+
32+
// FilterGen generate a filter from a filter option
33+
type FilterGen interface {
34+
Name() string
35+
Get(*FilterOption) filter
36+
Validate(*FilterOption) error
37+
}
38+
39+
type FilterOption struct {
40+
KvCacheThreshold *float64 `json:"kvCacheThreshold,omitempty"`
41+
42+
QueueThresholdCritical *int `json:"queueThresholdCritical,omitempty"`
43+
QueueingThresholdLoRA *int `json:"queueingThresholdLoRA,omitempty"`
44+
}
45+
46+
type filterGenImpl struct {
47+
name string
48+
getter func(*FilterOption) filter
49+
validator func(*FilterOption) error
50+
}
51+
52+
var _ FilterGen = &filterGenImpl{}
53+
54+
func (fg *filterGenImpl) Name() string {
55+
return fg.name
56+
}
57+
58+
func (fg *filterGenImpl) Get(fo *FilterOption) filter {
59+
return fg.getter(fo)
60+
}
61+
62+
func (fg *filterGenImpl) Validate(fo *FilterOption) error {
63+
return fg.validator(fo)
64+
}
65+
66+
var (
67+
FilterCriticalRequest FilterGen = &filterGenImpl{
68+
name: FilterCriticalRequestName,
69+
getter: func(fo *FilterOption) filter {
70+
return toFilter(criticalRequestPredicate)
71+
},
72+
validator: func(fo *FilterOption) error { return nil },
73+
}
74+
75+
FilterLeastQueuing FilterGen = &filterGenImpl{
76+
name: FilterLeastQueuingName,
77+
getter: func(fo *FilterOption) filter {
78+
return leastQueuingFilterFunc
79+
},
80+
validator: func(fo *FilterOption) error { return nil },
81+
}
82+
83+
FilterLowCostLora FilterGen = &filterGenImpl{
84+
name: FilterLowCostLoraName,
85+
getter: func(fo *FilterOption) filter {
86+
return toFilter(lowLoRACostPredicate)
87+
},
88+
validator: func(fo *FilterOption) error { return nil },
89+
}
90+
91+
FilterLowLatency FilterGen = &filterGenImpl{
92+
name: FilterLowLatencyName,
93+
getter: func(fo *FilterOption) filter {
94+
return toFilter(lowQueueingPodPredicate)
95+
},
96+
validator: func(fo *FilterOption) error { return nil },
97+
}
98+
99+
FilterAffinityLora FilterGen = &filterGenImpl{
100+
name: FilterAffinityLoraName,
101+
getter: func(fo *FilterOption) filter {
102+
return toFilter(loRAAffinityPredicate)
103+
},
104+
validator: func(fo *FilterOption) error { return nil },
105+
}
106+
107+
FilterSheddableRequest FilterGen = &filterGenImpl{
108+
name: FilterSheddableRequestName,
109+
getter: func(opt *FilterOption) filter {
110+
qtc, kct := queueThresholdCritical, kvCacheThreshold
111+
if opt != nil {
112+
if opt.KvCacheThreshold != nil {
113+
kct = *opt.KvCacheThreshold
114+
}
115+
if opt.QueueThresholdCritical != nil {
116+
qtc = *opt.QueueThresholdCritical
117+
}
118+
}
119+
return toFilter(noQueueAndLessThanKVCacheThresholdPredicate(qtc, kct))
120+
},
121+
validator: func(fo *FilterOption) error { return nil },
122+
}
123+
124+
FilterLeastKvCache FilterGen = &filterGenImpl{
125+
name: FilterLeastKvCacheName,
126+
getter: func(fo *FilterOption) filter {
127+
return leastKVCacheFilterFunc
128+
},
129+
validator: func(fo *FilterOption) error { return nil },
130+
}
131+
132+
FilterDropRequest FilterGen = &filterGenImpl{
133+
name: FilterDropRequestName,
134+
getter: func(fo *FilterOption) filter {
135+
return dropRequestFilterFunc
136+
},
137+
validator: func(fo *FilterOption) error { return nil },
138+
}
139+
140+
FilterCanAcceptNewLora FilterGen = &filterGenImpl{
141+
name: FilterCanAcceptNewLoraName,
142+
getter: func(fo *FilterOption) filter {
143+
return toFilter(canAcceptNewLoraPredicate)
144+
},
145+
validator: func(fo *FilterOption) error { return nil },
146+
}
147+
)

0 commit comments

Comments
 (0)