-
Notifications
You must be signed in to change notification settings - Fork 89
Refactor scheduler to run plugins #677
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,7 +67,7 @@ func (s *StreamingServer) HandleRequestBody( | |
ResolvedTargetModel: modelName, | ||
Critical: modelObj.Spec.Criticality != nil && *modelObj.Spec.Criticality == v1alpha2.Critical, | ||
} | ||
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical) | ||
logger.V(logutil.DEBUG).Info("LLM request assembled", "request", llmReq) | ||
|
||
var err error | ||
// Update target models in the body. | ||
|
@@ -81,11 +81,11 @@ func (s *StreamingServer) HandleRequestBody( | |
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)} | ||
} | ||
|
||
target, err := s.scheduler.Schedule(ctx, llmReq) | ||
res, err := s.scheduler.Schedule(ctx, llmReq) | ||
if err != nil { | ||
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()} | ||
} | ||
targetPod := target.GetPod() | ||
targetPod := res.TargetPod.GetPod() | ||
|
||
// Insert target endpoint to instruct Envoy to route requests to the specified target pod. | ||
// Attach the port number | ||
|
@@ -96,8 +96,7 @@ func (s *StreamingServer) HandleRequestBody( | |
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber)) | ||
|
||
logger.V(logutil.DEFAULT).Info("Request handled", | ||
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics", | ||
fmt.Sprintf("%+v", target)) | ||
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logging the metrics of the picked endpoint is useful here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My concern is that the metric list may grow, making this log very long. I am not sure how useful it is, because the decision is made upon multiple pods, you will need to compare this pod to other pods to determine whether the decision is good or not (if this is the intention of having this log) |
||
|
||
reqCtx.Model = llmReq.Model | ||
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ import ( | |
const ( | ||
InferenceModelComponent = "inference_model" | ||
InferencePoolComponent = "inference_pool" | ||
EPPComponent = "endpoint_picker" | ||
) | ||
|
||
var ( | ||
|
@@ -176,6 +177,20 @@ var ( | |
}, | ||
[]string{"name"}, | ||
) | ||
|
||
// Scheduler Plugin Metrics | ||
SchedulerPluginProcessingLatencies = compbasemetrics.NewHistogramVec( | ||
&compbasemetrics.HistogramOpts{ | ||
Subsystem: EPPComponent, | ||
Name: "scheduler_plugin_duration_seconds", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's great! |
||
Help: "Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.", | ||
Buckets: []float64{ | ||
0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, | ||
liu-cong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}, | ||
StabilityLevel: compbasemetrics.ALPHA, | ||
}, | ||
[]string{"plugin_type", "plugin_name"}, | ||
) | ||
) | ||
|
||
var registerMetrics sync.Once | ||
|
@@ -196,6 +211,8 @@ func Register() { | |
legacyregistry.MustRegister(inferencePoolAvgKVCache) | ||
legacyregistry.MustRegister(inferencePoolAvgQueueSize) | ||
legacyregistry.MustRegister(inferencePoolReadyPods) | ||
|
||
liu-cong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
legacyregistry.MustRegister(SchedulerPluginProcessingLatencies) | ||
}) | ||
} | ||
|
||
|
@@ -293,3 +310,8 @@ func RecordInferencePoolAvgQueueSize(name string, queueSize float64) { | |
func RecordinferencePoolReadyPods(name string, runningPods float64) { | ||
inferencePoolReadyPods.WithLabelValues(name).Set(runningPods) | ||
} | ||
|
||
// RecordSchedulerPluginProcessingLatency records the processing latency for a scheduler plugin. | ||
func RecordSchedulerPluginProcessingLatency(pluginType, pluginName string, duration time.Duration) { | ||
SchedulerPluginProcessingLatencies.WithLabelValues(pluginType, pluginName).Observe(duration.Seconds()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
# HELP endpoint_picker_scheduler_plugin_duration_seconds [ALPHA] Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name. | ||
# TYPE endpoint_picker_scheduler_plugin_duration_seconds histogram | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.01"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.02"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.05"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.1"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="+Inf"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginA",plugin_type="PreSchedule"} 0.1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginA",plugin_type="PreSchedule"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.01"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.02"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.05"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.1"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="+Inf"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginB",plugin_type="PostSchedule"} 0.2 | ||
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginB",plugin_type="PostSchedule"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.01"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.02"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.05"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.1"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="+Inf"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginC",plugin_type="Filter"} 0.05 | ||
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginC",plugin_type="Filter"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.001"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.002"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.005"} 0 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.01"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.02"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.05"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.1"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="+Inf"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginD",plugin_type="Scorer"} 0.01 | ||
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginD",plugin_type="Scorer"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0001"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0002"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0005"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.001"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.002"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.005"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.01"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.02"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.05"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.1"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="+Inf"} 1 | ||
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginE",plugin_type="Picker"} 1e-05 | ||
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginE",plugin_type="Picker"} 1 |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is moved from scheduler.go. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
Copyright 2025 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package config | ||
|
||
import ( | ||
"sigs.k8s.io/controller-runtime/pkg/log" | ||
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" | ||
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" | ||
) | ||
|
||
// Config holds all the configuration values for the scheduler | ||
type Config struct { | ||
KVCacheThreshold float64 | ||
QueueThresholdCritical int | ||
QueueingThresholdLoRA int | ||
LoraAffinityThreshold float64 | ||
} | ||
|
||
const ( | ||
// Default values to use if environment variables are not set | ||
defaultKVCacheThreshold = 0.8 | ||
defaultQueueThresholdCritical = 5 | ||
defaultQueueingThresholdLoRA = 128 | ||
defaultLoraAffinityThreshold = 0.999 | ||
) | ||
|
||
// LoadConfig loads configuration from environment variables | ||
func LoadConfig() Config { | ||
// Use a default logger for initial configuration loading | ||
baseLogger := log.Log.WithName("scheduling-config") | ||
|
||
config := Config{ | ||
KVCacheThreshold: envutil.GetEnvFloat("KV_CACHE_THRESHOLD", defaultKVCacheThreshold, baseLogger), | ||
QueueThresholdCritical: envutil.GetEnvInt("QUEUE_THRESHOLD_CRITICAL", defaultQueueThresholdCritical, baseLogger), | ||
QueueingThresholdLoRA: envutil.GetEnvInt("QUEUING_THRESHOLD_LORA", defaultQueueingThresholdLoRA, baseLogger), | ||
LoraAffinityThreshold: envutil.GetEnvFloat("LORA_AFFINITY_THRESHOLD", defaultLoraAffinityThreshold, baseLogger), | ||
} | ||
|
||
baseLogger.V(logutil.DEFAULT).Info("Scheduler configuration loaded", "config", config) | ||
|
||
return config | ||
} | ||
|
||
var Conf = LoadConfig() |
Uh oh!
There was an error while loading. Please reload this page.