Skip to content

Commit 6e68563

Browse files
committed
addressed code review comments
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent e9293e4 commit 6e68563

File tree

6 files changed

+77
-62
lines changed

6 files changed

+77
-62
lines changed

pkg/epp/scheduling/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ type SchedulerConfig struct {
2222
preSchedulePlugins []plugins.PreSchedule
2323
filters []plugins.Filter
2424
scorers map[plugins.Scorer]int // map from scorer to weight
25-
postSchedulePlugins []plugins.PostSchedule
2625
picker plugins.Picker
26+
postSchedulePlugins []plugins.PostSchedule
2727
}
2828

2929
var defPlugin = &defaultPlugin{}
@@ -36,6 +36,6 @@ var defaultConfig = &SchedulerConfig{
3636
preSchedulePlugins: []plugins.PreSchedule{},
3737
filters: []plugins.Filter{defPlugin},
3838
scorers: map[plugins.Scorer]int{},
39-
postSchedulePlugins: []plugins.PostSchedule{},
4039
picker: defPlugin,
40+
postSchedulePlugins: []plugins.PostSchedule{},
4141
}

pkg/epp/scheduling/plugins/picker/random_picker.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,8 @@ func (rp *RandomPicker) Name() string {
3434
return "random"
3535
}
3636

37-
func (rp *RandomPicker) Pick(ctx *types.SchedulingContext, scoredPods map[types.Pod]float64) *types.Result {
38-
ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates", len(scoredPods)))
39-
selectedIndex := rand.Intn(len(scoredPods))
40-
i := 0
41-
var randomPod types.Pod
42-
for pod := range scoredPods {
43-
if selectedIndex == i {
44-
randomPod = pod
45-
break
46-
47-
}
48-
i++
49-
}
50-
return &types.Result{TargetPod: randomPod}
37+
// Pick selects one of the given scored pods uniformly at random, ignoring the
// scores, and returns it as the TargetPod of the result.
//
// NOTE(review): rand.Intn is called with len(scoredPods); if rand is math/rand
// this panics for an empty slice, so callers must pass at least one candidate
// — confirm every call site guarantees that.
// NOTE(review): the fmt.Sprintf below runs even when DEBUG logging is disabled,
// and %+v on a slice of pointers prints addresses unless *types.ScoredPod
// implements fmt.Stringer — verify the log output is what is intended.
func (rp *RandomPicker) Pick(ctx *types.SchedulingContext, scoredPods []*types.ScoredPod) *types.Result {
38+
ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(scoredPods), scoredPods))
39+
// Choose a random index into the candidate slice.
i := rand.Intn(len(scoredPods))
40+
return &types.Result{TargetPod: scoredPods[i].Pod}
5141
}

pkg/epp/scheduling/plugins/plugins.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,16 @@ type Scorer interface {
5656
Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64
5757
}
5858

59-
// PostSchedule is called by the scheduler after it selects a targetPod for the request.
60-
type PostSchedule interface {
59+
// Picker picks the final pod(s) to send the request to.
60+
type Picker interface {
6161
Plugin
62-
PostSchedule(ctx *types.SchedulingContext, res *types.Result)
62+
Pick(ctx *types.SchedulingContext, scoredPods []*types.ScoredPod) *types.Result
6363
}
6464

65-
// Picker picks the final pod(s) to send the request to.
66-
type Picker interface {
65+
// PostSchedule is called by the scheduler after it selects a targetPod for the request.
66+
type PostSchedule interface {
6767
Plugin
68-
Pick(ctx *types.SchedulingContext, scoredPods map[types.Pod]float64) *types.Result
68+
PostSchedule(ctx *types.SchedulingContext, res *types.Result)
6969
}
7070

7171
// PostResponse is called by the scheduler after a successful response was sent.

pkg/epp/scheduling/scheduler.go

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -72,31 +72,23 @@ func NewScheduler(datastore Datastore) *Scheduler {
7272
}
7373

7474
func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Scheduler {
75-
sumOfScorersWeights := 0
76-
for _, weight := range config.scorers {
77-
sumOfScorersWeights += weight
78-
}
79-
scheduler := &Scheduler{
75+
return &Scheduler{
8076
datastore: datastore,
8177
preSchedulePlugins: config.preSchedulePlugins,
8278
filters: config.filters,
8379
scorers: config.scorers,
84-
postSchedulePlugins: config.postSchedulePlugins,
8580
picker: config.picker,
86-
sumOfScorersWeights: sumOfScorersWeights,
81+
postSchedulePlugins: config.postSchedulePlugins,
8782
}
88-
89-
return scheduler
9083
}
9184

9285
type Scheduler struct {
9386
datastore Datastore
9487
preSchedulePlugins []plugins.PreSchedule
9588
filters []plugins.Filter
9689
scorers map[plugins.Scorer]int // map from scorer to its weight
97-
postSchedulePlugins []plugins.PostSchedule
9890
picker plugins.Picker
99-
sumOfScorersWeights int
91+
postSchedulePlugins []plugins.PostSchedule
10092
}
10193

10294
type Datastore interface {
@@ -123,14 +115,11 @@ func (s *Scheduler) Schedule(ctx context.Context, req *types.LLMRequest) (*types
123115
// if we got here, there is at least one pod to score
124116
weightedScorePerPod := s.runScorerPlugins(sCtx, pods)
125117

126-
before := time.Now()
127-
res := s.picker.Pick(sCtx, weightedScorePerPod)
128-
metrics.RecordSchedulerPluginProcessingLatency(plugins.PickerPluginType, s.picker.Name(), time.Since(before))
129-
loggerDebug.Info("After running picker plugin", "result", res)
118+
result := s.runPickerPlugin(sCtx, weightedScorePerPod)
130119

131-
s.runPostSchedulePlugins(sCtx, res)
120+
s.runPostSchedulePlugins(sCtx, result)
132121

133-
return res, nil
122+
return result, nil
134123
}
135124

136125
func (s *Scheduler) runPreSchedulePlugins(ctx *types.SchedulingContext) {
@@ -157,6 +146,8 @@ func (s *Scheduler) runFilterPlugins(ctx *types.SchedulingContext) []types.Pod {
157146
break
158147
}
159148
}
149+
loggerDebug.Info("After running filter plugins")
150+
160151
return filteredPods
161152
}
162153

@@ -175,15 +166,33 @@ func (s *Scheduler) runScorerPlugins(ctx *types.SchedulingContext, pods []types.
175166
scores := scorer.Score(ctx, pods)
176167
metrics.RecordSchedulerPluginProcessingLatency(plugins.ScorerPluginType, scorer.Name(), time.Since(before))
177168
for pod, score := range scores { // weight is relative to the sum of weights
178-
weightedScorePerPod[pod] += score * float64(weight) / float64(s.sumOfScorersWeights) // TODO normalize score before multiply with weight
169+
weightedScorePerPod[pod] += score * float64(weight) // TODO normalize score before multiply with weight
179170
}
180171
loggerDebug.Info("After running scorer", "scorer", scorer.Name())
181172
}
182-
loggerDebug.Info("After running scorer plugins", "pods", pods)
173+
loggerDebug.Info("After running scorer plugins")
183174

184175
return weightedScorePerPod
185176
}
186177

178+
// runPickerPlugin converts the per-pod weighted score map into a slice of
// ScoredPod entries, invokes the configured picker on that slice, records the
// picker's processing latency, and returns the picker's result unchanged.
//
// The slice is filled by ranging over a map, so the order of the candidates
// handed to the picker is not stable between calls.
func (s *Scheduler) runPickerPlugin(ctx *types.SchedulingContext, weightedScorePerPod map[types.Pod]float64) *types.Result {
179+
loggerDebug := ctx.Logger.V(logutil.DEBUG)
180+
// Pre-size the slice to the number of scored pods and fill it by index.
scoredPods := make([]*types.ScoredPod, len(weightedScorePerPod))
181+
i := 0
182+
for pod, score := range weightedScorePerPod {
183+
scoredPods[i] = &types.ScoredPod{Pod: pod, Score: score}
184+
i++
185+
}
186+
187+
loggerDebug.Info("Before running picker plugin", "pods", weightedScorePerPod)
188+
// Time only the Pick call itself; the conversion above is not included in the metric.
before := time.Now()
189+
result := s.picker.Pick(ctx, scoredPods)
190+
metrics.RecordSchedulerPluginProcessingLatency(plugins.PickerPluginType, s.picker.Name(), time.Since(before))
191+
loggerDebug.Info("After running picker plugin", "result", result)
192+
193+
return result
194+
}
195+
187196
func (s *Scheduler) runPostSchedulePlugins(ctx *types.SchedulingContext, res *types.Result) {
188197
for _, plugin := range s.postSchedulePlugins {
189198
ctx.Logger.V(logutil.DEBUG).Info("Running post-schedule plugin", "plugin", plugin.Name())

pkg/epp/scheduling/scheduler_test.go

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -274,16 +274,16 @@ func TestSchedulePlugins(t *testing.T) {
274274
tp1: 1,
275275
tp2: 1,
276276
},
277-
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
278277
picker: pickerPlugin,
278+
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
279279
},
280280
input: []*backendmetrics.FakePodMetrics{
281281
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
282282
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
283283
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
284284
},
285285
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
286-
targetPodScore: 0.55,
286+
targetPodScore: 1.1,
287287
numPodsToScore: 2,
288288
err: false,
289289
},
@@ -296,16 +296,16 @@ func TestSchedulePlugins(t *testing.T) {
296296
tp1: 60,
297297
tp2: 40,
298298
},
299-
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
300299
picker: pickerPlugin,
300+
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
301301
},
302302
input: []*backendmetrics.FakePodMetrics{
303303
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
304304
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
305305
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
306306
},
307307
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
308-
targetPodScore: 0.5,
308+
targetPodScore: 50,
309309
numPodsToScore: 2,
310310
err: false,
311311
},
@@ -318,8 +318,8 @@ func TestSchedulePlugins(t *testing.T) {
318318
tp1: 1,
319319
tp2: 1,
320320
},
321-
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
322321
picker: pickerPlugin,
322+
postSchedulePlugins: []plugins.PostSchedule{tp1, tp2},
323323
},
324324
input: []*backendmetrics.FakePodMetrics{
325325
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
@@ -337,16 +337,16 @@ func TestSchedulePlugins(t *testing.T) {
337337
for _, plugin := range test.config.preSchedulePlugins {
338338
plugin.(*TestPlugin).reset()
339339
}
340-
for _, plugin := range test.config.postSchedulePlugins {
341-
plugin.(*TestPlugin).reset()
342-
}
343340
for _, plugin := range test.config.filters {
344341
plugin.(*TestPlugin).reset()
345342
}
346343
for plugin := range test.config.scorers {
347344
plugin.(*TestPlugin).reset()
348345
}
349346
test.config.picker.(*TestPlugin).reset()
347+
for _, plugin := range test.config.postSchedulePlugins {
348+
plugin.(*TestPlugin).reset()
349+
}
350350

351351
// Initialize the scheduler
352352
scheduler := NewSchedulerWithConfig(&fakeDataStore{pods: test.input}, &test.config)
@@ -397,13 +397,6 @@ func TestSchedulePlugins(t *testing.T) {
397397
}
398398
}
399399

400-
for _, plugin := range test.config.postSchedulePlugins {
401-
tp, _ := plugin.(*TestPlugin)
402-
if tp.PostScheduleCallCount != 1 {
403-
t.Errorf("Plugin %s PostSchedule() called %d times, expected 1", plugin.Name(), tp.PostScheduleCallCount)
404-
}
405-
}
406-
407400
tp, _ := test.config.picker.(*TestPlugin)
408401
if tp.NumOfPickerCandidates != test.numPodsToScore {
409402
t.Errorf("Picker plugin %s Pick() called with %d candidates, expected %d", tp.Name(), tp.NumOfPickerCandidates, tp.NumOfScoredPods)
@@ -414,6 +407,13 @@ func TestSchedulePlugins(t *testing.T) {
414407
if tp.WinnderPodScore != test.targetPodScore {
415408
t.Errorf("winnder pod score %v, expected %v", tp.WinnderPodScore, test.targetPodScore)
416409
}
410+
411+
for _, plugin := range test.config.postSchedulePlugins {
412+
tp, _ := plugin.(*TestPlugin)
413+
if tp.PostScheduleCallCount != 1 {
414+
t.Errorf("Plugin %s PostSchedule() called %d times, expected 1", plugin.Name(), tp.PostScheduleCallCount)
415+
}
416+
}
417417
})
418418
}
419419
}
@@ -468,18 +468,18 @@ func (tp *TestPlugin) Score(ctx *types.SchedulingContext, pods []types.Pod) map[
468468
return scoredPods
469469
}
470470

471-
func (tp *TestPlugin) PostSchedule(ctx *types.SchedulingContext, res *types.Result) {
472-
tp.PostScheduleCallCount++
473-
}
474-
475-
func (tp *TestPlugin) Pick(ctx *types.SchedulingContext, scoredPods map[types.Pod]float64) *types.Result {
471+
func (tp *TestPlugin) Pick(ctx *types.SchedulingContext, scoredPods []*types.ScoredPod) *types.Result {
476472
tp.PickCallCount++
477473
tp.NumOfPickerCandidates = len(scoredPods)
478474
pod := findPods(ctx, tp.PickRes)[0]
479-
tp.WinnderPodScore = scoredPods[pod]
475+
tp.WinnderPodScore = getPodScore(scoredPods, pod)
480476
return &types.Result{TargetPod: pod}
481477
}
482478

479+
// PostSchedule records that it was invoked by incrementing
// PostScheduleCallCount; ctx and res are not inspected.
func (tp *TestPlugin) PostSchedule(ctx *types.SchedulingContext, res *types.Result) {
480+
tp.PostScheduleCallCount++
481+
}
482+
483483
func (tp *TestPlugin) reset() {
484484
tp.PreScheduleCallCount = 0
485485
tp.FilterCallCount = 0
@@ -501,3 +501,14 @@ func findPods(ctx *types.SchedulingContext, names ...k8stypes.NamespacedName) []
501501
}
502502
return res
503503
}
504+
505+
// getPodScore returns the Score of the first entry in scoredPods whose pod has
// the same NamespacedName string as selectedPod. If no entry matches, it
// returns 0.0, which is indistinguishable from a matching pod scored 0.
func getPodScore(scoredPods []*types.ScoredPod, selectedPod types.Pod) float64 {
506+
finalScore := 0.0
507+
for _, scoredPod := range scoredPods {
508+
// Pods are matched by the string form of their namespaced name, not by identity.
if scoredPod.Pod.GetPod().NamespacedName.String() == selectedPod.GetPod().NamespacedName.String() {
509+
finalScore = scoredPod.Score
510+
break
511+
}
512+
}
513+
return finalScore
514+
}

pkg/epp/scheduling/types/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ type Pod interface {
4646
String() string
4747
}
4848

49+
// ScoredPod pairs a Pod with a float64 score.
type ScoredPod struct {
50+
// Pod is the candidate pod.
Pod Pod
51+
// Score is the score associated with Pod.
Score float64
52+
}
53+
4954
// SchedulingContext holds contextual information during a scheduling operation.
5055
type SchedulingContext struct {
5156
context.Context

0 commit comments

Comments
 (0)