Skip to content

update algorithm parameters from env variables #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/epp/scheduling/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func leastQueuingFilterFunc(logger logr.Logger, req *LLMRequest, pods []backendm
}

func lowQueueingPodPredicate(_ *LLMRequest, pod backendmetrics.PodMetrics) bool {
return pod.GetMetrics().WaitingQueueSize < queueingThresholdLoRA
return pod.GetMetrics().WaitingQueueSize < config.QueueingThresholdLoRA
}

// leastKVCacheFilterFunc finds the max and min KV cache of all pods, divides the whole range
Expand Down Expand Up @@ -223,7 +223,7 @@ func loRASoftAffinityFilter(logger logr.Logger, req *LLMRequest, pods []backendm

// If both groups have pods, use probability to select which group to return
if len(filtered_affinity) > 0 && len(filtered_available) > 0 {
if randGen.Float64() < loraAffinityThreshold {
if randGen.Float64() < config.LoraAffinityThreshold {
return filtered_affinity, nil
}
return filtered_available, nil
Expand Down
35 changes: 29 additions & 6 deletions pkg/epp/scheduling/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ func TestFilterFunc(t *testing.T) {
}
}

// UpdateLoraAffinityThreshold updates the LoRA affinity threshold value
// This is useful for testing or dynamic reconfiguration
func UpdateLoraAffinityThreshold(newValue float64, logger logr.Logger) {
logger.V(logutil.DEFAULT).Info("Updating LoRA affinity threshold",
"oldValue", config.LoraAffinityThreshold,
"newValue", newValue)
config.LoraAffinityThreshold = newValue
}

// TestLoRASoftAffinityDistribution tests that the loRASoftAffinityFilter function
// properly distributes requests according to the loraAffinityThreshold
func TestLoRASoftAffinityDistribution(t *testing.T) {
Expand All @@ -442,6 +451,18 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
tolerancePercent = 5.0 // Allow 5% tolerance from expected distribution
)

// Save original config value to restore later
originalThreshold := config.LoraAffinityThreshold

// Set a specific test value for this test
testThreshold := 0.75 // 75%
UpdateLoraAffinityThreshold(testThreshold, logger)

// Ensure we restore the original threshold when test completes
defer func() {
UpdateLoraAffinityThreshold(originalThreshold, logger)
}()

// Create a test request and pods
req := &LLMRequest{
Model: testAffinityModel,
Expand Down Expand Up @@ -472,9 +493,10 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
affinityCount := 0
availableCount := 0

// Use the actual loraAffinityThreshold as defined in the original code
// This test should work with whatever value is set there
expectedAffinityPercent := loraAffinityThreshold * 100
// Use the test threshold value
expectedAffinityPercent := config.LoraAffinityThreshold * 100
expectedAvailabilityPercent := 100 - expectedAffinityPercent

for i := 0; i < numIterations; i++ {
result, err := loRASoftAffinityFilter(logger, req, toInterface(pods))
if err != nil {
Expand Down Expand Up @@ -502,11 +524,12 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
affinityLowerBound := expectedAffinityPercent - tolerancePercent
affinityUpperBound := expectedAffinityPercent + tolerancePercent

availableLowerBound := actualAvailablePercent - tolerancePercent
availableUpperBound := actualAvailablePercent + tolerancePercent
availableLowerBound := expectedAvailabilityPercent - tolerancePercent
availableUpperBound := expectedAvailabilityPercent + tolerancePercent

t.Logf("Distribution results over %d iterations:", numIterations)
t.Logf("Expected affinity percent: %.2f%% (threshold: %.2f)", expectedAffinityPercent, loraAffinityThreshold)
t.Logf("Expected affinity percent: %.2f%% (threshold: %.2f)", expectedAffinityPercent, config.LoraAffinityThreshold)
t.Logf("Expected availability percent: %.2f%% (threshold: %.2f)", expectedAvailabilityPercent, config.LoraAffinityThreshold)
t.Logf("Actual affinity percent: %.2f%% (%d out of %d)", actualAffinityPercent, affinityCount, numIterations)
t.Logf("Actual available percent: %.2f%% (%d out of %d)", actualAvailablePercent, availableCount, numIterations)

Expand Down
68 changes: 52 additions & 16 deletions pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,48 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
)

const (
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
kvCacheThreshold = 0.8
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
queueThresholdCritical = 5
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
// the threshold for queued requests to be considered low below which we can prioritize LoRA affinity.
// The value of 128 is arrived heuristicically based on experiments.
queueingThresholdLoRA = 128
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
// loraAffinityThreshold indicates the probability with which we prefer a pod with LoRA affinity over a pod without but having room to fit more LoRA adapters.
loraAffinityThreshold = 0.999
// Config holds all the configuration values for the scheduler
type Config struct {
KVCacheThreshold float64
QueueThresholdCritical int
QueueingThresholdLoRA int
LoraAffinityThreshold float64
}

var (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: these can be const

// Default values to use if environment variables are not set
defaultKVCacheThreshold = 0.8
defaultQueueThresholdCritical = 5
defaultQueueingThresholdLoRA = 128
defaultLoraAffinityThreshold = 0.999
)

// LoadConfig loads configuration from environment variables
func LoadConfig() Config {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically env variables are processed in the main func. It would be considered surprising to process an env var in a package like this and load it into a global var.

The unification of these config variables allows a reader to be able to reliably find the config, rather than having to hunt for its consumption.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree with this, will tackle it in a separate issue as suggested above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a global struct to hold all env vars for discoverability. I am OK with a follow up.

// Use a default logger for initial configuration loading
baseLogger := log.Log.WithName("scheduling-config")

config := Config{
KVCacheThreshold: envutil.GetEnvFloat("KV_CACHE_THRESHOLD", defaultKVCacheThreshold, baseLogger),
QueueThresholdCritical: envutil.GetEnvInt("QUEUE_THRESHOLD_CRITICAL", defaultQueueThresholdCritical, baseLogger),
QueueingThresholdLoRA: envutil.GetEnvInt("QUEUING_THRESHOLD_LORA", defaultQueueingThresholdLoRA, baseLogger),
LoraAffinityThreshold: envutil.GetEnvFloat("LORA_AFFINITY_THRESHOLD", defaultLoraAffinityThreshold, baseLogger),
}

baseLogger.V(logutil.DEFAULT).Info("Scheduler configuration loaded",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You can simply log the config object itself, logger.Info("Scheduler configuration loaded", "config", config)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"kvCacheThreshold", config.KVCacheThreshold,
"queueThresholdCritical", config.QueueThresholdCritical,
"queueingThresholdLoRA", config.QueueingThresholdLoRA,
"loraAffinityThreshold", config.LoraAffinityThreshold)

return config
}

var config = LoadConfig()

var (
defaultFilter = &filter{
name: "critical request",
Expand Down Expand Up @@ -92,7 +118,7 @@ var (
// cache below a certain threshold, we consider this model server has capacity to handle
// a sheddable request without impacting critical requests.
name: "has capacity for sheddable requests",
filter: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(queueThresholdCritical, kvCacheThreshold)),
filter: toFilterFunc(noQueueAndLessThanKVCacheThresholdPredicate(config.QueueThresholdCritical, config.KVCacheThreshold)),
nextOnSuccess: queueLoRAAndKVCacheFilter,
// If all pods are queuing or running above the KVCache threshold, we drop the sheddable
// request to make room for critical requests.
Expand All @@ -108,6 +134,8 @@ var (
}
)



func NewScheduler(datastore datastore.Datastore) *Scheduler {
return &Scheduler{
datastore: datastore,
Expand All @@ -123,13 +151,21 @@ type Scheduler struct {
// Schedule finds the target pod based on metrics and the requested lora adapter.
func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod backendmetrics.PodMetrics, err error) {
logger := log.FromContext(ctx).WithValues("request", req)
podMetrics := s.datastore.PodGetAll()

// Log current configuration values for debugging purposes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the config is only loaded once, why do we log it in every request? it's already logged in LoadConfig.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its logutil.TRACE, so should be logged in unless tracing is on

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still, I would expect config parameters to be logged once on startup, you can move this log line to NewScheduler for now, and later we should move it to main

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for this since this is already logged on startup. Also logging such environment configurable should be DEFAULT level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

logger.V(logutil.TRACE).Info("Scheduler configuration",
"KVCacheThreshold", config.KVCacheThreshold,
"QueueThresholdCritical", config.QueueThresholdCritical,
"QueueingThresholdLoRA", config.QueueingThresholdLoRA,
"LoraAffinityThreshold", config.LoraAffinityThreshold,
)

podMetrics := s.datastore.PodGetAll()
logger.V(logutil.DEBUG).Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", podMetrics))

pods, err := s.filter.Filter(logger, req, podMetrics)
if err != nil || len(pods) == 0 {
return nil, fmt.Errorf(
"failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
}
logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
i := rand.Intn(len(pods))
Expand Down
51 changes: 51 additions & 0 deletions pkg/epp/util/env/env.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls add unit tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package env

import (
"os"
"strconv"

"github.com/go-logr/logr"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// getEnvFloat gets a float64 from an environment variable with a default value
func GetEnvFloat(key string, defaultVal float64, logger logr.Logger) float64 {
val, exists := os.LookupEnv(key)
if !exists {
logger.V(logutil.VERBOSE).Info("Environment variable not set, using default value",
"key", key, "defaultValue", defaultVal)
return defaultVal
}

floatVal, err := strconv.ParseFloat(val, 64)
if err != nil {
logger.V(logutil.VERBOSE).Info("Failed to parse environment variable as float, using default value",
"key", key, "value", val, "error", err, "defaultValue", defaultVal)
return defaultVal
}

logger.V(logutil.VERBOSE).Info("Successfully loaded environment variable",
"key", key, "value", floatVal)
return floatVal
}

// getEnvInt gets an int from an environment variable with a default value
func GetEnvInt(key string, defaultVal int, logger logr.Logger) int {
val, exists := os.LookupEnv(key)
if !exists {
logger.V(logutil.VERBOSE).Info("Environment variable not set, using default value",
"key", key, "defaultValue", defaultVal)
return defaultVal
}

intVal, err := strconv.Atoi(val)
if err != nil {
logger.V(logutil.VERBOSE).Info("Failed to parse environment variable as int, using default value",
"key", key, "value", val, "error", err, "defaultValue", defaultVal)
return defaultVal
}

logger.V(logutil.VERBOSE).Info("Successfully loaded environment variable",
"key", key, "value", intVal)
return intVal
}