Skip to content

Commit 8d286a5

Browse files
liu-congnayihz
authored and committed
Add prefix cache aware scheduling (kubernetes-sigs#768)
* Add prefix cache aware scheduling * Replace scheduler v2 with config v2 * Add score weight to XXScorerConfig * Address comments * Clean up * Change to use container/list lib * cleanup * Add TODO * make linter happy
1 parent f4993ef commit 8d286a5

14 files changed

+851
-6
lines changed

cmd/epp/main.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
healthPb "google.golang.org/grpc/health/grpc_health_v1"
2929
"k8s.io/apimachinery/pkg/types"
3030
ctrl "sigs.k8s.io/controller-runtime"
31+
"sigs.k8s.io/controller-runtime/pkg/log"
3132
"sigs.k8s.io/controller-runtime/pkg/log/zap"
3233
"sigs.k8s.io/controller-runtime/pkg/manager"
3334
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -38,7 +39,13 @@ import (
3839
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3940
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4041
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
42+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
43+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
44+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
45+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
46+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
4147
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
48+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
4249
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4350
)
4451

@@ -97,8 +104,22 @@ var (
97104
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
98105

99106
setupLog = ctrl.Log.WithName("setup")
107+
108+
// Environment variables
109+
schedulerV2 = envutil.GetEnvString("EXPERIMENTAL_USE_SCHEDULER_V2", "false", setupLog)
110+
prefixCacheScheduling = envutil.GetEnvString("ENABLE_PREFIX_CACHE_SCHEDULING", "false", setupLog)
100111
)
101112

113+
func loadPrefixCacheConfig() prefix.Config {
114+
baseLogger := log.Log.WithName("env-config")
115+
116+
return prefix.Config{
117+
HashBlockSize: envutil.GetEnvInt("PREFIX_CACHE_HASH_BLOCK_SIZE", prefix.DefaultHashBlockSize, baseLogger),
118+
MaxPrefixBlocksToMatch: envutil.GetEnvInt("PREFIX_CACHE_MAX_PREFIX_BLOCKS", prefix.DefaultMaxPrefixBlocks, baseLogger),
119+
LRUIndexerCapacity: envutil.GetEnvInt("PREFIX_CACHE_LRU_CAPACITY", prefix.DefaultLRUIndexerCapacity, baseLogger),
120+
}
121+
}
122+
102123
func main() {
103124
if err := run(); err != nil {
104125
os.Exit(1)
@@ -173,6 +194,27 @@ func run() error {
173194
datastore := datastore.NewDatastore(ctx, pmf)
174195

175196
scheduler := scheduling.NewScheduler(datastore)
197+
if schedulerV2 == "true" {
198+
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
199+
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
200+
scorers := map[plugins.Scorer]int{
201+
&scorer.QueueScorer{}: queueScorerWeight,
202+
&scorer.KVCacheScorer{}: kvCacheScorerWeight,
203+
}
204+
schedConfigOpts := []scheduling.ConfigOption{}
205+
if prefixCacheScheduling == "true" {
206+
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
207+
schedConfigOpts = append(schedConfigOpts, scheduling.AddPrefixPlugin(loadPrefixCacheConfig(), prefixScorerWeight))
208+
}
209+
schedulerConfig := scheduling.NewSchedulerConfig(
210+
[]plugins.PreSchedule{},
211+
[]plugins.Filter{filter.NewSheddableCapacityFilter()},
212+
scorers,
213+
picker.NewMaxScorePicker(),
214+
[]plugins.PostSchedule{},
215+
schedConfigOpts...)
216+
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
217+
}
176218
serverRunner := &runserver.ExtProcServerRunner{
177219
GrpcPort: *grpcPort,
178220
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,

pkg/epp/metrics/metrics.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,40 @@ var (
198198
[]string{"plugin_type", "plugin_name"},
199199
)
200200

201+
// Prefix indexer Metrics
202+
PrefixCacheSize = compbasemetrics.NewGaugeVec(
203+
&compbasemetrics.GaugeOpts{
204+
Subsystem: InferenceExtension,
205+
Name: "prefix_indexer_size",
206+
Help: "Size of the prefix indexer.",
207+
StabilityLevel: compbasemetrics.ALPHA,
208+
},
209+
[]string{},
210+
)
211+
212+
PrefixCacheHitRatio = compbasemetrics.NewHistogramVec(
213+
&compbasemetrics.HistogramOpts{
214+
Subsystem: InferenceExtension,
215+
Name: "prefix_indexer_hit_ratio",
216+
Help: "Ratio of prefix length matched to total prefix length in the cache lookup.",
217+
// Buckets from 0.0 to 1.0 in increments of 0.1.
218+
Buckets: []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
219+
StabilityLevel: compbasemetrics.ALPHA,
220+
},
221+
[]string{},
222+
)
223+
224+
PrefixCacheHitLength = compbasemetrics.NewHistogramVec(
225+
&compbasemetrics.HistogramOpts{
226+
Subsystem: InferenceExtension,
227+
Name: "prefix_indexer_hit_bytes",
228+
Help: "Length of the prefix match in number of bytes in the cache lookup.",
229+
Buckets: []float64{0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536},
230+
StabilityLevel: compbasemetrics.ALPHA,
231+
},
232+
[]string{},
233+
)
234+
201235
// Info Metrics
202236
InferenceExtensionInfo = prometheus.NewGaugeVec(
203237
prometheus.GaugeOpts{
@@ -231,6 +265,10 @@ func Register() {
231265
metrics.Registry.MustRegister(SchedulerE2ELatency)
232266

233267
metrics.Registry.MustRegister(InferenceExtensionInfo)
268+
269+
metrics.Registry.MustRegister(PrefixCacheSize)
270+
metrics.Registry.MustRegister(PrefixCacheHitRatio)
271+
metrics.Registry.MustRegister(PrefixCacheHitLength)
234272
})
235273
}
236274

@@ -358,6 +396,24 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
358396
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
359397
}
360398

399+
// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
400+
func RecordPrefixCacheSize(size int64) {
401+
PrefixCacheSize.WithLabelValues().Set(float64(size))
402+
}
403+
404+
// RecordPrefixCacheMatch records both the hit ratio and hit length for a prefix indexer match.
405+
// matchedLength is the number of characters that matched, and totalLength is the total prefix length.
406+
func RecordPrefixCacheMatch(matchedLength, totalLength int) {
407+
// Record the hit length metric
408+
PrefixCacheHitLength.WithLabelValues().Observe(float64(matchedLength))
409+
410+
// Record the hit ratio metric if totalLength is positive
411+
if totalLength > 0 {
412+
ratio := float64(matchedLength) / float64(totalLength)
413+
PrefixCacheHitRatio.WithLabelValues().Observe(ratio)
414+
}
415+
}
416+
361417
func RecordInferenceExtensionInfo() {
362418
if CommitSHA != "" {
363419
InferenceExtensionInfo.WithLabelValues(CommitSHA).Set(1)

pkg/epp/metrics/metrics_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,3 +664,106 @@ func TestSchedulerE2ELatency(t *testing.T) {
664664
})
665665
}
666666
}
667+
668+
func TestPrefixCacheMetrics(t *testing.T) {
669+
const (
670+
PrefixCacheSizeMetric = InferenceExtension + "_prefix_indexer_size"
671+
PrefixCacheHitRatioMetric = InferenceExtension + "_prefix_indexer_hit_ratio"
672+
PrefixCacheHitLengthMetric = InferenceExtension + "_prefix_indexer_hit_bytes"
673+
)
674+
675+
type cacheMatchRecord struct {
676+
matchedLength int
677+
totalLength int
678+
}
679+
680+
scenario := struct {
681+
name string
682+
cacheSizes []int64
683+
cacheMatches []cacheMatchRecord
684+
}{
685+
name: "multiple cache metrics",
686+
cacheSizes: []int64{1024, 2048, 4096},
687+
cacheMatches: []cacheMatchRecord{
688+
{
689+
matchedLength: 5,
690+
totalLength: 10,
691+
},
692+
{
693+
matchedLength: 0,
694+
totalLength: 10,
695+
},
696+
{
697+
matchedLength: 10,
698+
totalLength: 10,
699+
},
700+
{
701+
matchedLength: 7,
702+
totalLength: 10,
703+
},
704+
{
705+
matchedLength: 64,
706+
totalLength: 128,
707+
},
708+
{
709+
matchedLength: 0,
710+
totalLength: 128,
711+
},
712+
},
713+
}
714+
715+
Register()
716+
t.Run(scenario.name, func(t *testing.T) {
717+
// Record cache size metrics
718+
for _, size := range scenario.cacheSizes {
719+
RecordPrefixCacheSize(size)
720+
}
721+
722+
// Record cache match metrics (both hit ratio and hit length)
723+
for _, match := range scenario.cacheMatches {
724+
RecordPrefixCacheMatch(match.matchedLength, match.totalLength)
725+
}
726+
727+
// Verify cache size metrics
728+
wantCacheSizeMetrics, err := os.Open("testdata/prefix_indexer_size_metric")
729+
defer func() {
730+
if err := wantCacheSizeMetrics.Close(); err != nil {
731+
t.Error(err)
732+
}
733+
}()
734+
if err != nil {
735+
t.Fatal(err)
736+
}
737+
if err := testutil.GatherAndCompare(metrics.Registry, wantCacheSizeMetrics, PrefixCacheSizeMetric); err != nil {
738+
t.Error(err)
739+
}
740+
741+
// Verify hit ratio metrics
742+
wantHitRatioMetrics, err := os.Open("testdata/prefix_indexer_hit_ratio_metric")
743+
defer func() {
744+
if err := wantHitRatioMetrics.Close(); err != nil {
745+
t.Error(err)
746+
}
747+
}()
748+
if err != nil {
749+
t.Fatal(err)
750+
}
751+
if err := testutil.GatherAndCompare(metrics.Registry, wantHitRatioMetrics, PrefixCacheHitRatioMetric); err != nil {
752+
t.Error(err)
753+
}
754+
755+
// Verify hit length metrics
756+
wantHitLengthMetrics, err := os.Open("testdata/prefix_indexer_hit_bytes_metric")
757+
defer func() {
758+
if err := wantHitLengthMetrics.Close(); err != nil {
759+
t.Error(err)
760+
}
761+
}()
762+
if err != nil {
763+
t.Fatal(err)
764+
}
765+
if err := testutil.GatherAndCompare(metrics.Registry, wantHitLengthMetrics, PrefixCacheHitLengthMetric); err != nil {
766+
t.Error(err)
767+
}
768+
})
769+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# HELP inference_extension_prefix_indexer_hit_bytes [ALPHA] Length of the prefix match in number of bytes in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_bytes histogram
3+
inference_extension_prefix_indexer_hit_bytes_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16"} 5
5+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32"} 5
6+
inference_extension_prefix_indexer_hit_bytes_bucket{le="64"} 6
7+
inference_extension_prefix_indexer_hit_bytes_bucket{le="128"} 6
8+
inference_extension_prefix_indexer_hit_bytes_bucket{le="256"} 6
9+
inference_extension_prefix_indexer_hit_bytes_bucket{le="512"} 6
10+
inference_extension_prefix_indexer_hit_bytes_bucket{le="1024"} 6
11+
inference_extension_prefix_indexer_hit_bytes_bucket{le="2048"} 6
12+
inference_extension_prefix_indexer_hit_bytes_bucket{le="4096"} 6
13+
inference_extension_prefix_indexer_hit_bytes_bucket{le="8192"} 6
14+
inference_extension_prefix_indexer_hit_bytes_bucket{le="16384"} 6
15+
inference_extension_prefix_indexer_hit_bytes_bucket{le="32768"} 6
16+
inference_extension_prefix_indexer_hit_bytes_bucket{le="65536"} 6
17+
inference_extension_prefix_indexer_hit_bytes_bucket{le="+Inf"} 6
18+
inference_extension_prefix_indexer_hit_bytes_sum 86
19+
inference_extension_prefix_indexer_hit_bytes_count 6
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# HELP inference_extension_prefix_indexer_hit_ratio [ALPHA] Ratio of prefix length matched to total prefix length in the cache lookup.
2+
# TYPE inference_extension_prefix_indexer_hit_ratio histogram
3+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0"} 2
4+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.1"} 2
5+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.2"} 2
6+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.3"} 2
7+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.4"} 2
8+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.5"} 4
9+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.6"} 4
10+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.7"} 5
11+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.8"} 5
12+
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.9"} 5
13+
inference_extension_prefix_indexer_hit_ratio_bucket{le="1"} 6
14+
inference_extension_prefix_indexer_hit_ratio_bucket{le="+Inf"} 6
15+
inference_extension_prefix_indexer_hit_ratio_sum 2.7
16+
inference_extension_prefix_indexer_hit_ratio_count 6
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# HELP inference_extension_prefix_indexer_size [ALPHA] Size of the prefix indexer.
2+
# TYPE inference_extension_prefix_indexer_size gauge
3+
inference_extension_prefix_indexer_size{} 4096

pkg/epp/scheduling/config.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,23 @@ package scheduling
1818

1919
import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
2122
)
2223

2324
// NewSchedulerConfig creates a new SchedulerConfig object with the given plugins.
2425
func NewSchedulerConfig(preSchedulePlugins []plugins.PreSchedule, filters []plugins.Filter, scorers map[plugins.Scorer]int,
25-
picker plugins.Picker, postSchedulePlugins []plugins.PostSchedule) *SchedulerConfig {
26-
return &SchedulerConfig{
26+
picker plugins.Picker, postSchedulePlugins []plugins.PostSchedule, opts ...ConfigOption) *SchedulerConfig {
27+
config := &SchedulerConfig{
2728
preSchedulePlugins: preSchedulePlugins,
2829
filters: filters,
2930
scorers: scorers,
3031
picker: picker,
3132
postSchedulePlugins: postSchedulePlugins,
3233
}
34+
for _, opt := range opts {
35+
opt(config)
36+
}
37+
return config
3338
}
3439

3540
// SchedulerConfig provides a configuration for the scheduler which influence routing decisions.
@@ -40,3 +45,16 @@ type SchedulerConfig struct {
4045
picker plugins.Picker
4146
postSchedulePlugins []plugins.PostSchedule
4247
}
48+
49+
// ConfigOption is a function that modifies a SchedulerConfig.
// NewSchedulerConfig calls each option it is given on the config it constructs.
type ConfigOption func(*SchedulerConfig)
50+
51+
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/813): Replace this
52+
// with a more generic way to add plugins.
53+
func AddPrefixPlugin(prefixConfig prefix.Config, weight int) ConfigOption {
54+
return func(cfg *SchedulerConfig) {
55+
prefixPlugin := prefix.New(prefixConfig)
56+
cfg.preSchedulePlugins = append(cfg.preSchedulePlugins, prefixPlugin)
57+
cfg.postSchedulePlugins = append(cfg.postSchedulePlugins, prefixPlugin)
58+
cfg.scorers[prefixPlugin] = weight
59+
}
60+
}

0 commit comments

Comments
 (0)