Skip to content

Commit 84dcfe0

Browse files
nirrozenbaum, liu-cong
authored and committed
remove the PreCycle plugin from scheduler (kubernetes-sigs#876)
* remove the PreCycle plugin from scheduler

  Signed-off-by: Nir Rozenbaum <[email protected]>

* Apply suggestions from code review

  Co-authored-by: Cong Liu <[email protected]>

---------

Signed-off-by: Nir Rozenbaum <[email protected]>
Co-authored-by: Cong Liu <[email protected]>
1 parent 7d858a8 commit 84dcfe0

File tree

7 files changed

+25
-110
lines changed

7 files changed

+25
-110
lines changed

pkg/epp/metrics/metrics_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -571,11 +571,6 @@ func TestSchedulerPluginProcessingLatencies(t *testing.T) {
571571
{
572572
name: "multiple plugins",
573573
latencies: []pluginLatency{
574-
{
575-
pluginType: "PreSchedule",
576-
pluginName: "PluginA",
577-
duration: 100 * time.Millisecond,
578-
},
579574
{
580575
pluginType: "PostSchedule",
581576
pluginName: "PluginB",

pkg/epp/metrics/testdata/scheduler_plugin_processing_latencies_metric

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,5 @@
11
# HELP inference_extension_scheduler_plugin_duration_seconds [ALPHA] Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.
22
# TYPE inference_extension_scheduler_plugin_duration_seconds histogram
3-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0001"} 0
4-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0002"} 0
5-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0005"} 0
6-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.001"} 0
7-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.002"} 0
8-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.005"} 0
9-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.01"} 0
10-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.02"} 0
11-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.05"} 0
12-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.1"} 1
13-
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="+Inf"} 1
14-
inference_extension_scheduler_plugin_duration_seconds_sum{plugin_name="PluginA",plugin_type="PreSchedule"} 0.1
15-
inference_extension_scheduler_plugin_duration_seconds_count{plugin_name="PluginA",plugin_type="PreSchedule"} 1
163
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0001"} 0
174
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0002"} 0
185
inference_extension_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0005"} 0

pkg/epp/scheduling/framework/plugins.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
const (
2424
ProfilePickerType = "ProfilePicker"
25-
PreCyclePluginType = "PreCycle"
2625
FilterPluginType = "Filter"
2726
ScorerPluginType = "Scorer"
2827
PickerPluginType = "Picker"
@@ -44,13 +43,6 @@ type ProfilePicker interface {
4443
Pick(request *types.LLMRequest, profiles map[string]*SchedulerProfile, executionResults map[string]*types.Result) map[string]*SchedulerProfile
4544
}
4645

47-
// PreCycle is called when the scheduler receives a new request and invokes a SchedulerProfile cycle.
48-
// It can be used for various initialization work.
49-
type PreCycle interface {
50-
Plugin
51-
PreCycle(ctx *types.SchedulingContext)
52-
}
53-
5446
// Filter defines the interface for filtering a list of pods based on context.
5547
type Filter interface {
5648
Plugin

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ func (s *schedulingContextState) Clone() types.StateData {
114114
}
115115

116116
// compile-time type assertion
117-
var _ framework.PreCycle = &Plugin{}
118117
var _ framework.Scorer = &Plugin{}
119118
var _ framework.PostCycle = &Plugin{}
120119

@@ -132,18 +131,6 @@ func (m *Plugin) Name() string {
132131
return "prefix-cache"
133132
}
134133

135-
// PreCycle initializes the prefix plugin state for the current scheduling cycle.
136-
func (m *Plugin) PreCycle(ctx *types.SchedulingContext) {
137-
hashes := hashPrompt(ctx, m.HashBlockSize, m.MaxPrefixBlocksToMatch)
138-
state := &schedulingContextState{
139-
PrefixHashes: hashes,
140-
PrefixCacheServers: m.matchLongestPrefix(ctx, hashes, DefaultNumServersToMatch),
141-
}
142-
143-
ctx.CycleState.Write(types.StateKey(m.Name()), state)
144-
ctx.Logger.V(logutil.TRACE).Info(fmt.Sprintf("PreCycle, cached servers: %+v", state.PrefixCacheServers), "hashes", state.PrefixHashes)
145-
}
146-
147134
// PostCycle records in the plugin cache the result of the scheduling selection.
148135
func (m *Plugin) PostCycle(ctx *types.SchedulingContext, res *types.Result) {
149136
targetPod := res.TargetPod.GetPod()
@@ -160,13 +147,20 @@ func (m *Plugin) PostCycle(ctx *types.SchedulingContext, res *types.Result) {
160147

161148
// Score returns the scoring result for the given list of pods based on context.
162149
func (m *Plugin) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
163-
scores := make(map[types.Pod]float64, len(pods))
164-
165-
state, err := m.getPrefixState(ctx.CycleState)
166-
if err != nil {
167-
ctx.Logger.Error(err, "failed to read prefix plugin cycle state")
168-
return scores
150+
// pre score step, hashing prompt and find longest prefix match.
151+
hashes := hashPrompt(ctx, m.HashBlockSize, m.MaxPrefixBlocksToMatch)
152+
numServers := DefaultNumServersToMatch
153+
if numServers > len(pods) {
154+
numServers = len(pods)
155+
}
156+
state := &schedulingContextState{
157+
PrefixHashes: hashes,
158+
PrefixCacheServers: m.matchLongestPrefix(ctx, hashes, numServers),
169159
}
160+
ctx.CycleState.Write(types.StateKey(m.Name()), state)
161+
ctx.Logger.V(logutil.TRACE).Info(fmt.Sprintf("cached servers: %+v", state.PrefixCacheServers), "hashes", state.PrefixHashes)
162+
// calculate the scores of pods
163+
scores := make(map[types.Pod]float64, len(pods))
170164

171165
total := len(state.PrefixHashes)
172166
podScoreFunc := func(pod types.Pod) float64 {
@@ -185,9 +179,6 @@ func (m *Plugin) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types
185179

186180
// matchLongestPrefix returns a map of servers and length of prefix that each server caches.
187181
func (m *Plugin) matchLongestPrefix(ctx *types.SchedulingContext, hashes []BlockHash, numServers int) map[ServerID]int {
188-
if numServers > len(ctx.PodsSnapshot) {
189-
numServers = len(ctx.PodsSnapshot)
190-
}
191182
res := make(map[ServerID]int)
192183
// Use a greedy strategy to search from the longest prefix.
193184
// NOTE: It's possible to further optimize this with a binary search.

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,14 @@ func TestPrefixPlugin(t *testing.T) {
2828
Prompt: "aaaaaa",
2929
}
3030
ctx := types.NewSchedulingContext(context.Background(), req1, nil, pods)
31-
plugin.PreCycle(ctx)
31+
scores := plugin.Score(ctx, pods)
3232
state, err := plugin.getPrefixState(ctx.CycleState)
3333
assert.NoError(t, err)
3434
t.Logf("Hashes %+v, cached servers: %+v", state.PrefixHashes, state.PrefixCacheServers)
3535
// Input size is 6, hash block size is 4, the last 2 characters are ignored.
3636
// Total hashes = 2 (the first one is for the model)
3737
assert.Equal(t, 2, len(state.PrefixHashes), "number of hashes is incorrect")
3838
assert.Equal(t, 0, len(state.PrefixCacheServers), "there shouldn't be any cached servers")
39-
40-
// Updated to use the new Score method signature
41-
scores := plugin.Score(ctx, pods)
4239
assert.Equal(t, float64(0), scores[pod1], "score for pod1")
4340
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
4441

@@ -52,17 +49,14 @@ func TestPrefixPlugin(t *testing.T) {
5249
Prompt: "bbbbbb",
5350
}
5451
ctx = types.NewSchedulingContext(context.Background(), req2, nil, pods)
55-
plugin.PreCycle(ctx)
52+
scores = plugin.Score(ctx, pods)
5653
state, err = plugin.getPrefixState(ctx.CycleState)
5754
assert.NoError(t, err)
5855
t.Logf("Hashes %+v, cached servers: %+v", state.PrefixHashes, state.PrefixCacheServers)
5956
// Input size is 6, hash block size is 4, the last 2 characters are ignored.
6057
// Total hashes = 2 (the first one is for the model)
6158
assert.Equal(t, 2, len(state.PrefixHashes), "number of hashes is incorrect")
6259
assert.Equal(t, 0, len(state.PrefixCacheServers), "there shouldn't be any cached servers")
63-
64-
// Updated to use the new Score method signature
65-
scores = plugin.Score(ctx, pods)
6660
assert.Equal(t, float64(0), scores[pod1], "score for pod1")
6761
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
6862

@@ -75,17 +69,14 @@ func TestPrefixPlugin(t *testing.T) {
7569
Prompt: "aaaabbbb",
7670
}
7771
ctx = types.NewSchedulingContext(context.Background(), req3, nil, pods)
78-
plugin.PreCycle(ctx)
72+
scores = plugin.Score(ctx, pods)
7973
state, err = plugin.getPrefixState(ctx.CycleState)
8074
assert.NoError(t, err)
8175
t.Logf("Hashes %+v, cached servers: %+v", state.PrefixHashes, state.PrefixCacheServers)
8276
// Input size is 8, hash block size is 4, so 2 hashes will be calculated.
8377
// Total hashes = 3 (the first one is for the model)
8478
assert.Equal(t, 3, len(state.PrefixHashes), "number of hashes is incorrect")
8579
assert.Equal(t, 1, len(state.PrefixCacheServers), "pod1 should have cached the aaaa prefix")
86-
87-
// Updated to use the new Score method signature
88-
scores = plugin.Score(ctx, pods)
8980
assert.Equal(t, float64(2)/float64(3), scores[pod1], "score should be 2/3 - the model and the first prefix block match")
9081
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
9182

@@ -97,17 +88,14 @@ func TestPrefixPlugin(t *testing.T) {
9788
Prompt: "aaaabbbb",
9889
}
9990
ctx = types.NewSchedulingContext(context.Background(), req4, nil, pods)
100-
plugin.PreCycle(ctx)
91+
scores = plugin.Score(ctx, pods)
10192
state, err = plugin.getPrefixState(ctx.CycleState)
10293
assert.NoError(t, err)
10394
t.Logf("Hashes %+v, cached servers: %+v", state.PrefixHashes, state.PrefixCacheServers)
10495
// Input size is 8, hash block size is 4, so 2 hashes will be calculated.
10596
// Total hashes = 3 (the first one is for the model)
10697
assert.Equal(t, 3, len(state.PrefixHashes), "number of hashes is incorrect")
10798
assert.Equal(t, 0, len(state.PrefixCacheServers), "pod1 should have cached the aaaa prefix")
108-
109-
// Updated to use the new Score method signature
110-
scores = plugin.Score(ctx, pods)
11199
assert.Equal(t, float64(0), scores[pod1], "score for pod1")
112100
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
113101

@@ -119,17 +107,14 @@ func TestPrefixPlugin(t *testing.T) {
119107
Prompt: "aaaabbbbcccc",
120108
}
121109
ctx = types.NewSchedulingContext(context.Background(), req5, nil, pods)
122-
plugin.PreCycle(ctx)
110+
scores = plugin.Score(ctx, pods)
123111
state, err = plugin.getPrefixState(ctx.CycleState)
124112
assert.NoError(t, err)
125113
t.Logf("Hashes %+v, cached servers: %+v", state.PrefixHashes, state.PrefixCacheServers)
126114
// Input size is 12, hash block size is 4, so 3 hashes will be calculated.
127115
// Total hashes = 4 (the first one is for the model)
128116
assert.Equal(t, 4, len(state.PrefixHashes), "number of hashes is incorrect")
129117
assert.Equal(t, 1, len(state.PrefixCacheServers), "pod1 should have cached the aaaa prefix")
130-
131-
// Updated to use the new Score method signature
132-
scores = plugin.Score(ctx, pods)
133118
assert.Equal(t, 0.75, scores[pod1], "score should be 0.75 - the model and the first 2 prefix blocks match")
134119
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
135120

pkg/epp/scheduling/framework/scheduler_profile.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
// NewSchedulerProfile creates a new SchedulerProfile object and returns its pointer.
3030
func NewSchedulerProfile() *SchedulerProfile {
3131
return &SchedulerProfile{
32-
preCyclePlugins: []PreCycle{},
3332
filters: []Filter{},
3433
scorers: []*WeightedScorer{},
3534
postCyclePlugins: []PostCycle{},
@@ -40,21 +39,13 @@ func NewSchedulerProfile() *SchedulerProfile {
4039

4140
// SchedulerProfile provides a profile configuration for the scheduler which influence routing decisions.
4241
type SchedulerProfile struct {
43-
preCyclePlugins []PreCycle
4442
filters []Filter
4543
scorers []*WeightedScorer
4644
picker Picker
4745
postCyclePlugins []PostCycle
4846
PostResponsePlugins []PostResponse // TODO this field should get out of the scheduler
4947
}
5048

51-
// WithPreCyclePlugins sets the given plugins as the PreCycle plugins.
52-
// If the SchedulerProfile has PreCycle plugins, this call replaces the existing plugins with the given ones.
53-
func (p *SchedulerProfile) WithPreCyclePlugins(plugins ...PreCycle) *SchedulerProfile {
54-
p.preCyclePlugins = plugins
55-
return p
56-
}
57-
5849
// WithFilters sets the given filter plugins as the Filter plugins.
5950
// if the SchedulerProfile has Filter plugins, this call replaces the existing plugins with the given ones.
6051
func (p *SchedulerProfile) WithFilters(filters ...Filter) *SchedulerProfile {
@@ -96,9 +87,6 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...Plugin) error {
9687
} else if scorer, ok := plugin.(Scorer); ok { // if we got a Scorer instead of WeightedScorer that's an error.
9788
return fmt.Errorf("failed to register scorer '%s' without a weight. follow function documentation to register a scorer", scorer.Name())
9889
}
99-
if preCyclePlugin, ok := plugin.(PreCycle); ok {
100-
p.preCyclePlugins = append(p.preCyclePlugins, preCyclePlugin)
101-
}
10290
if filter, ok := plugin.(Filter); ok {
10391
p.filters = append(p.filters, filter)
10492
}
@@ -119,10 +107,8 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...Plugin) error {
119107
}
120108

121109
// RunCycle runs a SchedulerProfile cycle. In other words, it invokes all the SchedulerProfile plugins in this
122-
// order - PreCyclePlugins, Filters, Scorers, Picker, PostCyclePlugins. After completing all, it returns the result.
110+
// order - Filters, Scorers, Picker, PostCyclePlugins. After completing all, it returns the result.
123111
func (p *SchedulerProfile) RunCycle(ctx *types.SchedulingContext) (*types.Result, error) {
124-
p.runPreCyclePlugins(ctx)
125-
126112
pods := p.runFilterPlugins(ctx)
127113
if len(pods) == 0 {
128114
return nil, errutil.Error{Code: errutil.Internal, Msg: "no pods available for the given request"}
@@ -137,15 +123,6 @@ func (p *SchedulerProfile) RunCycle(ctx *types.SchedulingContext) (*types.Result
137123
return result, nil
138124
}
139125

140-
func (p *SchedulerProfile) runPreCyclePlugins(ctx *types.SchedulingContext) {
141-
for _, plugin := range p.preCyclePlugins {
142-
ctx.Logger.V(logutil.DEBUG).Info("Running pre-cycle plugin", "plugin", plugin.Name())
143-
before := time.Now()
144-
plugin.PreCycle(ctx)
145-
metrics.RecordSchedulerPluginProcessingLatency(PreCyclePluginType, plugin.Name(), time.Since(before))
146-
}
147-
}
148-
149126
func (p *SchedulerProfile) runFilterPlugins(ctx *types.SchedulingContext) []types.Pod {
150127
loggerDebug := ctx.Logger.V(logutil.DEBUG)
151128
filteredPods := ctx.PodsSnapshot

pkg/epp/scheduling/framework/scheduler_profile_test.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ func TestSchedulePlugins(t *testing.T) {
6161
{
6262
name: "all plugins executed successfully, all scorers with same weight",
6363
profile: NewSchedulerProfile().
64-
WithPreCyclePlugins(tp1, tp2).
6564
WithFilters(tp1, tp2).
6665
WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)).
6766
WithPicker(pickerPlugin).
@@ -79,7 +78,6 @@ func TestSchedulePlugins(t *testing.T) {
7978
{
8079
name: "all plugins executed successfully, different scorers weights",
8180
profile: NewSchedulerProfile().
82-
WithPreCyclePlugins(tp1, tp2).
8381
WithFilters(tp1, tp2).
8482
WithScorers(NewWeightedScorer(tp1, 60), NewWeightedScorer(tp2, 40)).
8583
WithPicker(pickerPlugin).
@@ -97,7 +95,6 @@ func TestSchedulePlugins(t *testing.T) {
9795
{
9896
name: "filter all",
9997
profile: NewSchedulerProfile().
100-
WithPreCyclePlugins(tp1, tp2).
10198
WithFilters(tp1, tp_filterAll).
10299
WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)).
103100
WithPicker(pickerPlugin).
@@ -115,9 +112,6 @@ func TestSchedulePlugins(t *testing.T) {
115112
for _, test := range tests {
116113
t.Run(test.name, func(t *testing.T) {
117114
// Reset all plugins before each new test case.
118-
for _, plugin := range test.profile.preCyclePlugins {
119-
plugin.(*testPlugin).reset()
120-
}
121115
for _, plugin := range test.profile.filters {
122116
plugin.(*testPlugin).reset()
123117
}
@@ -159,12 +153,6 @@ func TestSchedulePlugins(t *testing.T) {
159153
t.Errorf("Unexpected output (-want +got): %v", diff)
160154
}
161155
// Validate plugin execution counts dynamically
162-
for _, plugin := range test.profile.preCyclePlugins {
163-
tp, _ := plugin.(*testPlugin)
164-
if tp.PreScheduleCallCount != 1 {
165-
t.Errorf("Plugin %s PreSchedule() called %d times, expected 1", plugin.Name(), tp.PreScheduleCallCount)
166-
}
167-
}
168156
for _, plugin := range test.profile.filters {
169157
tp, _ := plugin.(*testPlugin)
170158
if tp.FilterCallCount != 1 {
@@ -200,6 +188,12 @@ func TestSchedulePlugins(t *testing.T) {
200188
}
201189
}
202190

191+
// compile-time type assertion
192+
var _ Filter = &testPlugin{}
193+
var _ Scorer = &testPlugin{}
194+
var _ Picker = &testPlugin{}
195+
var _ PostCycle = &testPlugin{}
196+
203197
// testPlugin is an implementation useful in unit tests.
204198
type testPlugin struct {
205199
NameRes string
@@ -208,7 +202,6 @@ type testPlugin struct {
208202
ScoreRes float64
209203
FilterCallCount int
210204
FilterRes []k8stypes.NamespacedName
211-
PreScheduleCallCount int
212205
PostScheduleCallCount int
213206
PickCallCount int
214207
NumOfPickerCandidates int
@@ -218,10 +211,6 @@ type testPlugin struct {
218211

219212
func (tp *testPlugin) Name() string { return tp.NameRes }
220213

221-
func (tp *testPlugin) PreCycle(ctx *types.SchedulingContext) {
222-
tp.PreScheduleCallCount++
223-
}
224-
225214
func (tp *testPlugin) Filter(ctx *types.SchedulingContext, pods []types.Pod) []types.Pod {
226215
tp.FilterCallCount++
227216
return findPods(ctx, tp.FilterRes...)
@@ -251,7 +240,6 @@ func (tp *testPlugin) PostCycle(ctx *types.SchedulingContext, res *types.Result)
251240
}
252241

253242
func (tp *testPlugin) reset() {
254-
tp.PreScheduleCallCount = 0
255243
tp.FilterCallCount = 0
256244
tp.ScoreCallCount = 0
257245
tp.NumOfScoredPods = 0

0 commit comments

Comments
 (0)