Skip to content

Commit 8770afe

Browse files
authored
Scheduler config refactor for simplifying plugins registration (#835)
* Small refactor of the scheduler config: handles how to register a plugin that implements multiple scheduler plugin interfaces with a single registration command.
Signed-off-by: Nir Rozenbaum <[email protected]>
* Code review.
Signed-off-by: Nir Rozenbaum <[email protected]>
* Minor change.
Signed-off-by: Nir Rozenbaum <[email protected]>
---------
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 70285f1 commit 8770afe

File tree

9 files changed

+219
-109
lines changed

9 files changed

+219
-109
lines changed

cmd/epp/main.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import (
4444
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4545
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
47-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
4847
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
4948
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/multi/prefix"
5049
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
@@ -196,23 +195,21 @@ func run() error {
196195
if schedulerV2 == "true" {
197196
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
198197
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
199-
scorers := map[plugins.Scorer]int{
200-
&scorer.QueueScorer{}: queueScorerWeight,
201-
&scorer.KVCacheScorer{}: kvCacheScorerWeight,
202-
}
203-
schedConfigOpts := []scheduling.ConfigOption{}
198+
199+
schedulerConfig := scheduling.NewSchedulerConfig().
200+
WithFilters(filter.NewSheddableCapacityFilter()).
201+
WithScorers(scorer.NewWeightedScorer(&scorer.QueueScorer{}, queueScorerWeight),
202+
scorer.NewWeightedScorer(&scorer.KVCacheScorer{}, kvCacheScorerWeight)).
203+
WithPicker(picker.NewMaxScorePicker())
204+
204205
if prefixCacheScheduling == "true" {
205206
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
206-
schedConfigOpts = append(schedConfigOpts, scheduling.AddPrefixPlugin(loadPrefixCacheConfig(), prefixScorerWeight))
207+
if err := schedulerConfig.AddPlugins(scorer.NewWeightedScorer(prefix.New(loadPrefixCacheConfig()), prefixScorerWeight)); err != nil {
208+
setupLog.Error(err, "Failed to register scheduler plugins")
209+
return err
210+
}
207211
}
208-
schedulerConfig := scheduling.NewSchedulerConfig(
209-
[]plugins.PreSchedule{},
210-
[]plugins.Filter{filter.NewSheddableCapacityFilter()},
211-
scorers,
212-
picker.NewMaxScorePicker(),
213-
[]plugins.PostSchedule{},
214-
[]plugins.PostResponse{},
215-
schedConfigOpts...)
212+
216213
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
217214
}
218215
serverRunner := &runserver.ExtProcServerRunner{

pkg/epp/scheduling/config.go

Lines changed: 0 additions & 62 deletions
This file was deleted.

pkg/epp/scheduling/plugins/scorer/kvcache.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,27 @@ limitations under the License.
1717
package scorer
1818

1919
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
2021
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2122
)
2223

2324
const (
2425
DefaultKVCacheScorerWeight = 1
2526
)
2627

28+
// compile-time type validation
29+
var _ plugins.Scorer = &KVCacheScorer{}
30+
31+
// KVCacheScorer scores a list of candidate pods based on KV cache utilization.
2732
type KVCacheScorer struct{}
2833

29-
func (ss *KVCacheScorer) Name() string {
34+
// Name returns the name of the scorer.
35+
func (s *KVCacheScorer) Name() string {
3036
return "kv-cache"
3137
}
3238

33-
func (ss *KVCacheScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
39+
// Score returns the scoring result for the given list of pods based on context.
40+
func (s *KVCacheScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
3441
scores := make(map[types.Pod]float64, len(pods))
3542
for _, pod := range pods {
3643
scores[pod] = 1 - pod.GetMetrics().KVCacheUsagePercent

pkg/epp/scheduling/plugins/scorer/queue.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,28 @@ package scorer
1919
import (
2020
"math"
2121

22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
2223
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2324
)
2425

2526
const (
2627
DefaultQueueScorerWeight = 1
2728
)
2829

30+
// compile-time type validation
31+
var _ plugins.Scorer = &QueueScorer{}
32+
33+
// QueueScorer scores a list of candidate pods based on each pod's waiting queue size.
34+
// The smaller a pod's waiting queue, the higher its score (since it is more available to serve new requests).
2935
type QueueScorer struct{}
3036

31-
func (q *QueueScorer) Name() string {
37+
// Name returns the name of the scorer.
38+
func (s *QueueScorer) Name() string {
3239
return "queue"
3340
}
3441

35-
func (q *QueueScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
42+
// Score returns the scoring result for the given list of pods based on context.
43+
func (s *QueueScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
3644
minQueueSize := math.MaxInt
3745
maxQueueSize := math.MinInt
3846

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scorer
18+
19+
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
21+
)
22+
23+
// NewWeightedScorer initializes a new WeightedScorer and returns its pointer.
24+
func NewWeightedScorer(scorer plugins.Scorer, weight int) *WeightedScorer {
25+
return &WeightedScorer{
26+
Scorer: scorer,
27+
weight: weight,
28+
}
29+
}
30+
31+
// WeightedScorer is a struct that encapsulates a scorer with its weight.
32+
type WeightedScorer struct {
33+
plugins.Scorer
34+
weight int
35+
}
36+
37+
// Weight returns the weight of the scorer.
38+
func (s *WeightedScorer) Weight() int {
39+
return s.weight
40+
}

pkg/epp/scheduling/scheduler.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
2929
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
3030
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
31+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
3132
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3233
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
3334
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -64,13 +65,9 @@ func NewScheduler(datastore Datastore) *Scheduler {
6465
},
6566
}
6667

67-
defaultConfig := &SchedulerConfig{
68-
preSchedulePlugins: []plugins.PreSchedule{},
69-
filters: []plugins.Filter{filter.NewSheddableCapacityFilter(), lowLatencyFilter},
70-
scorers: map[plugins.Scorer]int{},
71-
picker: &picker.RandomPicker{},
72-
postSchedulePlugins: []plugins.PostSchedule{},
73-
}
68+
defaultConfig := NewSchedulerConfig().
69+
WithFilters(filter.NewSheddableCapacityFilter(), lowLatencyFilter).
70+
WithPicker(&picker.RandomPicker{})
7471

7572
return NewSchedulerWithConfig(datastore, defaultConfig)
7673
}
@@ -92,7 +89,7 @@ type Scheduler struct {
9289
datastore Datastore
9390
preSchedulePlugins []plugins.PreSchedule
9491
filters []plugins.Filter
95-
scorers map[plugins.Scorer]int // map from scorer to its weight
92+
scorers []*scorer.WeightedScorer
9693
picker plugins.Picker
9794
postSchedulePlugins []plugins.PostSchedule
9895
postResponsePlugins []plugins.PostResponse
@@ -172,15 +169,15 @@ func (s *Scheduler) runScorerPlugins(ctx *types.SchedulingContext, pods []types.
172169
weightedScorePerPod[pod] = float64(0) // initialize weighted score per pod with 0 value
173170
}
174171
// Iterate through each scorer in the chain and accumulate the weighted scores.
175-
for scorer, weight := range s.scorers {
176-
loggerDebug.Info("Running scorer", "scorer", scorer.Name())
172+
for _, weightedScorer := range s.scorers {
173+
loggerDebug.Info("Running scorer", "scorer", weightedScorer.Name())
177174
before := time.Now()
178-
scores := scorer.Score(ctx, pods)
179-
metrics.RecordSchedulerPluginProcessingLatency(plugins.ScorerPluginType, scorer.Name(), time.Since(before))
175+
scores := weightedScorer.Score(ctx, pods)
176+
metrics.RecordSchedulerPluginProcessingLatency(plugins.ScorerPluginType, weightedScorer.Name(), time.Since(before))
180177
for pod, score := range scores { // weight is relative to the sum of weights
181-
weightedScorePerPod[pod] += score * float64(weight) // TODO normalize score before multiply with weight
178+
weightedScorePerPod[pod] += score * float64(weightedScorer.Weight())
182179
}
183-
loggerDebug.Info("After running scorer", "scorer", scorer.Name())
180+
loggerDebug.Info("After running scorer", "scorer", weightedScorer.Name())
184181
}
185182
loggerDebug.Info("After running scorer plugins")
186183

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scheduling
18+
19+
import (
20+
"fmt"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
24+
)
25+
26+
// NewSchedulerConfig creates a new SchedulerConfig object and returns its pointer.
27+
func NewSchedulerConfig() *SchedulerConfig {
28+
return &SchedulerConfig{
29+
preSchedulePlugins: []plugins.PreSchedule{},
30+
filters: []plugins.Filter{},
31+
scorers: []*scorer.WeightedScorer{},
32+
postSchedulePlugins: []plugins.PostSchedule{},
33+
postResponsePlugins: []plugins.PostResponse{},
34+
// picker remains nil since config doesn't support multiple pickers
35+
}
36+
}
37+
38+
// SchedulerConfig provides a configuration for the scheduler that influences routing decisions.
39+
type SchedulerConfig struct {
40+
preSchedulePlugins []plugins.PreSchedule
41+
filters []plugins.Filter
42+
scorers []*scorer.WeightedScorer
43+
picker plugins.Picker
44+
postSchedulePlugins []plugins.PostSchedule
45+
postResponsePlugins []plugins.PostResponse
46+
}
47+
48+
// WithPreSchedulePlugins sets the given plugins as the PreSchedule plugins.
49+
// If the SchedulerConfig has PreSchedule plugins, this call replaces the existing plugins with the given ones.
50+
func (c *SchedulerConfig) WithPreSchedulePlugins(plugins ...plugins.PreSchedule) *SchedulerConfig {
51+
c.preSchedulePlugins = plugins
52+
return c
53+
}
54+
55+
// WithFilters sets the given filter plugins as the Filter plugins.
56+
// If the SchedulerConfig has Filter plugins, this call replaces the existing plugins with the given ones.
57+
func (c *SchedulerConfig) WithFilters(filters ...plugins.Filter) *SchedulerConfig {
58+
c.filters = filters
59+
return c
60+
}
61+
62+
// WithScorers sets the given scorer plugins as the Scorer plugins.
63+
// If the SchedulerConfig has Scorer plugins, this call replaces the existing plugins with the given ones.
64+
func (c *SchedulerConfig) WithScorers(scorers ...*scorer.WeightedScorer) *SchedulerConfig {
65+
c.scorers = scorers
66+
return c
67+
}
68+
69+
// WithPicker sets the given picker plugin as the Picker plugin.
70+
// If the SchedulerConfig already has a Picker plugin, this call replaces the existing plugin with the given one.
71+
func (c *SchedulerConfig) WithPicker(picker plugins.Picker) *SchedulerConfig {
72+
c.picker = picker
73+
return c
74+
}
75+
76+
// WithPostSchedulePlugins sets the given plugins as the PostSchedule plugins.
77+
// If the SchedulerConfig has PostSchedule plugins, this call replaces the existing plugins with the given ones.
78+
func (c *SchedulerConfig) WithPostSchedulePlugins(plugins ...plugins.PostSchedule) *SchedulerConfig {
79+
c.postSchedulePlugins = plugins
80+
return c
81+
}
82+
83+
// WithPostResponsePlugins sets the given plugins as the PostResponse plugins.
84+
// If the SchedulerConfig has PostResponse plugins, this call replaces the existing plugins with the given ones.
85+
func (c *SchedulerConfig) WithPostResponsePlugins(plugins ...plugins.PostResponse) *SchedulerConfig {
86+
c.postResponsePlugins = plugins
87+
return c
88+
}
89+
90+
// AddPlugins registers each of the given plugins under every scheduler plugin type whose interface it implements.
91+
// A plugin may implement more than one scheduler plugin interface.
92+
// Special Case: In order to add a scorer, one must use the scorer.NewWeightedScorer function in order to provide a weight.
93+
// If a scorer implements more than one interface, supplying a WeightedScorer is sufficient. The function will take the internal
94+
// scorer object and register it to all interfaces it implements.
95+
func (c *SchedulerConfig) AddPlugins(pluginObjects ...plugins.Plugin) error {
96+
for _, plugin := range pluginObjects {
97+
if weightedScorer, ok := plugin.(*scorer.WeightedScorer); ok {
98+
c.scorers = append(c.scorers, weightedScorer)
99+
plugin = weightedScorer.Scorer // if we got WeightedScorer, unwrap the plugin
100+
} else if scorer, ok := plugin.(plugins.Scorer); ok { // if we got a Scorer instead of WeightedScorer that's an error.
101+
return fmt.Errorf("failed to register scorer '%s' without a weight. follow function documentation to register a scorer", scorer.Name())
102+
}
103+
if preSchedulePlugin, ok := plugin.(plugins.PreSchedule); ok {
104+
c.preSchedulePlugins = append(c.preSchedulePlugins, preSchedulePlugin)
105+
}
106+
if filter, ok := plugin.(plugins.Filter); ok {
107+
c.filters = append(c.filters, filter)
108+
}
109+
if picker, ok := plugin.(plugins.Picker); ok {
110+
if c.picker != nil {
111+
return fmt.Errorf("failed to set '%s' as picker, already have a registered picker plugin '%s'", picker.Name(), c.picker.Name())
112+
}
113+
c.picker = picker
114+
}
115+
if postSchedulePlugin, ok := plugin.(plugins.PostSchedule); ok {
116+
c.postSchedulePlugins = append(c.postSchedulePlugins, postSchedulePlugin)
117+
}
118+
if postResponsePlugin, ok := plugin.(plugins.PostResponse); ok {
119+
c.postResponsePlugins = append(c.postResponsePlugins, postResponsePlugin)
120+
}
121+
}
122+
return nil
123+
}

0 commit comments

Comments
 (0)