Skip to content

Commit df82ff2

Browse files
nirrozenbaum authored and irar2 committed
minor changes to saturation detector (kubernetes-sigs#882)
* small changes to saturation detector

  Signed-off-by: Nir Rozenbaum <[email protected]>

* var rename

  Signed-off-by: Nir Rozenbaum <[email protected]>

---------

Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent f142921 commit df82ff2

File tree

3 files changed

+104
-125
lines changed

3 files changed

+104
-125
lines changed

pkg/epp/saturationdetector/config.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,29 @@ const (
4242
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
4343
)
4444

45-
// LoadConfigFromEnv loads SaturationDetector Config from environment
46-
// variables.
45+
// LoadConfigFromEnv loads SaturationDetector Config from environment variables.
4746
func LoadConfigFromEnv() *Config {
4847
// Use a default logger for initial configuration loading.
4948
logger := log.Log.WithName("saturation-detector-config")
5049

5150
cfg := &Config{}
5251

5352
cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger)
53+
if cfg.QueueDepthThreshold <= 0 {
54+
cfg.QueueDepthThreshold = DefaultQueueDepthThreshold
55+
}
56+
5457
cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger)
58+
if cfg.KVCacheUtilThreshold <= 0 || cfg.KVCacheUtilThreshold >= 1 {
59+
cfg.KVCacheUtilThreshold = DefaultKVCacheUtilThreshold
60+
}
61+
5562
cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger)
63+
if cfg.MetricsStalenessThreshold <= 0 {
64+
cfg.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold
65+
}
5666

5767
// NewDetector validates the config and assigns defaults.
58-
logger.Info("SaturationDetector configuration loaded from env",
59-
"config", fmt.Sprintf("%+v", cfg))
68+
logger.Info("SaturationDetector configuration loaded from env", "config", fmt.Sprintf("%+v", cfg))
6069
return cfg
6170
}

pkg/epp/saturationdetector/saturationdetector.go

Lines changed: 26 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,6 @@ import (
4141
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4242
)
4343

44-
// clock allows mocking time in tests.
45-
type clock interface {
46-
now() time.Time
47-
}
48-
49-
// realClock provides the real time.
50-
type realClock struct{}
51-
52-
func (c realClock) now() time.Time { return time.Now() }
53-
5444
const (
5545
// loggerName is the name to use for loggers created by this package.
5646
loggerName = "SaturationDetector"
@@ -92,29 +82,25 @@ type Datastore interface {
9282
// more beneficial.
9383
type Detector struct {
9484
datastore Datastore
95-
config Config
96-
clock clock
85+
config *Config
9786
}
9887

9988
// NewDetector creates a new SaturationDetector.
10089
// The datastore is expected to provide access to live/recently-updated pod
10190
// metrics.
10291
// The config provides the thresholds for determining saturation.
103-
func NewDetector(config Config, datastore Datastore, logger logr.Logger) (*Detector, error) {
92+
func NewDetector(config *Config, datastore Datastore, logger logr.Logger) (*Detector, error) {
10493
if datastore == nil {
10594
return nil, ErrNilDatastore
10695
}
107-
if config.MetricsStalenessThreshold <= 0 {
108-
config.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold
109-
}
11096
logger.WithName(loggerName).V(logutil.DEFAULT).Info("Creating new SaturationDetector",
11197
"queueDepthThreshold", config.QueueDepthThreshold,
11298
"kvCacheUtilThreshold", config.KVCacheUtilThreshold,
11399
"metricsStalenessThreshold", config.MetricsStalenessThreshold.String())
100+
114101
return &Detector{
115102
datastore: datastore,
116103
config: config,
117-
clock: realClock{},
118104
}, nil
119105
}
120106

@@ -137,8 +123,6 @@ func (d *Detector) IsSaturated(ctx context.Context) bool {
137123
return true
138124
}
139125

140-
now := d.clock.now()
141-
foundPodWithGoodCapacity := false
142126
for _, podMetric := range allPodsMetrics {
143127
metrics := podMetric.GetMetrics()
144128
podNn := "unknown-pod"
@@ -147,44 +131,38 @@ func (d *Detector) IsSaturated(ctx context.Context) bool {
147131
}
148132

149133
if metrics == nil {
150-
logger.V(logutil.VERBOSE).Info("Pod has nil metrics, skipping for saturation check",
134+
logger.V(logutil.TRACE).Info("Pod has nil metrics, skipping for saturation check",
151135
"pod", podNn)
152136
continue
153137
}
154138

155-
// 1. Check for metric staleness
156-
if now.Sub(metrics.UpdateTime) > d.config.MetricsStalenessThreshold {
157-
logger.V(logutil.VERBOSE).Info("Pod metrics are stale, considered as not having good capacity",
158-
"pod", podNn,
159-
"updateTime", metrics.UpdateTime,
160-
"stalenessThreshold", d.config.MetricsStalenessThreshold)
139+
// Check for metric staleness
140+
if time.Since(metrics.UpdateTime) > d.config.MetricsStalenessThreshold {
141+
logger.V(logutil.TRACE).Info("Pod metrics are stale, considered as not having good capacity",
142+
"pod", podNn, "updateTime", metrics.UpdateTime, "stalenessThreshold", d.config.MetricsStalenessThreshold)
161143
continue
162144
}
163145

164-
// 2. Check queue depth
165-
isQueueGood := metrics.WaitingQueueSize <= d.config.QueueDepthThreshold
166-
167-
// 3. Check KV cache utilization
168-
isKVCacheGood := metrics.KVCacheUsagePercent <= d.config.KVCacheUtilThreshold
169-
170-
if isQueueGood && isKVCacheGood {
171-
logger.V(logutil.VERBOSE).Info("Found pod with good capacity",
172-
"pod", podNn,
173-
"waitingQueue", metrics.WaitingQueueSize,
174-
"queueThreshold", d.config.QueueDepthThreshold,
175-
"kvCacheUtil", metrics.KVCacheUsagePercent,
176-
"kvCacheThreshold", d.config.KVCacheUtilThreshold)
177-
foundPodWithGoodCapacity = true
178-
// Found at least one pod with good capacity, so system is NOT saturated.
179-
break
146+
// Check queue depth
147+
if metrics.WaitingQueueSize > d.config.QueueDepthThreshold {
148+
logger.V(logutil.TRACE).Info("Pod WaitingQueueSize is above threshold, considered as not having good capacity",
149+
"pod", podNn, "waitingQueueSize", metrics.WaitingQueueSize, "threshold", d.config.QueueDepthThreshold)
150+
continue // WaitingQueueSize is above threshold, considered saturated.
180151
}
181-
}
182152

183-
if !foundPodWithGoodCapacity {
184-
logger.V(logutil.VERBOSE).Info("No pods found with good capacity; system is considered SATURATED.")
185-
return true
153+
// Check KV cache utilization
154+
if metrics.KVCacheUsagePercent > d.config.KVCacheUtilThreshold {
155+
logger.V(logutil.TRACE).Info("Pod KVCacheUsagePercent is above threshold, considered as not having good capacity",
156+
"pod", podNn, "kvCacheUsagePercent", metrics.KVCacheUsagePercent, "threshold", d.config.KVCacheUtilThreshold)
157+
continue // KVCacheUsagePercent is above threshold, considered saturated.
158+
}
159+
160+
logger.V(logutil.TRACE).Info("Found pod with good capacity", "pod", podNn, "waitingQueue", metrics.WaitingQueueSize,
161+
"queueThreshold", d.config.QueueDepthThreshold, "kvCacheUtil", metrics.KVCacheUsagePercent, "kvCacheThreshold", d.config.KVCacheUtilThreshold)
162+
163+
return false // Found at least one pod with good capacity, so system is NOT saturated.
186164
}
187165

188-
logger.V(logutil.VERBOSE).Info("System is considered NOT saturated (at least one pod has good capacity).")
189-
return false
166+
logger.V(logutil.VERBOSE).Info("No pods found with good capacity; system is considered SATURATED.")
167+
return true
190168
}

pkg/epp/saturationdetector/saturationdetector_test.go

Lines changed: 65 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package saturationdetector
1919
import (
2020
"context"
2121
"errors"
22-
"sync"
22+
"fmt"
23+
"os"
24+
"strconv"
2325
"testing"
2426
"time"
2527

@@ -44,22 +46,6 @@ func (fds *mockDatastore) PodGetAll() []backendmetrics.PodMetrics {
4446
return pm
4547
}
4648

47-
// mockClock allows controlling time in tests.
48-
type mockClock struct {
49-
mu sync.RWMutex
50-
time time.Time
51-
}
52-
53-
func newMockClock(t time.Time) *mockClock {
54-
return &mockClock{time: t}
55-
}
56-
57-
func (c *mockClock) now() time.Time {
58-
c.mu.RLock()
59-
defer c.mu.RUnlock()
60-
return c.time
61-
}
62-
6349
func newMockPodMetrics(name string, metrics *backendmetrics.MetricsState) *backendmetrics.FakePodMetrics {
6450
return &backendmetrics.FakePodMetrics{
6551
Pod: &backend.Pod{
@@ -73,61 +59,82 @@ func newMockPodMetrics(name string, metrics *backendmetrics.MetricsState) *backe
7359

7460
func TestNewDetector(t *testing.T) {
7561
tests := []struct {
76-
name string
77-
config Config
78-
datastore Datastore
79-
expectError error
80-
expectedStalenessThresh time.Duration
62+
name string
63+
config *Config
64+
datastore Datastore
65+
expectError error
66+
expectedQueueDepthThreshold int
67+
expectedKVCacheUtilThreshold float64
68+
expectedStalenessThreshold time.Duration
8169
}{
8270
{
8371
name: "Valid config",
84-
config: Config{
72+
config: &Config{
8573
QueueDepthThreshold: 10,
8674
KVCacheUtilThreshold: 0.8,
8775
MetricsStalenessThreshold: 100 * time.Millisecond,
8876
},
89-
datastore: &mockDatastore{},
90-
expectError: nil,
91-
expectedStalenessThresh: 100 * time.Millisecond,
77+
datastore: &mockDatastore{},
78+
expectError: nil,
79+
expectedQueueDepthThreshold: 10,
80+
expectedKVCacheUtilThreshold: 0.8,
81+
expectedStalenessThreshold: 100 * time.Millisecond,
9282
},
9383
{
94-
name: "Nil datastore",
95-
config: Config{},
96-
datastore: nil,
97-
expectError: ErrNilDatastore,
98-
expectedStalenessThresh: DefaultMetricsStalenessThreshold, // Default will be set if error didn't occur first
84+
name: "Nil datastore",
85+
config: &Config{},
86+
datastore: nil,
87+
expectError: ErrNilDatastore,
9988
},
10089
{
101-
name: "Zero staleness threshold uses default",
102-
config: Config{
103-
QueueDepthThreshold: 5,
104-
KVCacheUtilThreshold: 0.9,
105-
MetricsStalenessThreshold: 0, // Should use default
90+
name: "invalid thresholds, fallback to default",
91+
config: &Config{
92+
QueueDepthThreshold: -1,
93+
KVCacheUtilThreshold: -5,
94+
MetricsStalenessThreshold: 0,
95+
},
96+
datastore: &mockDatastore{},
97+
expectError: nil,
98+
expectedQueueDepthThreshold: DefaultQueueDepthThreshold,
99+
expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold,
100+
expectedStalenessThreshold: DefaultMetricsStalenessThreshold,
101+
},
102+
{
103+
name: "kv cache threshold above range, fallback to default",
104+
config: &Config{
105+
QueueDepthThreshold: 10,
106+
KVCacheUtilThreshold: 1.5,
107+
MetricsStalenessThreshold: 100 * time.Millisecond,
106108
},
107-
datastore: &mockDatastore{},
108-
expectError: nil,
109-
expectedStalenessThresh: DefaultMetricsStalenessThreshold,
109+
datastore: &mockDatastore{},
110+
expectError: nil,
111+
expectedQueueDepthThreshold: 10,
112+
expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold,
113+
expectedStalenessThreshold: 100 * time.Millisecond,
110114
},
111115
}
112116

113-
for _, tt := range tests {
114-
t.Run(tt.name, func(t *testing.T) {
115-
detector, err := NewDetector(tt.config, tt.datastore, logr.Discard())
117+
for _, test := range tests {
118+
t.Run(test.name, func(t *testing.T) {
119+
// validate configuration values are loaded from env vars properly, including the use of default values when provided value is invalid.
120+
os.Setenv(EnvSdQueueDepthThreshold, strconv.Itoa(test.config.QueueDepthThreshold))
121+
os.Setenv(EnvSdKVCacheUtilThreshold, fmt.Sprintf("%v", test.config.KVCacheUtilThreshold))
122+
os.Setenv(EnvSdMetricsStalenessThreshold, test.config.MetricsStalenessThreshold.String())
123+
detector, err := NewDetector(LoadConfigFromEnv(), test.datastore, logr.Discard())
116124

117-
if !errors.Is(err, tt.expectError) {
118-
t.Errorf("NewDetector() error = %v, wantErr %v", err, tt.expectError)
125+
if !errors.Is(err, test.expectError) {
126+
t.Errorf("NewDetector() error = %v, wantErr %v", err, test.expectError)
119127
}
120128

121129
if err == nil && detector != nil {
122-
detector.clock = newMockClock(time.Now())
123-
if detector.config.MetricsStalenessThreshold != tt.expectedStalenessThresh {
124-
t.Errorf("NewDetector() MetricsStalenessThreshold = %v, want %v", detector.config.MetricsStalenessThreshold, tt.expectedStalenessThresh)
130+
if detector.config.QueueDepthThreshold != test.expectedQueueDepthThreshold {
131+
t.Errorf("NewDetector() QueueDepthThreshold = %d, want %d", detector.config.QueueDepthThreshold, test.expectedQueueDepthThreshold)
125132
}
126-
if detector.config.QueueDepthThreshold != tt.config.QueueDepthThreshold {
127-
t.Errorf("NewDetector() QueueDepthThreshold = %d, want %d", detector.config.QueueDepthThreshold, tt.config.QueueDepthThreshold)
133+
if detector.config.KVCacheUtilThreshold != test.expectedKVCacheUtilThreshold {
134+
t.Errorf("NewDetector() KVCacheUtilThreshold = %f, want %f", detector.config.KVCacheUtilThreshold, test.expectedKVCacheUtilThreshold)
128135
}
129-
if detector.config.KVCacheUtilThreshold != tt.config.KVCacheUtilThreshold {
130-
t.Errorf("NewDetector() KVCacheUtilThreshold = %f, want %f", detector.config.KVCacheUtilThreshold, tt.config.KVCacheUtilThreshold)
136+
if detector.config.MetricsStalenessThreshold != test.expectedStalenessThreshold {
137+
t.Errorf("NewDetector() MetricsStalenessThreshold = %v, want %v", detector.config.MetricsStalenessThreshold, test.expectedStalenessThreshold)
131138
}
132139
}
133140
})
@@ -136,15 +143,15 @@ func TestNewDetector(t *testing.T) {
136143

137144
func TestDetector_IsSaturated(t *testing.T) {
138145
baseTime := time.Now()
139-
defaultConfig := Config{
146+
defaultConfig := &Config{
140147
QueueDepthThreshold: 5,
141148
KVCacheUtilThreshold: 0.90,
142149
MetricsStalenessThreshold: 100 * time.Millisecond,
143150
}
144151

145152
tests := []struct {
146153
name string
147-
config Config
154+
config *Config
148155
pods []*backendmetrics.FakePodMetrics
149156
expectedSaturat bool
150157
}{
@@ -307,18 +314,6 @@ func TestDetector_IsSaturated(t *testing.T) {
307314
},
308315
expectedSaturat: false,
309316
},
310-
{
311-
name: "Metrics age exactly at staleness threshold",
312-
config: defaultConfig,
313-
pods: []*backendmetrics.FakePodMetrics{
314-
newMockPodMetrics("pod1", &backendmetrics.MetricsState{
315-
UpdateTime: baseTime.Add(-defaultConfig.MetricsStalenessThreshold), // Exactly at threshold (good)
316-
WaitingQueueSize: 1,
317-
KVCacheUsagePercent: 0.1,
318-
}),
319-
},
320-
expectedSaturat: false,
321-
},
322317
{
323318
name: "Metrics age just over staleness threshold",
324319
config: defaultConfig,
@@ -333,18 +328,15 @@ func TestDetector_IsSaturated(t *testing.T) {
333328
},
334329
}
335330

336-
for _, tt := range tests {
337-
t.Run(tt.name, func(t *testing.T) {
338-
mockDS := &mockDatastore{pods: tt.pods}
339-
340-
detector, err := NewDetector(tt.config, mockDS, logr.Discard())
331+
for _, test := range tests {
332+
t.Run(test.name, func(t *testing.T) {
333+
detector, err := NewDetector(test.config, &mockDatastore{pods: test.pods}, logr.Discard())
341334
if err != nil {
342335
t.Fatalf("NewDetector() failed: %v", err)
343336
}
344-
detector.clock = newMockClock(baseTime)
345337

346-
if got := detector.IsSaturated(context.Background()); got != tt.expectedSaturat {
347-
t.Errorf("IsSaturated() = %v, want %v", got, tt.expectedSaturat)
338+
if got := detector.IsSaturated(context.Background()); got != test.expectedSaturat {
339+
t.Errorf("IsSaturated() = %v, want %v", got, test.expectedSaturat)
348340
}
349341
})
350342
}

0 commit comments

Comments
 (0)