Skip to content

Commit 1b01a28

Browse files
committed
Add prefix cache aware scheduling
1 parent 66b9889 commit 1b01a28

File tree

14 files changed

+927
-0
lines changed

14 files changed

+927
-0
lines changed

cmd/epp/main.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/client-go/rest"
3535
"k8s.io/component-base/metrics/legacyregistry"
3636
ctrl "sigs.k8s.io/controller-runtime"
37+
"sigs.k8s.io/controller-runtime/pkg/log"
3738
"sigs.k8s.io/controller-runtime/pkg/log/zap"
3839
"sigs.k8s.io/controller-runtime/pkg/manager"
3940
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -43,7 +44,9 @@ import (
4344
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4445
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
4546
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
47+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
4648
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
49+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
4750
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4851
)
4952

@@ -107,8 +110,24 @@ var (
107110
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
108111

109112
setupLog = ctrl.Log.WithName("setup")
113+
114+
// Environment variables
115+
schedulerV2 = envutil.GetEnvString("EXPERIMENTAL_USE_SCHEDULE_V2", "false", setupLog)
116+
prefixCacheConfig = loadPrefixCacheConfig()
110117
)
111118

119+
func loadPrefixCacheConfig() prefix.Config {
120+
// logger := zap.New(zap.RawZapOpts(uberzap.AddCaller()))
121+
// log.SetLogger(logger)
122+
baseLogger := log.Log.WithName("env-config")
123+
124+
return prefix.Config{
125+
HashBlockSize: envutil.GetEnvInt("PREFIX_CACHE_HASH_BLOCK_SIZE", prefix.DefaultCacheBlockSize, baseLogger),
126+
MaxPrefixBlocksToMatch: envutil.GetEnvInt("PREFIX_CACHE_MAX_PREFIX_BLOCKS", prefix.DefaultMaxPrefixBlocks, baseLogger),
127+
LRUIndexerCapacity: envutil.GetEnvInt("PREFIX_CACHE_MAX_CACHE_SIZE_MB", prefix.DefaultLRUIndexerCapacity, baseLogger),
128+
}
129+
}
130+
112131
func main() {
113132
if err := run(); err != nil {
114133
os.Exit(1)
@@ -172,6 +191,10 @@ func run() error {
172191
datastore := datastore.NewDatastore(ctx, pmf)
173192

174193
scheduler := scheduling.NewScheduler(datastore)
194+
if schedulerV2 == "true" {
195+
setupLog.Info("Creating scheduler with prefixCache plugin", "prefix cache config", prefixCacheConfig)
196+
scheduler = scheduling.NewSchedulerV2(datastore, prefixCacheConfig)
197+
}
175198
serverRunner := &runserver.ExtProcServerRunner{
176199
GrpcPort: *grpcPort,
177200
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,

pkg/epp/metrics/metrics.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package metrics
1818

1919
import (
2020
"context"
21+
"runtime/debug"
2122
"sync"
2223
"time"
2324

@@ -219,6 +220,40 @@ var (
219220
},
220221
[]string{"commit"},
221222
)
223+
224+
// Prefix indexer Metrics
225+
PrefixCacheSize = compbasemetrics.NewGaugeVec(
226+
&compbasemetrics.GaugeOpts{
227+
Subsystem: InferenceExtension,
228+
Name: "prefix_indexer_size",
229+
Help: "Size of the prefix indexer.",
230+
StabilityLevel: compbasemetrics.ALPHA,
231+
},
232+
[]string{},
233+
)
234+
235+
PrefixCacheHitRatio = compbasemetrics.NewHistogramVec(
236+
&compbasemetrics.HistogramOpts{
237+
Subsystem: InferenceExtension,
238+
Name: "prefix_indexer_hit_ratio",
239+
Help: "Ratio of prefix length matched to total prefix length in the cache lookup.",
240+
// Buckets from 0.0 to 1.0 in increments
241+
Buckets: []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
242+
StabilityLevel: compbasemetrics.ALPHA,
243+
},
244+
[]string{},
245+
)
246+
247+
PrefixCacheHitLength = compbasemetrics.NewHistogramVec(
248+
&compbasemetrics.HistogramOpts{
249+
Subsystem: InferenceExtension,
250+
Name: "prefix_indexer_hit_bytes",
251+
Help: "Length of the prefix match in number of bytes in the cache lookup.",
252+
Buckets: []float64{0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536},
253+
StabilityLevel: compbasemetrics.ALPHA,
254+
},
255+
[]string{},
256+
)
222257
)
223258

224259
var registerMetrics sync.Once
@@ -244,6 +279,10 @@ func Register() {
244279
legacyregistry.MustRegister(SchedulerE2ELatency)
245280

246281
legacyregistry.MustRegister(InferenceExtensionInfo)
282+
283+
legacyregistry.MustRegister(PrefixCacheSize)
284+
legacyregistry.MustRegister(PrefixCacheHitRatio)
285+
legacyregistry.MustRegister(PrefixCacheHitLength)
247286
})
248287
}
249288

@@ -352,8 +391,44 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
352391
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
353392
}
354393

394+
// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
395+
func RecordPrefixCacheSize(size int64) {
396+
PrefixCacheSize.WithLabelValues().Set(float64(size))
397+
}
398+
399+
// RecordPrefixCacheMatch records both the hit ratio and hit length for a prefix indexer match.
400+
// matchedLength is the number of characters that matched, and totalLength is the total prefix length.
401+
func RecordPrefixCacheMatch(matchedLength, totalLength int) {
402+
// Record the hit length metric
403+
PrefixCacheHitLength.WithLabelValues().Observe(float64(matchedLength))
404+
405+
// Record the hit ratio metric if totalLength is positive
406+
if totalLength > 0 {
407+
ratio := float64(matchedLength) / float64(totalLength)
408+
PrefixCacheHitRatio.WithLabelValues().Observe(ratio)
409+
}
410+
}
411+
355412
func RecordInferenceExtensionInfo() {
356413
if CommitSHA != "" {
357414
InferenceExtensionInfo.WithLabelValues(CommitSHA).Set(1)
358415
}
359416
}
417+
418+
func init() {
419+
info, ok := debug.ReadBuildInfo()
420+
if !ok {
421+
return
422+
}
423+
424+
var Commit = func(i *debug.BuildInfo) string {
425+
for _, setting := range i.Settings {
426+
if setting.Key == "vcs.revision" {
427+
return setting.Value
428+
}
429+
}
430+
return ""
431+
}(info)
432+
433+
CommitSHA = Commit
434+
}

pkg/epp/metrics/metrics_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,3 +664,106 @@ func TestSchedulerE2ELatency(t *testing.T) {
664664
})
665665
}
666666
}
667+
668+
func TestPrefixCacheMetrics(t *testing.T) {
669+
const (
670+
PrefixCacheSizeMetric = InferenceExtension + "_prefix_indexer_size"
671+
PrefixCacheHitRatioMetric = InferenceExtension + "_prefix_indexer_hit_ratio"
672+
PrefixCacheHitLengthMetric = InferenceExtension + "_prefix_indexer_hit_bytes"
673+
)
674+
675+
type cacheMatchRecord struct {
676+
matchedLength int
677+
totalLength int
678+
}
679+
680+
scenario := struct {
681+
name string
682+
cacheSizes []int64
683+
cacheMatches []cacheMatchRecord
684+
}{
685+
name: "multiple cache metrics",
686+
cacheSizes: []int64{1024, 2048, 4096},
687+
cacheMatches: []cacheMatchRecord{
688+
{
689+
matchedLength: 5,
690+
totalLength: 10,
691+
},
692+
{
693+
matchedLength: 0,
694+
totalLength: 10,
695+
},
696+
{
697+
matchedLength: 10,
698+
totalLength: 10,
699+
},
700+
{
701+
matchedLength: 7,
702+
totalLength: 10,
703+
},
704+
{
705+
matchedLength: 64,
706+
totalLength: 128,
707+
},
708+
{
709+
matchedLength: 0,
710+
totalLength: 128,
711+
},
712+
},
713+
}
714+
715+
Register()
716+
t.Run(scenario.name, func(t *testing.T) {
717+
// Record cache size metrics
718+
for _, size := range scenario.cacheSizes {
719+
RecordPrefixCacheSize(size)
720+
}
721+
722+
// Record cache match metrics (both hit ratio and hit length)
723+
for _, match := range scenario.cacheMatches {
724+
RecordPrefixCacheMatch(match.matchedLength, match.totalLength)
725+
}
726+
727+
// Verify cache size metrics
728+
wantCacheSizeMetrics, err := os.Open("testdata/prefix_indexer_size_metric")
729+
defer func() {
730+
if err := wantCacheSizeMetrics.Close(); err != nil {
731+
t.Error(err)
732+
}
733+
}()
734+
if err != nil {
735+
t.Fatal(err)
736+
}
737+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantCacheSizeMetrics, PrefixCacheSizeMetric); err != nil {
738+
t.Error(err)
739+
}
740+
741+
// Verify hit ratio metrics
742+
wantHitRatioMetrics, err := os.Open("testdata/prefix_indexer_hit_ratio_metric")
743+
defer func() {
744+
if err := wantHitRatioMetrics.Close(); err != nil {
745+
t.Error(err)
746+
}
747+
}()
748+
if err != nil {
749+
t.Fatal(err)
750+
}
751+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitRatioMetrics, PrefixCacheHitRatioMetric); err != nil {
752+
t.Error(err)
753+
}
754+
755+
// Verify hit length metrics
756+
wantHitLengthMetrics, err := os.Open("testdata/prefix_indexer_hit_bytes_metric")
757+
defer func() {
758+
if err := wantHitLengthMetrics.Close(); err != nil {
759+
t.Error(err)
760+
}
761+
}()
762+
if err != nil {
763+
t.Fatal(err)
764+
}
765+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitLengthMetrics, PrefixCacheHitLengthMetric); err != nil {
766+
t.Error(err)
767+
}
768+
})
769+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# HELP inference_extension_prefix_indexer_hit_bytes [ALPHA] Length of the prefix match in number of bytes in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_bytes histogram
3+
inference_extension_prefix_indexer_hit_bytes_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16"} 5
5+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32"} 5
6+
inference_extension_prefix_indexer_hit_bytes_bucket{le="64"} 6
7+
inference_extension_prefix_indexer_hit_bytes_bucket{le="128"} 6
8+
inference_extension_prefix_indexer_hit_bytes_bucket{le="256"} 6
9+
inference_extension_prefix_indexer_hit_bytes_bucket{le="512"} 6
10+
inference_extension_prefix_indexer_hit_bytes_bucket{le="1024"} 6
11+
inference_extension_prefix_indexer_hit_bytes_bucket{le="2048"} 6
12+
inference_extension_prefix_indexer_hit_bytes_bucket{le="4096"} 6
13+
inference_extension_prefix_indexer_hit_bytes_bucket{le="8192"} 6
14+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16384"} 6
15+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32768"} 6
16+
inference_extension_prefix_indexer_hit_bytes_bucket{le="65536"} 6
17+
inference_extension_prefix_indexer_hit_bytes_bucket{le="+Inf"} 6
18+
inference_extension_prefix_indexer_hit_bytes_sum 86
19+
inference_extension_prefix_indexer_hit_bytes_count 6
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# HELP inference_extension_prefix_indexer_hit_ratio [ALPHA] Ratio of prefix length matched to total prefix length in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_ratio histogram
3+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.1"} 2
5+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.2"} 2
6+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.3"} 2
7+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.4"} 2
8+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.5"} 4
9+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.6"} 4
10+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.7"} 5
11+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.8"} 5
12+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.9"} 5
13+
inference_extension_prefix_indexer_hit_ratio_bucket{le="1"} 6
14+
inference_extension_prefix_indexer_hit_ratio_bucket{le="+Inf"} 6
15+
inference_extension_prefix_indexer_hit_ratio_sum 2.7
16+
inference_extension_prefix_indexer_hit_ratio_count 6
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# HELP inference_extension_prefix_indexer_size [ALPHA] Size of the prefix indexer.
2+
# TYPE inference_extension_prefix_indexer_size gauge
3+
inference_extension_prefix_indexer_size{} 4096

pkg/epp/scheduling/plugins/filter/filter.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,14 @@ var HasCapacityFilter = &baseFilter{
256256
filter: toFilterFunc(queueThresholdPredicate(config.Conf.QueueThresholdCritical).and(kvCacheThresholdPredicate(config.Conf.KVCacheThreshold))),
257257
}
258258

259+
// NoopFilter is a filter that does not filter out any pods.
260+
var NoopFilter = &baseFilter{
261+
name: "noop",
262+
filter: toFilterFunc(func(req *types.LLMRequest, pod types.Pod) bool {
263+
return true
264+
}),
265+
}
266+
259267
// podPredicate is a filter function to check whether a pod is desired.
260268
type podPredicate func(req *types.LLMRequest, pod types.Pod) bool
261269

0 commit comments

Comments
 (0)