[UPSTREAM-SYNC] Add the LLMResponse struct to hold response information #135

Status: Closed · wants to merge 2 commits
6 changes: 3 additions & 3 deletions pkg/epp/scheduling/plugins/filter/filter_test.go
@@ -52,7 +52,7 @@ func TestFilter(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-ctx := types.NewSchedulingContext(context.Background(), test.req, test.input)
+ctx := types.NewSchedulingContext(context.Background(), test.req, nil, test.input)
got := test.filter.Filter(ctx, test.input)

if diff := cmp.Diff(test.output, got); diff != "" {
@@ -187,7 +187,7 @@ func TestFilterFunc(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-ctx := types.NewSchedulingContext(context.Background(), test.req, test.input)
+ctx := types.NewSchedulingContext(context.Background(), test.req, nil, test.input)
got := test.f(ctx, test.input)

if diff := cmp.Diff(test.output, got); diff != "" {
@@ -244,7 +244,7 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
},
},
}
-ctx := types.NewSchedulingContext(context.Background(), req, pods)
+ctx := types.NewSchedulingContext(context.Background(), req, nil, pods)

// Run the filter function multiple times and count the results
affinityCount := 0
2 changes: 1 addition & 1 deletion pkg/epp/scheduling/plugins/scorer/kvcache_test.go
@@ -82,7 +82,7 @@ func TestKvCacheScorer(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-ctx := types.NewSchedulingContext(context.Background(), &types.LLMRequest{}, tt.pods)
+ctx := types.NewSchedulingContext(context.Background(), &types.LLMRequest{}, nil, tt.pods)
scorer := &KVCacheScorer{}
scores := scorer.Score(ctx, tt.pods)

2 changes: 1 addition & 1 deletion pkg/epp/scheduling/plugins/scorer/queue_test.go
@@ -73,7 +73,7 @@ func TestQueueScorer(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-ctx := types.NewSchedulingContext(context.Background(), &types.LLMRequest{}, tt.pods)
+ctx := types.NewSchedulingContext(context.Background(), &types.LLMRequest{}, nil, tt.pods)
scores := scorer.Score(ctx, tt.pods)

for i, pod := range tt.pods {
2 changes: 1 addition & 1 deletion pkg/epp/scheduling/scheduler.go
@@ -108,7 +108,7 @@ func (s *Scheduler) Schedule(ctx context.Context, req *types.LLMRequest) (*types
// Snapshot pod metrics from the datastore to:
// 1. Reduce concurrent access to the datastore.
// 2. Ensure consistent data during the scheduling operation of a request.
-sCtx := types.NewSchedulingContext(ctx, req, types.ToSchedulerPodMetrics(s.datastore.PodGetAll()))
+sCtx := types.NewSchedulingContext(ctx, req, nil, types.ToSchedulerPodMetrics(s.datastore.PodGetAll()))
loggerDebug.Info(fmt.Sprintf("Scheduling a request, Metrics: %+v", sCtx.PodsSnapshot))

s.runPreSchedulePlugins(sCtx)
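At scheduling time no response exists yet, which is why the scheduler passes nil for the new argument. For contrast, a response-side caller would construct the context with a populated struct. The sketch below is hypothetical; the header value and the call site are assumptions, not code from this PR:

```go
// Hypothetical response-header handling path, not part of this diff.
// During header processing only Headers is populated; Body stays empty
// until body chunks arrive.
resp := &types.LLMResponse{
	Headers:     map[string]string{"content-type": "text/event-stream"},
	IsSreaming:  true, // field name as spelled in this PR; see the review suggestion below
	EndOfStream: false,
}
sCtx := types.NewSchedulingContext(ctx, req, resp, types.ToSchedulerPodMetrics(s.datastore.PodGetAll()))
```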
19 changes: 18 additions & 1 deletion pkg/epp/scheduling/types/types.go
@@ -45,6 +45,21 @@ func (r *LLMRequest) String() string {
r.Model, r.ResolvedTargetModel, r.Critical, len(r.Prompt), r.Headers)
}

+// LLMResponse contains information from the received response, to be passed to plugins.
+type LLMResponse struct {
+	// Headers is a map of the response headers. Nil during body processing.
+	Headers map[string]string
+
+	// Body is the body of the response; empty during header processing.
+	Body string
+
+	// IsStreaming indicates whether the response is being streamed by the model.
+	IsSreaming bool
Collaborator commented on `IsSreaming bool`:

Suggested change:
-	IsSreaming bool
+	IsStreaming bool

+	// EndOfStream, when true, indicates that this invocation contains the last chunk of the response.
+	EndOfStream bool
+}

type Pod interface {
GetPod() *backend.Pod
GetMetrics() *backendmetrics.Metrics
@@ -61,6 +76,7 @@ type SchedulingContext struct {
context.Context
Logger logr.Logger
Req *LLMRequest
+Resp *LLMResponse
PodsSnapshot []Pod
MutatedHeaders map[string]string
}
@@ -85,12 +101,13 @@ type PodMetrics struct {
*backendmetrics.Metrics
}

-func NewSchedulingContext(ctx context.Context, req *LLMRequest, pods []Pod) *SchedulingContext {
+func NewSchedulingContext(ctx context.Context, req *LLMRequest, resp *LLMResponse, pods []Pod) *SchedulingContext {
logger := log.FromContext(ctx).WithValues("request", req)
return &SchedulingContext{
Context: ctx,
Logger: logger,
Req: req,
+Resp: resp,
PodsSnapshot: pods,
MutatedHeaders: make(map[string]string),
}
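On the consuming side, plugins can now reach the response through the context. Below is a minimal sketch of a scorer reading Resp, assuming the Score(ctx, pods) shape seen in the tests above; the exampleScorer type and the map return type are assumptions, not part of this PR. The nil guard matters because the request-path scheduler passes nil:

```go
// Sketch only: a scorer that consults the response when one is attached.
func (s *exampleScorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
	scores := make(map[types.Pod]float64, len(pods))
	// Resp is nil on the request path, so guard before dereferencing.
	if ctx.Resp != nil && ctx.Resp.EndOfStream {
		ctx.Logger.Info("final response chunk received", "bodyLen", len(ctx.Resp.Body))
	}
	for _, pod := range pods {
		scores[pod] = 0 // neutral; real logic would use pod metrics or Resp
	}
	return scores
}
```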