Weighted scorers #737
Changes from 4 commits: 9bbd13e, 4d7625c, c5a41ce, e9293e4, 6e68563
Two files were deleted in this change.
@@ -72,13 +72,18 @@ func NewScheduler(datastore Datastore) *Scheduler {
 }
 
 func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Scheduler {
+	sumOfScorersWeights := 0
+	for _, weight := range config.scorers {
+		sumOfScorersWeights += weight
+	}
 	scheduler := &Scheduler{
 		datastore:           datastore,
 		preSchedulePlugins:  config.preSchedulePlugins,
-		scorers:             config.scorers,
 		filters:             config.filters,
+		scorers:             config.scorers,
 		postSchedulePlugins: config.postSchedulePlugins,
 		picker:              config.picker,
+		sumOfScorersWeights: sumOfScorersWeights,
 	}
 
 	return scheduler
@@ -88,9 +93,10 @@ type Scheduler struct {
 	datastore           Datastore
 	preSchedulePlugins  []plugins.PreSchedule
 	filters             []plugins.Filter
-	scorers             []plugins.Scorer
+	scorers             map[plugins.Scorer]int // map from scorer to its weight
 	postSchedulePlugins []plugins.PostSchedule
 	picker              plugins.Picker
+	sumOfScorersWeights int
 }
 
 type Datastore interface {
@@ -106,21 +112,21 @@ func (s *Scheduler) Schedule(ctx context.Context, req *types.LLMRequest) (*types
 	// 1. Reduce concurrent access to the datastore.
 	// 2. Ensure consistent data during the scheduling operation of a request.
 	sCtx := types.NewSchedulingContext(ctx, req, types.ToSchedulerPodMetrics(s.datastore.PodGetAll()))
-	loggerDebug.Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", sCtx.PodsSnapshot))
+	loggerDebug.Info(fmt.Sprintf("Scheduling a request, Metrics: %+v", sCtx.PodsSnapshot))
 
 	s.runPreSchedulePlugins(sCtx)
 
 	pods := s.runFilterPlugins(sCtx)
 	if len(pods) == 0 {
 		return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: "failed to find a target pod"}
I don't think it's in scope of this PR, but we should think about changing this error code. There's no guarantee that resources are exhausted when there are no pods, just that the filter for the specific request came up with nothing. I worry that sends incorrect signals to the reader.

This is a regression from b24f948, I believe. Previously this error was returned by the sheddableRequestFilter, which knows this is a resource-exhausted error. I would argue to keep the error from the Filter and Scorers, even if we don't need them today. This is more future proof, and addresses issues like this. @nirrozenbaum What do you think about bringing the errors back?

As of today, we have in main the following filters:
I'm in favor of being flexible, but I think we should be use-case driven and not try to be future proof, otherwise we may end up implementing things that are not used and/or not relevant. For example, we changed the

The DropRequest is a use case to return an error. How do we solve that without an error?

We started this discussion on #677. I'll put here my interpretation for this question.

First point: we can argue whether a filter whose result is an empty list is an error. I claim that there was no error in the filter itself, as the filter worked correctly and filtered according to its definition. The result is that we cannot serve the request, but this is not an error with the filter (the scheduler returns an error if no pods are left).

Second point, and I'll refer to the code as it was in #677 (since it has changed since then): this error was used in two places in the code:

To summarize, IMO the DropRequest that was previously used is not a good example of an error usage, since this is not an error. It was used (I think incorrectly) to mark that no pods were left. For this check, we can easily check the length of the returned filtered pods.

You have a lot of great points! If a filter results in 0 pods, this will effectively lead to an error for the scheduler. While it's debatable whether it's an error from the filter's perspective, we need to communicate this to the scheduler. Currently the scheduler simply returns a ResourceExhausted error, which seems too broad. IMO the filter should communicate a reason why it filtered out all pods. How about adding another "filterReason" return value (perhaps an enum), if an error is too strong, so the scheduler can interpret the empty pods and return a proper error to consumers?

Yes, I agree that ResourceExhausted is not the right error to use here and that should be changed. "filterReason" is another word for saying error :) if there is no error, we won't use the reason. Can you give an example of a "filterReason" you can think of?
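A rough, hypothetical sketch of the "filterReason" idea discussed above (the FilterReason type, its values, and the error mapping are illustrative assumptions, not code from this PR or repository):

```go
package main

import "fmt"

// FilterReason is a hypothetical enum a filter could return alongside the
// filtered pods, so the scheduler can map "no pods left" to a more specific
// error than InferencePoolResourceExhausted.
type FilterReason int

const (
	FilterReasonNone       FilterReason = iota // pods remain, nothing to report
	FilterReasonSheddable                      // request is sheddable and capacity is saturated
	FilterReasonNoCapacity                     // no pod had room for this request
)

// toError sketches how the scheduler could translate an empty filter result
// into a caller-facing error based on the reason (the mapping is illustrative).
func toError(reason FilterReason) error {
	switch reason {
	case FilterReasonSheddable, FilterReasonNoCapacity:
		return fmt.Errorf("inference pool resource exhausted (reason %d)", reason)
	default:
		return fmt.Errorf("failed to find a target pod (reason %d)", reason)
	}
}

func main() {
	fmt.Println(toError(FilterReasonSheddable))
}
```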
 	}
 
-	s.runScorerPlugins(sCtx, pods)
+	// if we got here, there is at least one pod to score
+	weightedScorePerPod := s.runScorerPlugins(sCtx, pods)
 
 	before := time.Now()
-	res := s.picker.Pick(sCtx, pods)
+	res := s.picker.Pick(sCtx, weightedScorePerPod)
 	metrics.RecordSchedulerPluginProcessingLatency(plugins.PickerPluginType, s.picker.Name(), time.Since(before))
-	loggerDebug.Info("After running picker plugins", "result", res)
+	loggerDebug.Info("After running picker plugin", "result", res)
 
 	s.runPostSchedulePlugins(sCtx, res)
 
@@ -136,15 +142,6 @@ func (s *Scheduler) runPreSchedulePlugins(ctx *types.SchedulingContext) {
 	}
 }
 
-func (s *Scheduler) runPostSchedulePlugins(ctx *types.SchedulingContext, res *types.Result) {
-	for _, plugin := range s.postSchedulePlugins {
-		ctx.Logger.V(logutil.DEBUG).Info("Running post-schedule plugin", "plugin", plugin.Name())
-		before := time.Now()
-		plugin.PostSchedule(ctx, res)
-		metrics.RecordSchedulerPluginProcessingLatency(plugins.PostSchedulePluginType, plugin.Name(), time.Since(before))
-	}
-}
-
 func (s *Scheduler) runFilterPlugins(ctx *types.SchedulingContext) []types.Pod {
 	loggerDebug := ctx.Logger.V(logutil.DEBUG)
 	filteredPods := ctx.PodsSnapshot
@@ -163,29 +160,37 @@ func (s *Scheduler) runFilterPlugins(ctx *types.SchedulingContext) []types.Pod {
 	return filteredPods
 }
 
-func (s *Scheduler) runScorerPlugins(ctx *types.SchedulingContext, pods []types.Pod) {
+func (s *Scheduler) runScorerPlugins(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
 	loggerDebug := ctx.Logger.V(logutil.DEBUG)
-	loggerDebug.Info("Before running score plugins", "pods", pods)
+	loggerDebug.Info("Before running scorer plugins", "pods", pods)
 
+	weightedScorePerPod := make(map[types.Pod]float64, len(pods))
 	for _, pod := range pods {
-		score := s.runScorersForPod(ctx, pod)
-		pod.SetScore(score)
+		weightedScorePerPod[pod] = float64(0) // initialize weighted score per pod with 0 value
nit: I don't think this is necessary.

If no scorers are configured (for example, like today in main), removing this line will cause errors.
 	}
+	// Iterate through each scorer in the chain and accumulate the weighted scores.
+	for scorer, weight := range s.scorers {
+		loggerDebug.Info("Running scorer", "scorer", scorer.Name())
+		before := time.Now()
+		scores := scorer.Score(ctx, pods)
+		metrics.RecordSchedulerPluginProcessingLatency(plugins.ScorerPluginType, scorer.Name(), time.Since(before))
+		for pod, score := range scores { // weight is relative to the sum of weights
(nirrozenbaum marked this conversation as resolved.)
+			weightedScorePerPod[pod] += score * float64(weight) / float64(s.sumOfScorersWeights) // TODO normalize score before multiply with weight
+		}
+		loggerDebug.Info("After running scorer", "scorer", scorer.Name())
+	}
-	loggerDebug.Info("After running score plugins", "pods", pods)
+	loggerDebug.Info("After running scorer plugins", "pods", pods)
 
+	return weightedScorePerPod
 }
 
-// Iterate through each scorer in the chain and accumulate the scores.
-func (s *Scheduler) runScorersForPod(ctx *types.SchedulingContext, pod types.Pod) float64 {
-	logger := ctx.Logger.WithValues("pod", pod.GetPod().NamespacedName).V(logutil.DEBUG)
-	score := float64(0)
-	for _, scorer := range s.scorers {
-		logger.Info("Running scorer", "scorer", scorer.Name())
+func (s *Scheduler) runPostSchedulePlugins(ctx *types.SchedulingContext, res *types.Result) {
+	for _, plugin := range s.postSchedulePlugins {
+		ctx.Logger.V(logutil.DEBUG).Info("Running post-schedule plugin", "plugin", plugin.Name())
 		before := time.Now()
-		oneScore := scorer.Score(ctx, pod)
-		metrics.RecordSchedulerPluginProcessingLatency(plugins.ScorerPluginType, scorer.Name(), time.Since(before))
-		score += oneScore
-		logger.Info("After scorer", "scorer", scorer.Name(), "score", oneScore, "total score", score)
+		plugin.PostSchedule(ctx, res)
+		metrics.RecordSchedulerPluginProcessingLatency(plugins.PostSchedulePluginType, plugin.Name(), time.Since(before))
 	}
-	return score
 }
 
 type defaultPlugin struct {
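To make the weighted combination above concrete: with scorer weights 2 and 3 (sum 5), a pod scored 0.5 by the first scorer and 0.8 by the second ends up with 0.5*2/5 + 0.8*3/5 = 0.68. Below is a small self-contained sketch of the same accumulation, using plain strings in place of the real plugins.Scorer and types.Pod types (scorer names and score values are made up for illustration):

```go
package main

import "fmt"

func main() {
	// Scorer weights, mirroring scorers map[plugins.Scorer]int in the PR.
	weights := map[string]int{"kv-cache": 2, "queue-depth": 3}

	// Raw scores each scorer assigned to each pod (values are made up).
	scores := map[string]map[string]float64{
		"kv-cache":    {"pod-a": 0.5, "pod-b": 0.9},
		"queue-depth": {"pod-a": 0.8, "pod-b": 0.2},
	}

	sumOfWeights := 0
	for _, w := range weights {
		sumOfWeights += w
	}

	// Accumulate weight-normalized scores per pod, as runScorerPlugins does.
	weightedScorePerPod := map[string]float64{"pod-a": 0, "pod-b": 0}
	for scorer, w := range weights {
		for pod, s := range scores[scorer] {
			weightedScorePerPod[pod] += s * float64(w) / float64(sumOfWeights)
		}
	}

	fmt.Printf("pod-a: %.2f, pod-b: %.2f\n", weightedScorePerPod["pod-a"], weightedScorePerPod["pod-b"]) // pod-a: 0.68, pod-b: 0.48
}
```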
PAY ATTENTION.
When someone wants to change the GIE default filters/scorers/picker/etc. by forking the repo, the flow is to change only this small file, i.e., the defaultConfig variable.
The idea behind this small file is for such users to be able to keep their fork synced without drifting from upstream: since they have only this single file changed, they can sync the rest of the code to get bug fixes and such.
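A minimal, simplified illustration of that pattern (the types and names below are stand-ins, not the actual GIE code): all default plugin wiring sits behind a single defaultConfig variable in one small file, which is the only file a fork is expected to edit.

```go
package scheduling

// Simplified stand-ins for the real plugin interfaces.
type Filter interface{ Name() string }
type Scorer interface{ Name() string }
type Picker interface{ Name() string }

// SchedulerConfig mirrors the shape used in this PR: scorers carry a weight.
type SchedulerConfig struct {
	filters []Filter
	scorers map[Scorer]int // scorer -> weight
	picker  Picker
}

// defaultConfig is the single place a fork edits when swapping in custom
// filters/scorers/picker; the rest of the code reads from it.
var defaultConfig = &SchedulerConfig{
	filters: []Filter{ /* e.g. a sheddable-request filter */ },
	scorers: map[Scorer]int{ /* e.g. someScorer: 1 */ },
	picker:  nil, // e.g. a max-score picker
}
```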
Why not manage two separate files:
one for the SchedulerConfig definition, which will not be changed in a personal fork (since it could be changed upstream),
and another file which defines the config instance with all required filters/scorers/picker/...
If SchedulerConfig has changed upstream (e.g., includes new fields), that means the Scheduler itself changed upstream (because the scheduler uses the fields from the config).
In such a case it's not possible to keep the previous config in a personal fork and use the scheduler from upstream.
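For reference, the split proposed above would look roughly like this (reusing the simplified stand-in types from the earlier sketch; not the actual repo layout):

```go
// scheduler_config.go - type definition only; kept identical to upstream.
package scheduling

type SchedulerConfig struct {
	filters []Filter
	scorers map[Scorer]int // scorer -> weight
	picker  Picker
}
```

```go
// default_config.go - instance only; the single file a fork edits.
package scheduling

var defaultConfig = &SchedulerConfig{
	filters: []Filter{ /* upstream or custom filters */ },
	scorers: map[Scorer]int{ /* custom scorers and weights */ },
	picker:  nil, // custom picker
}
```

As the reply points out, this split only helps while SchedulerConfig itself stays stable upstream; once new fields are added there, the fork's instance file has to change as well.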