Skip to content

Commit 72fbea6

Browse files
committed
Refactor scheduler to run plugins
1 parent 1ba13f3 commit 72fbea6

File tree

12 files changed

+805
-245
lines changed

12 files changed

+805
-245
lines changed

pkg/epp/backend/metrics/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ func (p *Pod) String() string {
7979
}
8080

8181
func (p *Pod) Clone() *Pod {
82+
if p == nil {
83+
return nil
84+
}
8285
return &Pod{
8386
NamespacedName: types.NamespacedName{
8487
Name: p.NamespacedName.Name,
@@ -118,6 +121,9 @@ func (m *Metrics) String() string {
118121
}
119122

120123
func (m *Metrics) Clone() *Metrics {
124+
if m == nil {
125+
return nil
126+
}
121127
cm := make(map[string]int, len(m.ActiveModels))
122128
for k, v := range m.ActiveModels {
123129
cm[k] = v

pkg/epp/handlers/request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (s *StreamingServer) HandleRequestBody(
6767
ResolvedTargetModel: modelName,
6868
Critical: modelObj.Spec.Criticality != nil && *modelObj.Spec.Criticality == v1alpha2.Critical,
6969
}
70-
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical)
70+
logger.V(logutil.DEBUG).Info("LLM request assembled", "request", llmReq)
7171

7272
var err error
7373
// Update target models in the body.
@@ -81,11 +81,11 @@ func (s *StreamingServer) HandleRequestBody(
8181
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
8282
}
8383

84-
target, err := s.scheduler.Schedule(ctx, llmReq)
84+
res, err := s.scheduler.Schedule(ctx, llmReq)
8585
if err != nil {
8686
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
8787
}
88-
targetPod := target.GetPod()
88+
targetPod := res.TargetPod.GetPod()
8989

9090
// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
9191
// Attach the port number

pkg/epp/handlers/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type StreamingServer struct {
6565
}
6666

6767
type Scheduler interface {
68-
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (targetPod schedulingtypes.Pod, err error)
68+
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.Result, err error)
6969
}
7070

7171
// RequestContext stores context information during the life time of an HTTP request.

pkg/epp/scheduling/config/config.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package config
18+
19+
import (
20+
"sigs.k8s.io/controller-runtime/pkg/log"
21+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
22+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
23+
)
24+
25+
// Config holds all the configuration values for the scheduler.
// LoadConfig populates every field from an environment variable, passing the
// matching package-level default constant as the fallback value.
26+
type Config struct {
27+
// KVCacheThreshold is loaded from KV_CACHE_THRESHOLD (default 0.8).
// NOTE(review): presumably a KV-cache utilization fraction; confirm against the code that consumes it.
KVCacheThreshold float64
28+
// QueueThresholdCritical is loaded from QUEUE_THRESHOLD_CRITICAL (default 5).
// NOTE(review): presumably a queue-length limit applied to critical requests; confirm with the scheduler filters.
QueueThresholdCritical int
29+
// QueueingThresholdLoRA is loaded from QUEUING_THRESHOLD_LORA (default 128).
// Note the spelling difference between the field ("Queueing") and the variable ("QUEUING").
QueueingThresholdLoRA int
30+
// LoraAffinityThreshold is loaded from LORA_AFFINITY_THRESHOLD (default 0.999).
// NOTE(review): looks like a probability-style value in [0, 1]; confirm with the LoRA affinity logic.
LoraAffinityThreshold float64
31+
}
32+
33+
const (
34+
// Default values to use if environment variables are not set.
// Each constant is passed by LoadConfig as the fallback for one variable.
35+
// Fallback for KV_CACHE_THRESHOLD.
defaultKVCacheThreshold = 0.8
36+
// Fallback for QUEUE_THRESHOLD_CRITICAL.
defaultQueueThresholdCritical = 5
37+
// Fallback for QUEUING_THRESHOLD_LORA.
defaultQueueingThresholdLoRA = 128
38+
// Fallback for LORA_AFFINITY_THRESHOLD.
defaultLoraAffinityThreshold = 0.999
39+
)
40+
41+
// LoadConfig loads the scheduler configuration from environment variables.
//
// It reads KV_CACHE_THRESHOLD, QUEUE_THRESHOLD_CRITICAL, QUEUING_THRESHOLD_LORA
// and LORA_AFFINITY_THRESHOLD through the envutil getters, handing each one the
// corresponding package default and a logger, then logs the resulting Config
// and returns it by value. It takes no arguments and returns no error.
// NOTE(review): how envutil treats unset or malformed values is not visible
// here; presumably it falls back to the supplied default and reports through
// the logger. Confirm in the envutil package.
42+
func LoadConfig() Config {
43+
// Use a default logger for initial configuration loading
44+
baseLogger := log.Log.WithName("scheduling-config")
45+
46+
config := Config{
47+
KVCacheThreshold: envutil.GetEnvFloat("KV_CACHE_THRESHOLD", defaultKVCacheThreshold, baseLogger),
48+
QueueThresholdCritical: envutil.GetEnvInt("QUEUE_THRESHOLD_CRITICAL", defaultQueueThresholdCritical, baseLogger),
49+
QueueingThresholdLoRA: envutil.GetEnvInt("QUEUING_THRESHOLD_LORA", defaultQueueingThresholdLoRA, baseLogger),
50+
LoraAffinityThreshold: envutil.GetEnvFloat("LORA_AFFINITY_THRESHOLD", defaultLoraAffinityThreshold, baseLogger),
51+
}
52+
53+
// Record the effective values once so the running configuration is visible in the logs.
baseLogger.V(logutil.DEFAULT).Info("Scheduler configuration loaded", "config", config)
54+
55+
return config
56+
}
57+
58+
// Conf is the package-level scheduler configuration. Its initializer calls
// LoadConfig during package initialization, so the environment variables are
// read (and the configuration is logged) when this package is first
// initialized; later changes to the environment are not reflected in Conf
// unless it is reassigned.
var Conf = LoadConfig()

0 commit comments

Comments
 (0)