filter.go
package scheduling

import (
	"fmt"
	"math"

	klog "k8s.io/klog/v2"

	"ext-proc/backend"
)

type Filter interface {
	Name() string
	Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)
}

// filter applies the current filterFunc, and then recursively applies the next filters
// depending on the success or failure of the current filterFunc.
// It can be used to construct a flow chart algorithm.
type filter struct {
	name   string
	filter filterFunc

	// nextOnSuccess filter will be applied after successfully applying the current filter.
	// The filtered results will be passed to the next filter.
	nextOnSuccess *filter
	// nextOnFailure filter will be applied if the current filter fails.
	// The original input will be passed to the next filter.
	nextOnFailure *filter
	// nextOnSuccessOrFailure is a convenience field to configure the next filter regardless of the
	// success or failure of the current filter.
	// NOTE: When using nextOnSuccessOrFailure, both nextOnSuccess and nextOnFailure SHOULD be nil.
	// However, if that's not the case, nextOnSuccess and nextOnFailure will be used, instead of
	// nextOnSuccessOrFailure, in the success and failure scenarios, respectively.
	nextOnSuccessOrFailure *filter
}
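// The wiring below is an illustrative sketch, not part of the original file: it shows one way the
// flow chart described above could be assembled, assuming the filter funcs and the
// loraAffinityPredicate defined later in this file.
var exampleDecisionTree = &filter{
	name:   "lora affinity",
	filter: toFilterFunc(loraAffinityPredicate),
	// If at least one pod already has the requested adapter cached, narrow that subset by queue depth.
	nextOnSuccess: &filter{name: "least queuing", filter: leastQueuingFilterFunc},
	// If no pod has the adapter cached, fall back to the full pod set and pick by KV cache usage.
	nextOnFailure: &filter{name: "least KV cache", filter: leastKVCacheFilterFunc},
}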
func (f *filter) Name() string {
	if f == nil {
		return "nil"
	}
	return f.name
}

func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
	if f == nil {
		klog.V(3).Infof("Running nil filter, returning all input pods by default")
		return pods, nil
	}
	klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))

	filtered, err := f.filter(b, pods)

	next := f.nextOnSuccessOrFailure
	if err == nil {
		if f.nextOnSuccess != nil {
			next = f.nextOnSuccess
		}
		klog.V(3).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
		// On success, pass the filtered result to the next filter.
		return next.Filter(b, filtered)
	}

	if f.nextOnFailure != nil {
		next = f.nextOnFailure
	}
	klog.V(3).Infof("onFailure %v -> %v", f.name, next.Name())
	// On failure, pass the initial set of pods to the next filter.
	return next.Filter(b, pods)
}
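// exampleSchedule is likewise an illustrative sketch, not part of the original file: it runs a
// request through the example chain defined above. Each node either forwards its filtered subset
// (the success path) or retries from its original input (the failure path), as implemented in
// Filter above.
func exampleSchedule(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
	return exampleDecisionTree.Filter(req, pods)
}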
// filterFunc filters a set of input pods to a subset.
type filterFunc func(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error)

// toFilterFunc is a helper function to convert a per-pod podPredicate to a filterFunc.
func toFilterFunc(pp podPredicate) filterFunc {
	return func(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
		filtered := []*backend.PodMetrics{}
		for _, pod := range pods {
			if pp(b, pod) {
				filtered = append(filtered, pod)
			}
		}
		if len(filtered) == 0 {
			return nil, fmt.Errorf("no pods left")
		}
		return filtered, nil
	}
}
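// exampleLowQueueFilter is an illustrative sketch, not part of the original file: it lifts an
// ad-hoc per-pod check into a chain node via toFilterFunc. The queue threshold is a hypothetical
// value chosen only for the example.
var exampleLowQueueFilter = &filter{
	name: "low queue",
	filter: toFilterFunc(func(b *LLMRequest, pod *backend.PodMetrics) bool {
		return pod.WaitingQueueSize < 5 // hypothetical threshold
	}),
}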
// leastQueuingFilterFunc finds the max and min queue size of all pods, divides the whole range
// (max-min) by the number of pods, and finds the pods that fall into the first range.
// The intuition is that if there are multiple pods that share similar queue size in the low range,
// we should consider them all instead of the absolute minimum one. This worked better than picking
// the least one as it gives more choices for the next filter, which on aggregate gave better
// results.
// TODO: Compare this strategy with other strategies such as top K.
func leastQueuingFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
	min := math.MaxInt
	max := 0
	filtered := []*backend.PodMetrics{}

	for _, pod := range pods {
		if pod.WaitingQueueSize <= min {
			min = pod.WaitingQueueSize
		}
		if pod.WaitingQueueSize >= max {
			max = pod.WaitingQueueSize
		}
	}

	for _, pod := range pods {
		if pod.WaitingQueueSize >= min && pod.WaitingQueueSize <= min+(max-min)/len(pods) {
			filtered = append(filtered, pod)
		}
	}
	return filtered, nil
}
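// Worked example for leastQueuingFilterFunc above (illustrative): with waiting queue sizes
// [2, 3, 19, 40] across 4 pods, min=2 and max=40, so the first bucket is
// [2, 2+(40-2)/4] = [2, 11] (integer division) and only the pods with queue sizes 2 and 3 are
// kept for the next filter.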
// leastKVCacheFilterFunc finds the max and min KV cache of all pods, divides the whole range
// (max-min) by the number of pods, and finds the pods that fall into the first range.
// The intuition is that if there are multiple pods that share similar KV cache in the low range, we
// should consider them all instead of the absolute minimum one. This worked better than picking the
// least one as it gives more choices for the next filter, which on aggregate gave better results.
// TODO: Compare this strategy with other strategies such as top K.
func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
	min := math.MaxFloat64
	max := math.SmallestNonzeroFloat64
	filtered := []*backend.PodMetrics{}

	for _, pod := range pods {
		if pod.KVCacheUsagePercent <= min {
			min = pod.KVCacheUsagePercent
		}
		if pod.KVCacheUsagePercent >= max {
			max = pod.KVCacheUsagePercent
		}
	}

	for _, pod := range pods {
		if pod.KVCacheUsagePercent >= min && pod.KVCacheUsagePercent <= min+(max-min)/float64(len(pods)) {
			filtered = append(filtered, pod)
		}
	}
	return filtered, nil
}
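// Worked example for leastKVCacheFilterFunc above (illustrative): with KV cache usage
// [0.10, 0.12, 0.85] across 3 pods, min=0.10 and max=0.85, so the first bucket is
// [0.10, 0.10+(0.85-0.10)/3] = [0.10, 0.35] and the pods at 0.10 and 0.12 are kept.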
// podPredicate is a filter function to check whether a pod is desired.
type podPredicate func(b *LLMRequest, pod *backend.PodMetrics) bool

// loraAffinityPredicate returns true if the pod has the requested LoRA adapter loaded.
func loraAffinityPredicate(b *LLMRequest, pod *backend.PodMetrics) bool {
	_, ok := pod.CachedModels[b.ResolvedTargetModel]
	return ok
}