Skip to content

Commit 572b562

Browse files
liu-cong authored and rlakhtakia committed
Move scheduler initialization up to the main (#757)
1 parent e07c029 commit 572b562

File tree

4 files changed: +12 -2 lines changed

cmd/epp/main.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ import (
4141
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4343
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
44+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4445
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
4546
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4647
)
@@ -169,6 +170,7 @@ func run() error {
169170

170171
datastore := datastore.NewDatastore(ctx, pmf)
171172

173+
scheduler := scheduling.NewScheduler(datastore)
172174
serverRunner := &runserver.ExtProcServerRunner{
173175
GrpcPort: *grpcPort,
174176
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
@@ -178,6 +180,7 @@ func run() error {
178180
SecureServing: *secureServing,
179181
CertPath: *certPath,
180182
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
183+
Scheduler: scheduler,
181184
}
182185
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
183186
setupLog.Error(err, "Failed to setup ext-proc controllers")

pkg/epp/handlers/request.go

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,10 @@ func (s *StreamingServer) HandleRequestBody(
4646
if !ok {
4747
return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
4848
}
49+
prompt, ok := requestBodyMap["prompt"].(string)
50+
if !ok {
51+
return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "prompt not found in request"}
52+
}
4953

5054
modelName := model
5155

@@ -66,6 +70,7 @@ func (s *StreamingServer) HandleRequestBody(
6670
Model: model,
6771
ResolvedTargetModel: modelName,
6872
Critical: modelObj.Spec.Criticality != nil && *modelObj.Spec.Criticality == v1alpha2.Critical,
73+
Prompt: prompt,
6974
}
7075
logger.V(logutil.DEBUG).Info("LLM request assembled", "request", llmReq)
7176

pkg/epp/server/runserver.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,6 @@ import (
3535
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/controller"
3636
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3737
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
38-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
3938
)
4039

4140
// ExtProcServerRunner provides methods to manage an external process server.
@@ -49,6 +48,7 @@ type ExtProcServerRunner struct {
4948
CertPath string
5049
UseStreaming bool
5150
RefreshPrometheusMetricsInterval time.Duration
51+
Scheduler handlers.Scheduler
5252

5353
// This should only be used in tests. We won't need this once we don't inject metrics in the tests.
5454
// TODO:(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/432) Cleanup
@@ -137,7 +137,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
137137
} else {
138138
srv = grpc.NewServer()
139139
}
140-
extProcServer := handlers.NewStreamingServer(scheduling.NewScheduler(r.Datastore), r.DestinationEndpointHintMetadataNamespace, r.DestinationEndpointHintKey, r.Datastore)
140+
extProcServer := handlers.NewStreamingServer(r.Scheduler, r.DestinationEndpointHintMetadataNamespace, r.DestinationEndpointHintKey, r.Datastore)
141141
extProcPb.RegisterExternalProcessorServer(
142142
srv,
143143
extProcServer,

test/integration/epp/hermetic_test.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@ import (
6565
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
6666
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
6767
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
68+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
6869
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
6970
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
7071
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -1351,6 +1352,7 @@ func BeforeSuite() func() {
13511352
// Adjust from defaults
13521353
serverRunner.PoolNamespacedName = types.NamespacedName{Name: "vllm-llama3-8b-instruct-pool", Namespace: "default"}
13531354
serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)
1355+
serverRunner.Scheduler = scheduling.NewScheduler(serverRunner.Datastore)
13541356
serverRunner.SecureServing = false
13551357

13561358
if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil {

0 commit comments

Comments
 (0)