Skip to content

Commit f52caa2

Browse files
dmitripikusshmuelk
andauthored
Session affinity scorer (#117)
* 'session affinity scorer' partial implementation (without headers in response) * Fix in filling request headers * Encoded value of namespaced pod name is sent in response to client * Support of session affinity scorer configuration via environment variables, is added * Go file for session affinity scorer is renamed * Redundant 'sessions' field is removed * Redundant 'ScorerWithPostResponse' struct is removed * - SessionID is renamed to sessionToken - Map fetch is done instead of loop * Session token name is changed to 'x-session-token' * Minor fixes are made in README * Small fix after merge --------- Co-authored-by: Shmuel Kallner <[email protected]>
1 parent 09f7448 commit f52caa2

File tree

4 files changed

+114
-9
lines changed

4 files changed

+114
-9
lines changed

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ export ENABLE_LOAD_AWARE_SCORER=true
2323
export LOAD_AWARE_SCORER_WEIGHT=1.0
2424
```
2525

26+
To enable the SessionAwareScorer, the following environment variables must be configured:
27+
```
28+
export ENABLE_SESSION_AWARE_SCORER=true
29+
export SESSION_AWARE_SCORER_WEIGHT=1
30+
```
31+
2632
To enable Prefill/Decode (PD) processing, the following environment variable must be configured:
2733
```
2834
export PD_ENABLED=true

pkg/epp/handlers/request.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
159159
}
160160

161161
for _, header := range req.RequestHeaders.Headers.Headers {
162-
reqCtx.RequestHeaders[header.Key] = header.Value
162+
reqCtx.RequestHeaders[header.Key] = string(header.RawValue)
163163
}
164164

165165
return nil

pkg/epp/scheduling/local_config.go

+28-8
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@ import (
2727
)
2828

2929
const (
30-
kvCacheScorerEnablementEnvVar = "ENABLE_KVCACHE_AWARE_SCORER"
31-
loadAwareScorerEnablementEnvVar = "ENABLE_LOAD_AWARE_SCORER"
32-
prefixScorerEnablementEnvVar = "ENABLE_PREFIX_AWARE_SCORER"
33-
pdFilterEnablementEnvVar = "ENABLE_PD_FILTER"
34-
35-
kvCacheScorerWeightEnvVar = "KVCACHE_AWARE_SCORER_WEIGHT"
36-
loadAwareScorerWeightEnvVar = "LOAD_AWARE_SCORER_WEIGHT"
37-
prefixScorerWeightEnvVar = "PREFIX_AWARE_SCORER_WEIGHT"
30+
kvCacheScorerEnablementEnvVar = "ENABLE_KVCACHE_AWARE_SCORER"
31+
loadAwareScorerEnablementEnvVar = "ENABLE_LOAD_AWARE_SCORER"
32+
prefixScorerEnablementEnvVar = "ENABLE_PREFIX_AWARE_SCORER"
33+
sessionAwareScorerEnablementEnvVar = "ENABLE_SESSION_AWARE_SCORER"
34+
pdFilterEnablementEnvVar = "ENABLE_PD_FILTER"
35+
36+
kvCacheScorerWeightEnvVar = "KVCACHE_AWARE_SCORER_WEIGHT"
37+
loadAwareScorerWeightEnvVar = "LOAD_AWARE_SCORER_WEIGHT"
38+
prefixScorerWeightEnvVar = "PREFIX_AWARE_SCORER_WEIGHT"
39+
sessionAwareScorerWeightEnvVar = "SESSION_AWARE_SCORER_WEIGHT"
3840
)
3941

4042
func init() {
@@ -45,6 +47,7 @@ func setDefaultConfig() {
4547
// since the default config is a global variable, we add this function to minimize rebase conflicts.
4648
// this configuration is a temporary state, it should be better streamlined.
4749
setLoadAwareScorer()
50+
setSessionAwareScorer()
4851
setKVCacheAwareScorer()
4952
setPrefixScorer()
5053

@@ -65,6 +68,23 @@ func setLoadAwareScorer() {
6568
loggerDebug.Info("Initialized LoadAwareScorer", "weight", loadBasedScorerWeight)
6669
}
6770

71+
func setSessionAwareScorer() {
72+
ctx := context.Background()
73+
loggerDebug := log.FromContext(ctx).WithName("scheduler_config").V(logutil.DEBUG)
74+
75+
if envutil.GetEnvString(sessionAwareScorerEnablementEnvVar, "false", loggerDebug) != "true" {
76+
loggerDebug.Info("Skipping SessionAwareScorer creation as it is not enabled")
77+
return
78+
}
79+
80+
sessionBasedScorerWeight := envutil.GetEnvInt(sessionAwareScorerWeightEnvVar, 1, loggerDebug)
81+
sessionAffinity := scorer.NewSessionAffinity()
82+
83+
defaultConfig.scorers[sessionAffinity] = sessionBasedScorerWeight
84+
defaultConfig.postResponsePlugins = append(defaultConfig.postResponsePlugins, sessionAffinity)
85+
loggerDebug.Info("Initialized SessionAwareScorer", "weight", sessionBasedScorerWeight)
86+
}
87+
6888
func setKVCacheAwareScorer() {
6989
ctx := context.Background()
7090
loggerDebug := log.FromContext(ctx).WithName("scheduler_config").V(logutil.DEBUG)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package scorer
15+
16+
import (
17+
"encoding/base64"
18+
"time"
19+
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
21+
)
22+
23+
const (
	// NOTE(review): the two keep-alive constants below are not referenced by
	// any code visible in this file — confirm whether they are still needed.
	sessionKeepAliveTime           = 60 * time.Minute // How long should an idle session be kept alive
	sessionKeepAliveCheckFrequency = 15 * time.Minute // How often to check for overly idle sessions

	// Header that carries the session token: looked up in the request headers
	// by Score and written to the mutated headers by PostResponse.
	sessionTokenHeader = "x-session-token" // name of the session header in request
)
28+
29+
// SessionAffinity is a routing scorer that keeps the requests of a session on
// the pod that handled the session's first request. The pod identified by the
// request's session token is favoured over all other candidate pods.
//
// The type has no fields.
type SessionAffinity struct{}
35+
36+
func NewSessionAffinity() *SessionAffinity {
37+
return &SessionAffinity{}
38+
}
39+
40+
func (s *SessionAffinity) Name() string {
41+
return "session affinity scorer"
42+
}
43+
44+
func (s *SessionAffinity) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 {
45+
scoredPods := make(map[types.Pod]float64)
46+
47+
reqHeaders := ctx.Req.Headers
48+
49+
var sessionToken = ""
50+
v, ok := reqHeaders[sessionTokenHeader]
51+
if ok {
52+
sessionToken = v
53+
}
54+
55+
podName := ""
56+
if sessionToken != "" {
57+
decodedBytes, err := base64.StdEncoding.DecodeString(sessionToken)
58+
if err != nil {
59+
ctx.Logger.Error(err, "Error decoding")
60+
} else {
61+
podName = string(decodedBytes)
62+
}
63+
}
64+
for _, pod := range pods {
65+
if podName == "" {
66+
scoredPods[pod] = 0.0
67+
} else {
68+
if pod.GetPod().NamespacedName.String() == podName {
69+
scoredPods[pod] = 1.0
70+
}
71+
}
72+
}
73+
74+
return scoredPods
75+
}
76+
77+
func (s *SessionAffinity) PostResponse(ctx *types.SchedulingContext, pod types.Pod) {
78+
ctx.MutatedHeaders[sessionTokenHeader] = base64.StdEncoding.EncodeToString([]byte(pod.GetPod().NamespacedName.String()))
79+
}

0 commit comments

Comments
 (0)